diff --git a/config.lua b/config.lua index 39e0c13..ace9b39 100644 --- a/config.lua +++ b/config.lua @@ -20,6 +20,7 @@ automation.device_manager:add(Ntfy.new({ automation.device_manager:add(Presence.new({ topic = "automation_dev/presence/+/#", + client = automation.mqtt_client, event_channel = automation.event_channel, })) @@ -45,6 +46,7 @@ automation.device_manager:add(HueBridge.new({ automation.device_manager:add(LightSensor.new({ identifier = "living_light_sensor", topic = mqtt_z2m("living/light"), + client = automation.mqtt_client, min = 22000, max = 23500, event_channel = automation.event_channel, @@ -54,6 +56,7 @@ automation.device_manager:add(WakeOnLAN.new({ name = "Zeus", room = "Living Room", topic = mqtt_automation("appliance/living_room/zeus"), + client = automation.mqtt_client, mac_address = "30:9c:23:60:9c:13", broadcast_ip = "10.0.0.255", })) @@ -66,6 +69,7 @@ automation.device_manager:add(living_speakers) automation.device_manager:add(AudioSetup.new({ identifier = "living_audio", topic = mqtt_z2m("living/remote"), + client = automation.mqtt_client, mixer = living_mixer, speakers = living_speakers, })) @@ -95,6 +99,7 @@ automation.device_manager:add(IkeaOutlet.new({ automation.device_manager:add(Washer.new({ identifier = "bathroom_washer", topic = mqtt_z2m("batchroom/washer"), + client = automation.mqtt_client, threshold = 1, event_channel = automation.event_channel, })) @@ -125,6 +130,7 @@ local hallway_lights = automation.device_manager:add(HueGroup.new({ remotes = { { topic = mqtt_z2m("hallway/remote") }, }, + client = automation.mqtt_client, })) automation.device_manager:add(ContactSensor.new({ diff --git a/src/device_manager.rs b/src/device_manager.rs index d24e4d7..0ec40d1 100644 --- a/src/device_manager.rs +++ b/src/device_manager.rs @@ -5,10 +5,9 @@ use std::sync::Arc; use futures::future::join_all; use google_home::traits::OnOff; use mlua::{FromLua, LuaSerdeExt}; -use rumqttc::{matches, AsyncClient, QoS}; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio_cron_scheduler::{Job, JobScheduler}; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, instrument, trace}; use crate::devices::{As, Device}; use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence}; @@ -43,17 +42,15 @@ pub type DeviceMap = HashMap>>>; #[derive(Debug, Clone)] pub struct DeviceManager { devices: Arc>, - client: AsyncClient, event_channel: EventChannel, } impl DeviceManager { - pub fn new(client: AsyncClient) -> Self { + pub fn new() -> Self { let (event_channel, mut event_rx) = EventChannel::new(); let device_manager = Self { devices: Arc::new(RwLock::new(HashMap::new())), - client, event_channel, }; @@ -127,18 +124,6 @@ impl DeviceManager { debug!(id, "Adding device"); - // If the device listens to mqtt, subscribe to the topics - if let Some(device) = As::::cast(device.read().await.as_ref()) { - for topic in device.topics() { - trace!(id, topic, "Subscribing to topic"); - if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await { - // NOTE: Pretty sure that this can only happen if the mqtt client if no longer - // running - error!(id, topic, "Failed to subscribe to topic: {err}"); - } - } - } - self.devices.write().await.insert(id, device.0.clone()); } @@ -170,16 +155,16 @@ impl DeviceManager { let mut device = device.write().await; let device = device.as_mut(); if let Some(device) = As::::cast_mut(device) { - let subscribed = device - .topics() - .iter() - .any(|topic| matches(&message.topic, topic)); - - if subscribed { - trace!(id, "Handling"); - device.on_mqtt(message).await; - trace!(id, "Done"); - } + // let subscribed = device + // .topics() + // .iter() + // .any(|topic| matches(&message.topic, topic)); + // + // if subscribed { + trace!(id, "Handling"); + device.on_mqtt(message).await; + trace!(id, "Done"); + // } } } }); @@ -235,6 +220,12 @@ impl DeviceManager { } } +impl Default for DeviceManager { + fn default() -> Self { + Self::new() + } +} + impl mlua::UserData for DeviceManager { fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move { diff --git a/src/devices/air_filter.rs b/src/devices/air_filter.rs index ac2346c..63461b3 100644 --- a/src/devices/air_filter.rs +++ b/src/devices/air_filter.rs @@ -42,7 +42,7 @@ impl AirFilter { self.config .client .publish( - topic.clone(), + &topic, rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap(), @@ -56,6 +56,12 @@ impl AirFilter { impl AirFilter { async fn create(config: AirFilterConfig) -> Result { trace!(id = config.info.identifier(), "Setting up AirFilter"); + + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config, last_known_state: AirFilterState { @@ -74,11 +80,11 @@ impl Device for AirFilter { #[async_trait] impl OnMqtt for AirFilter { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let state = match AirFilterState::try_from(message) { Ok(state) => state, Err(err) => { diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index b7f7944..6697a8f 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -10,6 +10,7 @@ use crate::devices::As; use crate::error::DeviceConfigError; use crate::event::{OnMqtt, OnPresence}; use crate::messages::{RemoteAction, RemoteMessage}; +use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] pub struct AudioSetupConfig { @@ -20,6 +21,8 @@ pub struct AudioSetupConfig { mixer: WrappedDevice, #[device_config(from_lua)] speakers: WrappedDevice, + #[device_config(from_lua)] + client: WrappedAsyncClient, } #[derive(Debug, LuaDevice)] @@ -42,6 +45,11 @@ impl AudioSetup { return Err(DeviceConfigError::MissingTrait(speakers_id, "OnOff".into())); } + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(AudioSetup { config }) } } @@ -54,11 +62,11 @@ impl Device for AudioSetup { #[async_trait] impl OnMqtt for AudioSetup { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: rumqttc::Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 849f78c..8105620 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -91,6 +91,11 @@ impl ContactSensor { } } + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config: config.clone(), overall_presence: DEFAULT_PRESENCE, @@ -115,11 +120,11 @@ impl OnPresence for ContactSensor { #[async_trait] impl OnMqtt for ContactSensor { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: rumqttc::Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let is_closed = match ContactMessage::try_from(message) { Ok(state) => state.is_closed(), Err(err) => { @@ -187,7 +192,7 @@ impl OnMqtt for ContactSensor { self.config .client .publish( - presence.mqtt.topic.clone(), + &presence.mqtt.topic, rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap(), @@ -212,7 +217,7 @@ impl OnMqtt for ContactSensor { tokio::time::sleep(timeout).await; debug!(id, "Removing door device!"); client - .publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "") + .publish(&topic, rumqttc::QoS::AtLeastOnce, false, "") .await .map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) .ok(); diff --git a/src/devices/hue_light.rs b/src/devices/hue_light.rs index 47649ba..fc4606c 100644 --- a/src/devices/hue_light.rs +++ b/src/devices/hue_light.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use automation_macro::{LuaDevice, LuaDeviceConfig}; use google_home::errors::ErrorCode; use google_home::traits::OnOff; -use rumqttc::Publish; +use rumqttc::{Publish, SubscribeFilter}; use tracing::{debug, error, trace, warn}; use super::Device; @@ -14,6 +14,7 @@ use crate::config::MqttDeviceConfig; use crate::error::DeviceConfigError; use crate::event::OnMqtt; use crate::messages::{RemoteAction, RemoteMessage}; +use crate::mqtt::WrappedAsyncClient; use crate::traits::Timeout; #[derive(Debug, Clone, LuaDeviceConfig)] @@ -27,6 +28,8 @@ pub struct HueGroupConfig { pub scene_id: String, #[device_config(default)] pub remotes: Vec, + #[device_config(from_lua)] + client: WrappedAsyncClient, } #[derive(Debug, LuaDevice)] @@ -39,6 +42,17 @@ pub struct HueGroup { impl HueGroup { async fn create(config: HueGroupConfig) -> Result { trace!(id = config.identifier, "Setting up AudioSetup"); + + if !config.remotes.is_empty() { + config + .client + .subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter { + path: remote.topic.clone(), + qos: rumqttc::QoS::AtLeastOnce, + })) + .await?; + } + Ok(Self { config }) } @@ -67,15 +81,16 @@ impl Device for HueGroup { #[async_trait] impl OnMqtt for HueGroup { - fn topics(&self) -> Vec<&str> { - self.config + async fn on_mqtt(&mut self, message: Publish) { + if !self + .config .remotes .iter() - .map(|mqtt| mqtt.topic.as_str()) - .collect() - } + .any(|remote| rumqttc::matches(&message.topic, &remote.topic)) + { + return; + } - async fn on_mqtt(&mut self, message: Publish) { let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index 3ad0150..85862c7 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -7,7 +7,7 @@ use google_home::errors::ErrorCode; use google_home::traits::{self, OnOff}; use google_home::types::Type; use google_home::{device, GoogleHomeDevice}; -use rumqttc::{matches, Publish}; +use rumqttc::{matches, Publish, SubscribeFilter}; use serde::Deserialize; use tokio::task::JoinHandle; use tracing::{debug, error, trace, warn}; @@ -61,7 +61,7 @@ async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) { // TODO: Handle potential errors here client .publish( - topic.clone(), + &topic, rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap(), @@ -75,6 +75,21 @@ impl IkeaOutlet { async fn create(config: IkeaOutletConfig) -> Result { trace!(id = config.info.identifier(), "Setting up IkeaOutlet"); + if !config.remotes.is_empty() { + config + .client + .subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter { + path: remote.topic.clone(), + qos: rumqttc::QoS::AtLeastOnce, + })) + .await?; + } + + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config, last_known_state: false, @@ -91,19 +106,6 @@ impl Device for IkeaOutlet { #[async_trait] impl OnMqtt for IkeaOutlet { - fn topics(&self) -> Vec<&str> { - let mut topics: Vec<_> = self - .config - .remotes - .iter() - .map(|mqtt| mqtt.topic.as_str()) - .collect(); - - topics.push(&self.config.mqtt.topic); - - topics - } - async fn on_mqtt(&mut self, message: Publish) { // Check if the message is from the deviec itself or from a remote if matches(&message.topic, &self.config.mqtt.topic) { @@ -131,7 +133,12 @@ impl OnMqtt for IkeaOutlet { if state && let Some(timeout) = self.config.timeout { self.start_timeout(timeout).await.unwrap(); } - } else { + } else if self + .config + .remotes + .iter() + .any(|remote| rumqttc::matches(&message.topic, &remote.topic)) + { let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { diff --git a/src/devices/light_sensor.rs b/src/devices/light_sensor.rs index 00e4b0e..cf0d10a 100644 --- a/src/devices/light_sensor.rs +++ b/src/devices/light_sensor.rs @@ -8,6 +8,7 @@ use crate::devices::Device; use crate::error::DeviceConfigError; use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::messages::BrightnessMessage; +use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] pub struct LightSensorConfig { @@ -18,6 +19,8 @@ pub struct LightSensorConfig { pub max: isize, #[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))] pub tx: event::Sender, + #[device_config(from_lua)] + client: WrappedAsyncClient, } pub const DEFAULT: bool = false; @@ -33,6 +36,12 @@ pub struct LightSensor { impl LightSensor { async fn create(config: LightSensorConfig) -> Result { trace!(id = config.identifier, "Setting up LightSensor"); + + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config, is_dark: DEFAULT, @@ -48,11 +57,11 @@ impl Device for LightSensor { #[async_trait] impl OnMqtt for LightSensor { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let illuminance = match BrightnessMessage::try_from(message) { Ok(state) => state.illuminance(), Err(err) => { diff --git a/src/devices/presence.rs b/src/devices/presence.rs index 712b6e5..d6f8ac0 100644 --- a/src/devices/presence.rs +++ b/src/devices/presence.rs @@ -10,6 +10,7 @@ use crate::devices::Device; use crate::error::DeviceConfigError; use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::messages::PresenceMessage; +use crate::mqtt::WrappedAsyncClient; #[derive(Debug, LuaDeviceConfig)] pub struct PresenceConfig { @@ -17,6 +18,8 @@ pub struct PresenceConfig { pub mqtt: MqttDeviceConfig, #[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))] tx: event::Sender, + #[device_config(from_lua)] + client: WrappedAsyncClient, } pub const DEFAULT_PRESENCE: bool = false; @@ -32,6 +35,12 @@ pub struct Presence { impl Presence { async fn create(config: PresenceConfig) -> Result { trace!(id = "ntfy", "Setting up Presence"); + + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config, devices: HashMap::new(), @@ -48,11 +57,11 @@ impl Device for Presence { #[async_trait] impl OnMqtt for Presence { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let offset = self .config .mqtt diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 9a97ee4..23c92de 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -15,6 +15,7 @@ use crate::config::{InfoConfig, MqttDeviceConfig}; use crate::error::DeviceConfigError; use crate::event::OnMqtt; use crate::messages::ActivateMessage; +use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] pub struct WakeOnLANConfig { @@ -25,6 +26,8 @@ pub struct WakeOnLANConfig { mac_address: MacAddress, #[device_config(default(Ipv4Addr::new(255, 255, 255, 255)))] broadcast_ip: Ipv4Addr, + #[device_config(from_lua)] + client: WrappedAsyncClient, } #[derive(Debug, LuaDevice)] @@ -37,6 +40,11 @@ impl WakeOnLAN { async fn create(config: WakeOnLANConfig) -> Result { trace!(id = config.info.identifier(), "Setting up WakeOnLAN"); + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config }) } } @@ -49,11 +57,11 @@ impl Device for WakeOnLAN { #[async_trait] impl OnMqtt for WakeOnLAN { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let activate = match ActivateMessage::try_from(message) { Ok(message) => message.activate(), Err(err) => { diff --git a/src/devices/washer.rs b/src/devices/washer.rs index 75e2f91..342af84 100644 --- a/src/devices/washer.rs +++ b/src/devices/washer.rs @@ -9,6 +9,7 @@ use crate::config::MqttDeviceConfig; use crate::error::DeviceConfigError; use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::messages::PowerMessage; +use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] pub struct WasherConfig { @@ -19,6 +20,8 @@ pub struct WasherConfig { threshold: f32, #[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))] pub tx: event::Sender, + #[device_config(from_lua)] + client: WrappedAsyncClient, } // TODO: Add google home integration @@ -33,6 +36,12 @@ pub struct Washer { impl Washer { async fn create(config: WasherConfig) -> Result { trace!(id = config.identifier, "Setting up Washer"); + + config + .client + .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) + .await?; + Ok(Self { config, running: 0 }) } } @@ -50,11 +59,11 @@ const HYSTERESIS: isize = 10; #[async_trait] impl OnMqtt for Washer { - fn topics(&self) -> Vec<&str> { - vec![&self.config.mqtt.topic] - } - async fn on_mqtt(&mut self, message: Publish) { + if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { + return; + } + let power = match PowerMessage::try_from(message) { Ok(state) => state.power(), Err(err) => { diff --git a/src/error.rs b/src/error.rs index fbd47d6..c2f0723 100644 --- a/src/error.rs +++ b/src/error.rs @@ -98,6 +98,8 @@ pub enum DeviceConfigError { MissingTrait(String, String), #[error(transparent)] MissingWildcard(#[from] MissingWildcard), + #[error(transparent)] + MqttClientError(#[from] rumqttc::ClientError), } #[derive(Debug, Error)] diff --git a/src/event.rs b/src/event.rs index 0d60063..afbf918 100644 --- a/src/event.rs +++ b/src/event.rs @@ -37,7 +37,7 @@ impl mlua::UserData for EventChannel {} #[async_trait] #[device_trait] pub trait OnMqtt { - fn topics(&self) -> Vec<&str>; + // fn topics(&self) -> Vec<&str>; async fn on_mqtt(&mut self, message: Publish); } diff --git a/src/main.rs b/src/main.rs index e87fb2c..8f5a62f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,7 +61,7 @@ async fn app() -> anyhow::Result<()> { let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 100); // Setup the device handler - let device_manager = DeviceManager::new(client.clone()); + let device_manager = DeviceManager::new(); let event_channel = device_manager.event_channel();