diff --git a/Cargo.toml b/Cargo.toml index be5ca80..2c694cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,7 @@ version = "0.1.0" edition = "2021" [workspace] -members = [ - "impl_cast", - "google-home" -] +members = ["impl_cast", "google-home"] [dependencies] rumqttc = "0.18" diff --git a/src/config.rs b/src/config.rs index be81781..609ccc2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -281,9 +281,14 @@ impl Device { info.name, info.room ); - IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client) - .await - .map(Box::new)? + Box::new(IkeaOutlet::new( + identifier, + info, + mqtt, + outlet_type, + timeout, + client, + )) } Device::WakeOnLAN { info, @@ -297,9 +302,13 @@ impl Device { info.name, info.room ); - WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client) - .await - .map(Box::new)? + Box::new(WakeOnLAN::new( + identifier, + info, + mqtt, + mac_address, + broadcast_ip, + )) } Device::KasaOutlet { ip } => { trace!(id = identifier, "KasaOutlet [{}]", identifier); @@ -319,7 +328,7 @@ impl Device { .create(&speakers_id, config, client.clone()) .await?; - AudioSetup::build(identifier, mqtt, mixer, speakers, client) + AudioSetup::build(identifier, mqtt, mixer, speakers) .await .map(Box::new)? } @@ -329,9 +338,7 @@ impl Device { .map(|p| p.generate_topic("contact", identifier, config)) .transpose()?; - ContactSensor::build(identifier, mqtt, presence, client) - .await - .map(Box::new)? + Box::new(ContactSensor::new(identifier, mqtt, presence, client)) } }; diff --git a/src/devices.rs b/src/devices.rs index 59a5461..c3c4dfc 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -15,9 +15,10 @@ use std::collections::HashMap; use async_trait::async_trait; use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; use pollster::FutureExt; +use rumqttc::{AsyncClient, QoS}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; use crate::{ light_sensor::{self, OnDarkness}, @@ -35,6 +36,7 @@ pub trait Device: std::fmt::Debug + Sync + Send { #[derive(Debug)] struct Devices { devices: HashMap>, + client: AsyncClient, } #[derive(Debug)] @@ -94,9 +96,11 @@ pub fn start( mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, + client: AsyncClient, ) -> DevicesHandle { let mut devices = Devices { devices: HashMap::new(), + client, }; let (tx, mut rx) = mpsc::channel(100); @@ -118,7 +122,7 @@ pub fn start( } // TODO: Handle receiving None better, otherwise it might constantly run doing // nothing - Some(cmd) = rx.recv() => devices.handle_cmd(cmd) + Some(cmd) = rx.recv() => devices.handle_cmd(cmd).await } } }); @@ -127,7 +131,7 @@ pub fn start( } impl Devices { - fn handle_cmd(&mut self, cmd: Command) { + async fn handle_cmd(&mut self, cmd: Command) { match cmd { Command::Fullfillment { google_home, @@ -139,15 +143,29 @@ impl Devices { tx.send(result).ok(); } Command::AddDevice { device, tx } => { - self.add_device(device); + self.add_device(device).await; tx.send(()).ok(); } } } - fn add_device(&mut self, device: Box) { - debug!(id = device.get_id(), "Adding device"); + async fn add_device(&mut self, device: Box) { + let id = device.get_id(); + debug!(id, "Adding device"); + + // If the device listens to mqtt, subscribe to the topics + if let Some(device) = As::::cast(device.as_ref()) { + for topic in device.topics() { + trace!(id, topic, "Subscribing to topic"); + if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await { + // NOTE: Pretty sure that this can only happen if the mqtt client if no longer + // running + error!(id, topic, "Failed to subscribe to topic: {err}"); + } + } + } + self.devices.insert(device.get_id().to_owned(), device); } @@ -165,6 +183,10 @@ impl Devices { #[async_trait] impl OnMqtt for Devices { + fn topics(&self) -> Vec<&str> { + Vec::new() + } + #[tracing::instrument(skip_all)] async fn on_mqtt(&mut self, message: &rumqttc::Publish) { self.get::() diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 9ed9e08..22a7272 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use google_home::traits; -use rumqttc::{matches, AsyncClient}; +use rumqttc::matches; use tracing::{debug, error, warn}; use crate::config::MqttDeviceConfig; @@ -26,19 +26,13 @@ impl AudioSetup { mqtt: MqttDeviceConfig, mixer: Box, speakers: Box, - client: AsyncClient, ) -> Result { // We expect the children devices to implement the OnOff trait let mixer_id = mixer.get_id().to_owned(); - let mixer = As::consume(mixer).ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?; + let mixer = As::consume(mixer).ok_or(DeviceError::OnOffExpected(mixer_id))?; let speakers_id = speakers.get_id().to_owned(); - let speakers = - As::consume(speakers).ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?; - - client - .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; + let speakers = As::consume(speakers).ok_or(DeviceError::OnOffExpected(speakers_id))?; Ok(Self { identifier: identifier.to_owned(), @@ -57,6 +51,10 @@ impl Device for AudioSetup { #[async_trait] impl OnMqtt for AudioSetup { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &rumqttc::Publish) { if !matches(&message.topic, &self.mqtt.topic) { return; diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 31036d5..2045259 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -7,7 +7,6 @@ use tracing::{debug, error, warn}; use crate::{ config::{MqttDeviceConfig, PresenceDeviceConfig}, - error::DeviceError, mqtt::{ContactMessage, OnMqtt, PresenceMessage}, presence::OnPresence, }; @@ -27,17 +26,13 @@ pub struct ContactSensor { } impl ContactSensor { - pub async fn build( + pub fn new( identifier: &str, mqtt: MqttDeviceConfig, presence: Option, client: AsyncClient, - ) -> Result { - client - .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; - - Ok(Self { + ) -> Self { + Self { identifier: identifier.to_owned(), mqtt, presence, @@ -45,7 +40,7 @@ impl ContactSensor { overall_presence: false, is_closed: true, handle: None, - }) + } } } @@ -64,6 +59,10 @@ impl OnPresence for ContactSensor { #[async_trait] impl OnMqtt for ContactSensor { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &rumqttc::Publish) { if !matches(&message.topic, &self.mqtt.topic) { return; diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index 43a97c0..5ec0667 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -14,7 +14,6 @@ use tracing::{debug, error, warn}; use crate::config::{InfoConfig, MqttDeviceConfig, OutletType}; use crate::devices::Device; -use crate::error::DeviceError; use crate::mqtt::{OnMqtt, OnOffMessage}; use crate::presence::OnPresence; @@ -32,20 +31,15 @@ pub struct IkeaOutlet { } impl IkeaOutlet { - pub async fn build( + pub fn new( identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, outlet_type: OutletType, timeout: Option, client: AsyncClient, - ) -> Result { - // TODO: Handle potential errors here - client - .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; - - Ok(Self { + ) -> Self { + Self { identifier: identifier.to_owned(), info, mqtt, @@ -54,7 +48,7 @@ impl IkeaOutlet { client, last_known_state: false, handle: None, - }) + } } } @@ -83,6 +77,10 @@ impl Device for IkeaOutlet { #[async_trait] impl OnMqtt for IkeaOutlet { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &Publish) { // Update the internal state based on what the device has reported if !matches(&message.topic, &self.mqtt.topic) { diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 0493e2d..d4d873d 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -9,12 +9,11 @@ use google_home::{ types::Type, GoogleHomeDevice, }; -use rumqttc::{matches, AsyncClient, Publish}; +use rumqttc::{matches, Publish}; use tracing::{debug, error}; use crate::{ config::{InfoConfig, MqttDeviceConfig}, - error::DeviceError, mqtt::{ActivateMessage, OnMqtt}, }; @@ -30,26 +29,20 @@ pub struct WakeOnLAN { } impl WakeOnLAN { - pub async fn build( + pub fn new( identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: MacAddress, broadcast_ip: Ipv4Addr, - client: AsyncClient, - ) -> Result { - // TODO: Handle potential errors here - client - .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; - - Ok(Self { + ) -> Self { + Self { identifier: identifier.to_owned(), info, mqtt, mac_address, broadcast_ip, - }) + } } } @@ -61,6 +54,10 @@ impl Device for WakeOnLAN { #[async_trait] impl OnMqtt for WakeOnLAN { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &Publish) { if !matches(&message.topic, &self.mqtt.topic) { return; diff --git a/src/error.rs b/src/error.rs index d8910f6..13c81f9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -84,8 +84,6 @@ impl MissingWildcard { #[derive(Debug, Error)] pub enum DeviceError { - #[error(transparent)] - SubscribeError(#[from] ClientError), #[error("Expected device '{0}' to implement OnOff trait")] OnOffExpected(String), } diff --git a/src/light_sensor.rs b/src/light_sensor.rs index 8f3896c..ddfb415 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -65,6 +65,10 @@ pub async fn start( #[async_trait] impl OnMqtt for LightSensor { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &rumqttc::Publish) { if !matches(&message.topic, &self.mqtt.topic) { return; diff --git a/src/main.rs b/src/main.rs index 96931f0..1c04b6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -97,7 +97,12 @@ async fn app() -> anyhow::Result<()> { ); } - let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone()); + let devices = devices::start( + mqtt.subscribe(), + presence.clone(), + light_sensor.clone(), + client.clone(), + ); join_all( config .devices diff --git a/src/mqtt.rs b/src/mqtt.rs index bd3ff5a..d9f50b0 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -12,6 +12,7 @@ use tokio::sync::broadcast; #[async_trait] #[impl_cast::device_trait] pub trait OnMqtt { + fn topics(&self) -> Vec<&str>; async fn on_mqtt(&mut self, message: &Publish); } diff --git a/src/presence.rs b/src/presence.rs index 3fd20ae..64f45c8 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -70,6 +70,10 @@ pub async fn start( #[async_trait] impl OnMqtt for Presence { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } + async fn on_mqtt(&mut self, message: &rumqttc::Publish) { if !matches(&message.topic, &self.mqtt.topic) { return;