Improved how mqtt topics are handled

This commit is contained in:
Dreaded_X 2023-01-06 05:25:39 +01:00
parent 1326a8878c
commit 47afda8dee
Signed by: Dreaded_X
GPG Key ID: 76BDEC4E165D8AD9
8 changed files with 58 additions and 48 deletions

View File

@ -11,7 +11,7 @@ password="${MQTT_PASSWORD}"
topic = "${NTFY_TOPIC}"
[presence]
topic = "automation_dev/presence"
topic = "automation_dev/presence/+"
[hue_bridge]
ip = "10.0.0.146"

View File

@ -2,7 +2,7 @@ use std::{fs, error::Error, collections::HashMap, net::{Ipv4Addr, SocketAddr}};
use regex::{Regex, Captures};
use tracing::{debug, trace, error};
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, has_wildcards};
use serde::Deserialize;
use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor};
@ -125,7 +125,7 @@ impl PresenceDeviceConfig {
/// Set the mqtt topic to an appropriate value if it is not already set
fn generate_topic(&mut self, identifier: &str, config: &Config) {
if self.mqtt.is_none() {
let topic = config.presence.topic.clone() + "/" + identifier;
let topic = config.presence.topic.replace('+', identifier).replace('#', identifier);
trace!("Setting presence mqtt topic: {topic}");
self.mqtt = Some(MqttDeviceConfig { topic });
}
@ -187,7 +187,16 @@ impl Config {
return Err("Missing environment variables".into());
}
let config = toml::from_str(&file)?;
let config: Config = toml::from_str(&file)?;
// Some extra config validation
if !has_wildcards(&config.presence.topic) {
return Err(format!("Invalid presence topic '{}', needs to contain a wildcard (+/#) in order to listen to presence devices", config.presence.topic).into());
}
// @TODO It would be nice it was possible to add validation to serde,
// that way we can check that the provided mqtt topics are actually valid
Ok(config)
}
}

View File

@ -4,7 +4,7 @@ use std::net::{TcpStream, SocketAddr, Ipv4Addr};
use bytes::{BufMut, Buf};
use google_home::errors::{ErrorCode, DeviceError};
use google_home::traits::{self, OnOff};
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, matches};
use serde::{Deserialize, Serialize};
use tracing::{error, warn};
use pollster::FutureExt as _;
@ -225,7 +225,7 @@ impl Device for AudioSetup {
impl OnMqtt for AudioSetup {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if message.topic != self.mqtt.topic {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}

View File

@ -1,7 +1,7 @@
use std::time::Duration;
use pollster::FutureExt;
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, matches};
use tokio::task::JoinHandle;
use tracing::{error, debug, warn};
@ -50,7 +50,7 @@ impl OnPresence for ContactSensor {
impl OnMqtt for ContactSensor {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if message.topic != self.mqtt.topic {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}

View File

@ -2,7 +2,7 @@ use std::time::Duration;
use google_home::errors::ErrorCode;
use google_home::{GoogleHomeDevice, device, types::Type, traits};
use rumqttc::{AsyncClient, Publish};
use rumqttc::{AsyncClient, Publish, matches};
use tracing::{debug, trace, error};
use tokio::task::JoinHandle;
use pollster::FutureExt as _;
@ -48,7 +48,7 @@ impl Device for IkeaOutlet {
impl OnMqtt for IkeaOutlet {
fn on_mqtt(&mut self, message: &Publish) {
// Update the internal state based on what the device has reported
if message.topic != self.mqtt.topic {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}

View File

@ -1,6 +1,6 @@
use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, errors::{ErrorCode, DeviceError}};
use tracing::{debug, error};
use rumqttc::{AsyncClient, Publish};
use rumqttc::{AsyncClient, Publish, matches};
use pollster::FutureExt as _;
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}};
@ -31,7 +31,7 @@ impl Device for WakeOnLAN {
impl OnMqtt for WakeOnLAN {
fn on_mqtt(&mut self, message: &Publish) {
if message.topic != self.mqtt.topic {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}

View File

@ -1,7 +1,7 @@
use std::sync::{Weak, RwLock};
use pollster::FutureExt as _;
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, matches};
use tracing::{span, Level, log::{error, trace}, debug};
use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{OnMqtt, BrightnessMessage}};
@ -42,7 +42,7 @@ impl LightSensor {
impl OnMqtt for LightSensor {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if message.topic != self.mqtt.topic {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}

View File

@ -1,7 +1,7 @@
use std::{sync::{Weak, RwLock}, collections::HashMap};
use tracing::{debug, span, Level, error};
use rumqttc::AsyncClient;
use rumqttc::{AsyncClient, matches};
use pollster::FutureExt as _;
use crate::{mqtt::{OnMqtt, PresenceMessage}, config::MqttDeviceConfig};
@ -19,9 +19,7 @@ pub struct Presence {
impl Presence {
pub fn new(mqtt: MqttDeviceConfig, client: AsyncClient) -> Self {
// @TODO Handle potential errors here
let topic = mqtt.topic.clone() + "/+";
client.subscribe(topic, rumqttc::QoS::AtLeastOnce).block_on().unwrap();
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).block_on().unwrap();
Self { listeners: Vec::new(), devices: HashMap::new(), overall_presence: false, mqtt }
}
@ -42,41 +40,44 @@ impl Presence {
impl OnMqtt for Presence {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if message.topic.starts_with(&(self.mqtt.topic.clone() + "/")) {
let device_name = message.topic.rsplit_once("/").unwrap().1;
if !matches(&message.topic, &self.mqtt.topic) {
return;
}
if message.payload.len() == 0 {
// Remove the device from the map
debug!("State of device [{device_name}] has been removed");
self.devices.remove(device_name);
} else {
let present = match PresenceMessage::try_from(message) {
Ok(state) => state.present(),
Err(err) => {
error!("Failed to parse message: {err}");
return;
}
};
// @TODO More robust mechanism for splitting
let device_name = message.topic.rsplit_once("/").unwrap().1;
debug!("State of device [{device_name}] has changed: {}", present);
self.devices.insert(device_name.to_owned(), present);
}
if message.payload.len() == 0 {
// Remove the device from the map
debug!("State of device [{device_name}] has been removed");
self.devices.remove(device_name);
} else {
let present = match PresenceMessage::try_from(message) {
Ok(state) => state.present(),
Err(err) => {
error!("Failed to parse message: {err}");
return;
}
};
let overall_presence = self.devices.iter().any(|(_, v)| *v);
if overall_presence != self.overall_presence {
debug!("Overall presence updated: {overall_presence}");
self.overall_presence = overall_presence;
debug!("State of device [{device_name}] has changed: {}", present);
self.devices.insert(device_name.to_owned(), present);
}
// Remove non-existing listeners
self.listeners.retain(|listener| listener.strong_count() > 0);
// Clone the listeners
let listeners = self.listeners.clone();
let overall_presence = self.devices.iter().any(|(_, v)| *v);
if overall_presence != self.overall_presence {
debug!("Overall presence updated: {overall_presence}");
self.overall_presence = overall_presence;
// Notify might block, so we spawn a blocking task
tokio::task::spawn_blocking(move || {
Presence::notify(overall_presence, listeners);
});
}
// Remove non-existing listeners
self.listeners.retain(|listener| listener.strong_count() > 0);
// Clone the listeners
let listeners = self.listeners.clone();
// Notify might block, so we spawn a blocking task
tokio::task::spawn_blocking(move || {
Presence::notify(overall_presence, listeners);
});
}
}
}