diff --git a/config/zeus.dev.toml b/config/zeus.dev.toml index 1fe5e6e..e931c12 100644 --- a/config/zeus.dev.toml +++ b/config/zeus.dev.toml @@ -11,7 +11,7 @@ password="${MQTT_PASSWORD}" topic = "${NTFY_TOPIC}" [presence] -topic = "automation_dev/presence" +topic = "automation_dev/presence/+" [hue_bridge] ip = "10.0.0.146" diff --git a/src/config.rs b/src/config.rs index 861d377..9ad8983 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use std::{fs, error::Error, collections::HashMap, net::{Ipv4Addr, SocketAddr}}; use regex::{Regex, Captures}; use tracing::{debug, trace, error}; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, has_wildcards}; use serde::Deserialize; use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor}; @@ -125,7 +125,7 @@ impl PresenceDeviceConfig { /// Set the mqtt topic to an appropriate value if it is not already set fn generate_topic(&mut self, identifier: &str, config: &Config) { if self.mqtt.is_none() { - let topic = config.presence.topic.clone() + "/" + identifier; + let topic = config.presence.topic.replace('+', identifier).replace('#', identifier); trace!("Setting presence mqtt topic: {topic}"); self.mqtt = Some(MqttDeviceConfig { topic }); } @@ -187,7 +187,16 @@ impl Config { return Err("Missing environment variables".into()); } - let config = toml::from_str(&file)?; + let config: Config = toml::from_str(&file)?; + + // Some extra config validation + if !has_wildcards(&config.presence.topic) { + return Err(format!("Invalid presence topic '{}', needs to contain a wildcard (+/#) in order to listen to presence devices", config.presence.topic).into()); + } + + // @TODO It would be nice it was possible to add validation to serde, + // that way we can check that the provided mqtt topics are actually valid + Ok(config) } } diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index a4b118a..50090c8 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -4,7 +4,7 @@ use std::net::{TcpStream, SocketAddr, Ipv4Addr}; use bytes::{BufMut, Buf}; use google_home::errors::{ErrorCode, DeviceError}; use google_home::traits::{self, OnOff}; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, matches}; use serde::{Deserialize, Serialize}; use tracing::{error, warn}; use pollster::FutureExt as _; @@ -225,7 +225,7 @@ impl Device for AudioSetup { impl OnMqtt for AudioSetup { fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if message.topic != self.mqtt.topic { + if !matches(&message.topic, &self.mqtt.topic) { return; } diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index e6b0fde..c0cfe8b 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -1,7 +1,7 @@ use std::time::Duration; use pollster::FutureExt; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, matches}; use tokio::task::JoinHandle; use tracing::{error, debug, warn}; @@ -50,7 +50,7 @@ impl OnPresence for ContactSensor { impl OnMqtt for ContactSensor { fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if message.topic != self.mqtt.topic { + if !matches(&message.topic, &self.mqtt.topic) { return; } diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index 447cad4..6d40f18 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -2,7 +2,7 @@ use std::time::Duration; use google_home::errors::ErrorCode; use google_home::{GoogleHomeDevice, device, types::Type, traits}; -use rumqttc::{AsyncClient, Publish}; +use rumqttc::{AsyncClient, Publish, matches}; use tracing::{debug, trace, error}; use tokio::task::JoinHandle; use pollster::FutureExt as _; @@ -48,7 +48,7 @@ impl Device for IkeaOutlet { impl OnMqtt for IkeaOutlet { fn on_mqtt(&mut self, message: &Publish) { // Update the internal state based on what the device has reported - if message.topic != self.mqtt.topic { + if !matches(&message.topic, &self.mqtt.topic) { return; } diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index a7d1729..ebb4dc5 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -1,6 +1,6 @@ use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, errors::{ErrorCode, DeviceError}}; use tracing::{debug, error}; -use rumqttc::{AsyncClient, Publish}; +use rumqttc::{AsyncClient, Publish, matches}; use pollster::FutureExt as _; use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}}; @@ -31,7 +31,7 @@ impl Device for WakeOnLAN { impl OnMqtt for WakeOnLAN { fn on_mqtt(&mut self, message: &Publish) { - if message.topic != self.mqtt.topic { + if !matches(&message.topic, &self.mqtt.topic) { return; } diff --git a/src/light_sensor.rs b/src/light_sensor.rs index b3f235e..8d330ae 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -1,7 +1,7 @@ use std::sync::{Weak, RwLock}; use pollster::FutureExt as _; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, matches}; use tracing::{span, Level, log::{error, trace}, debug}; use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{OnMqtt, BrightnessMessage}}; @@ -42,7 +42,7 @@ impl LightSensor { impl OnMqtt for LightSensor { fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if message.topic != self.mqtt.topic { + if !matches(&message.topic, &self.mqtt.topic) { return; } diff --git a/src/presence.rs b/src/presence.rs index 7bbb1c1..4ae1efe 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -1,7 +1,7 @@ use std::{sync::{Weak, RwLock}, collections::HashMap}; use tracing::{debug, span, Level, error}; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, matches}; use pollster::FutureExt as _; use crate::{mqtt::{OnMqtt, PresenceMessage}, config::MqttDeviceConfig}; @@ -19,9 +19,7 @@ pub struct Presence { impl Presence { pub fn new(mqtt: MqttDeviceConfig, client: AsyncClient) -> Self { - // @TODO Handle potential errors here - let topic = mqtt.topic.clone() + "/+"; - client.subscribe(topic, rumqttc::QoS::AtLeastOnce).block_on().unwrap(); + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).block_on().unwrap(); Self { listeners: Vec::new(), devices: HashMap::new(), overall_presence: false, mqtt } } @@ -42,41 +40,44 @@ impl Presence { impl OnMqtt for Presence { fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if message.topic.starts_with(&(self.mqtt.topic.clone() + "/")) { - let device_name = message.topic.rsplit_once("/").unwrap().1; + if !matches(&message.topic, &self.mqtt.topic) { + return; + } - if message.payload.len() == 0 { - // Remove the device from the map - debug!("State of device [{device_name}] has been removed"); - self.devices.remove(device_name); - } else { - let present = match PresenceMessage::try_from(message) { - Ok(state) => state.present(), - Err(err) => { - error!("Failed to parse message: {err}"); - return; - } - }; + // @TODO More robust mechanism for splitting + let device_name = message.topic.rsplit_once("/").unwrap().1; - debug!("State of device [{device_name}] has changed: {}", present); - self.devices.insert(device_name.to_owned(), present); - } + if message.payload.len() == 0 { + // Remove the device from the map + debug!("State of device [{device_name}] has been removed"); + self.devices.remove(device_name); + } else { + let present = match PresenceMessage::try_from(message) { + Ok(state) => state.present(), + Err(err) => { + error!("Failed to parse message: {err}"); + return; + } + }; - let overall_presence = self.devices.iter().any(|(_, v)| *v); - if overall_presence != self.overall_presence { - debug!("Overall presence updated: {overall_presence}"); - self.overall_presence = overall_presence; + debug!("State of device [{device_name}] has changed: {}", present); + self.devices.insert(device_name.to_owned(), present); + } - // Remove non-existing listeners - self.listeners.retain(|listener| listener.strong_count() > 0); - // Clone the listeners - let listeners = self.listeners.clone(); + let overall_presence = self.devices.iter().any(|(_, v)| *v); + if overall_presence != self.overall_presence { + debug!("Overall presence updated: {overall_presence}"); + self.overall_presence = overall_presence; - // Notify might block, so we spawn a blocking task - tokio::task::spawn_blocking(move || { - Presence::notify(overall_presence, listeners); - }); - } + // Remove non-existing listeners + self.listeners.retain(|listener| listener.strong_count() > 0); + // Clone the listeners + let listeners = self.listeners.clone(); + + // Notify might block, so we spawn a blocking task + tokio::task::spawn_blocking(move || { + Presence::notify(overall_presence, listeners); + }); } } }