From b7329b58ee9bf3a426728186bf34572ef59ffd5a Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Fri, 14 Apr 2023 05:46:04 +0200 Subject: [PATCH] Everything is now implemented as a Device using device_traits with all events going through a single place --- src/config.rs | 34 +++++++-- src/debug_bridge.rs | 126 +++++++++++++++++++------------- src/devices.rs | 88 +++++++++++++++------- src/devices/audio_setup.rs | 11 ++- src/devices/contact_sensor.rs | 12 +-- src/devices/ikea_outlet.rs | 10 ++- src/devices/kasa_outlet.rs | 5 +- src/devices/wake_on_lan.rs | 6 +- src/event.rs | 24 ++---- src/hue_bridge.rs | 66 +++++++++-------- src/light_sensor.rs | 128 ++++++++++++++++---------------- src/main.rs | 68 ++++++++++------- src/mqtt.rs | 24 +++--- src/ntfy.rs | 117 ++++++++++++++++------------- src/presence.rs | 134 +++++++++++++++++----------------- 15 files changed, 487 insertions(+), 366 deletions(-) diff --git a/src/config.rs b/src/config.rs index c2151fd..0f31970 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,7 @@ use crate::{ debug_bridge::DebugBridgeConfig, devices::{self, AudioSetup, ContactSensor, IkeaOutlet, KasaOutlet, WakeOnLAN}, error::{ConfigParseError, CreateDeviceError, MissingEnv}, + event::EventChannel, hue_bridge::HueBridgeConfig, light_sensor::LightSensorConfig, presence::PresenceConfig, @@ -165,8 +166,10 @@ pub trait CreateDevice { fn create( identifier: &str, config: Self::Config, - client: AsyncClient, - presence_topic: &str, // Not a big fan of passing in the global config + event_channel: &EventChannel, + client: &AsyncClient, + // TODO: Not a big fan of passing in the global config + presence_topic: &str, ) -> Result where Self: Sized; @@ -176,16 +179,31 @@ impl Device { pub fn create( self, id: &str, - client: AsyncClient, + event_channel: &EventChannel, + client: &AsyncClient, presence: &str, ) -> Result, CreateDeviceError> { let device: Box = match self { // TODO: It would be nice if this would be more automatic, not sure how to do that... - Device::IkeaOutlet(c) => Box::new(IkeaOutlet::create(id, c, client, presence)?), - Device::WakeOnLAN(c) => Box::new(WakeOnLAN::create(id, c, client, presence)?), - Device::KasaOutlet(c) => Box::new(KasaOutlet::create(id, c, client, presence)?), - Device::AudioSetup(c) => Box::new(AudioSetup::create(id, c, client, presence)?), - Device::ContactSensor(c) => Box::new(ContactSensor::create(id, c, client, presence)?), + Device::IkeaOutlet(c) => { + Box::new(IkeaOutlet::create(id, c, event_channel, client, presence)?) + } + Device::WakeOnLAN(c) => { + Box::new(WakeOnLAN::create(id, c, event_channel, client, presence)?) + } + Device::KasaOutlet(c) => { + Box::new(KasaOutlet::create(id, c, event_channel, client, presence)?) + } + Device::AudioSetup(c) => { + Box::new(AudioSetup::create(id, c, event_channel, client, presence)?) + } + Device::ContactSensor(c) => Box::new(ContactSensor::create( + id, + c, + event_channel, + client, + presence, + )?), }; Ok(device) diff --git a/src/debug_bridge.rs b/src/debug_bridge.rs index e618e59..de87e82 100644 --- a/src/debug_bridge.rs +++ b/src/debug_bridge.rs @@ -1,64 +1,88 @@ +use async_trait::async_trait; use rumqttc::AsyncClient; use serde::Deserialize; use tracing::warn; use crate::{ - event::{Event, EventChannel}, + config::MqttDeviceConfig, + devices::Device, + light_sensor::OnDarkness, mqtt::{DarknessMessage, PresenceMessage}, + presence::OnPresence, }; #[derive(Debug, Deserialize)] pub struct DebugBridgeConfig { - pub topic: String, + #[serde(flatten)] + pub mqtt: MqttDeviceConfig, } -pub fn start(config: DebugBridgeConfig, event_channel: &EventChannel, client: AsyncClient) { - let mut rx = event_channel.get_rx(); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(Event::Presence(presence)) => { - let message = PresenceMessage::new(presence); - let topic = format!("{}/presence", config.topic); - client - .publish( - topic, - rumqttc::QoS::AtLeastOnce, - true, - serde_json::to_string(&message).unwrap(), - ) - .await - .map_err(|err| { - warn!( - "Failed to update presence on {}/presence: {err}", - config.topic - ) - }) - .ok(); - } - Ok(Event::Darkness(dark)) => { - let message = DarknessMessage::new(dark); - let topic = format!("{}/darkness", config.topic); - client - .publish( - topic, - rumqttc::QoS::AtLeastOnce, - true, - serde_json::to_string(&message).unwrap(), - ) - .await - .map_err(|err| { - warn!( - "Failed to update presence on {}/presence: {err}", - config.topic - ) - }) - .ok(); - } - Ok(_) => {} - Err(_) => todo!("Handle errors with the event channel properly"), - } - } - }); +#[derive(Debug)] +pub struct DebugBridge { + mqtt: MqttDeviceConfig, + client: AsyncClient, +} + +impl DebugBridge { + pub fn new( + config: DebugBridgeConfig, + client: &AsyncClient, + ) -> Result { + Ok(Self { + mqtt: config.mqtt, + client: client.clone(), + }) + } +} + +impl Device for DebugBridge { + fn get_id(&self) -> &str { + "debug_bridge" + } +} + +#[async_trait] +impl OnPresence for DebugBridge { + async fn on_presence(&mut self, presence: bool) { + let message = PresenceMessage::new(presence); + let topic = format!("{}/presence", self.mqtt.topic); + self.client + .publish( + topic, + rumqttc::QoS::AtLeastOnce, + true, + serde_json::to_string(&message).unwrap(), + ) + .await + .map_err(|err| { + warn!( + "Failed to update presence on {}/presence: {err}", + self.mqtt.topic + ) + }) + .ok(); + } +} + +#[async_trait] +impl OnDarkness for DebugBridge { + async fn on_darkness(&mut self, dark: bool) { + let message = DarknessMessage::new(dark); + let topic = format!("{}/darkness", self.mqtt.topic); + self.client + .publish( + topic, + rumqttc::QoS::AtLeastOnce, + true, + serde_json::to_string(&message).unwrap(), + ) + .await + .map_err(|err| { + warn!( + "Failed to update presence on {}/presence: {err}", + self.mqtt.topic + ) + }) + .ok(); + } } diff --git a/src/devices.rs b/src/devices.rs index e3501b7..a75e9bf 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -12,21 +12,22 @@ pub use self::wake_on_lan::WakeOnLAN; use std::collections::HashMap; +use futures::future::join_all; use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; -use pollster::FutureExt; use rumqttc::{matches, AsyncClient, QoS}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, trace}; +use tracing::{debug, error, instrument, trace}; use crate::{ event::{Event, EventChannel}, light_sensor::OnDarkness, mqtt::OnMqtt, + ntfy::OnNotification, presence::OnPresence, }; -#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + GoogleHomeDevice + OnOff)] +#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + GoogleHomeDevice + OnOff)] pub trait Device: std::fmt::Debug + Sync + Send { fn get_id(&self) -> &str; } @@ -85,6 +86,20 @@ impl DevicesHandle { Ok(rx.await??) } + // TODO: Finish implementing this + // pub fn create_device(&self, identifier: &str, config: T::Config, presence_topic: &str) -> Result + // where + // T: CreateDevice, + // { + // T::create( + // identifier, + // config, + // self.event_channel, + // self.client, + // presence_topic: presence_topic.to_owned(), + // ) + // } + pub async fn add_device(&self, device: Box) -> Result<(), DevicesError> { let (tx, rx) = oneshot::channel(); self.tx.send(Command::AddDevice { device, tx }).await?; @@ -92,21 +107,21 @@ impl DevicesHandle { } } -pub fn start(event_channel: &EventChannel, client: AsyncClient) -> DevicesHandle { +pub fn start(client: AsyncClient) -> (DevicesHandle, EventChannel) { let mut devices = Devices { devices: HashMap::new(), client, }; + let (event_channel, mut event_rx) = EventChannel::new(); let (tx, mut rx) = mpsc::channel(100); - let mut event_rx = event_channel.get_rx(); tokio::spawn(async move { // TODO: Handle error better loop { tokio::select! { event = event_rx.recv() => { - if event.is_err() { + if event.is_none() { todo!("Handle errors with the event channel properly") } devices.handle_event(event.unwrap()).await; @@ -123,7 +138,7 @@ pub fn start(event_channel: &EventChannel, client: AsyncClient) -> DevicesHandle } }); - DevicesHandle { tx } + (DevicesHandle { tx }, event_channel) } impl Devices { @@ -165,12 +180,13 @@ impl Devices { self.devices.insert(device.get_id().to_owned(), device); } + #[instrument(skip(self))] async fn handle_event(&mut self, event: Event) { match event { Event::MqttMessage(message) => { - self.get::() - .iter_mut() - .for_each(|(id, listener)| { + let iter = self.get::().into_iter().map(|(id, listener)| { + let message = message.clone(); + async move { let subscribed = listener .topics() .iter() @@ -178,27 +194,49 @@ impl Devices { if subscribed { trace!(id, "Handling"); - listener.on_mqtt(&message).block_on(); + listener.on_mqtt(message).await; } - }) + } + }); + + join_all(iter).await; } Event::Darkness(dark) => { - self.get::() - .iter_mut() - .for_each(|(id, device)| { - trace!(id, "Handling"); - device.on_darkness(dark).block_on(); - }) + let iter = + self.get::() + .into_iter() + .map(|(id, device)| async move { + trace!(id, "Handling"); + device.on_darkness(dark).await; + }); + + join_all(iter).await; } Event::Presence(presence) => { - self.get::() - .iter_mut() - .for_each(|(id, device)| { - trace!(id, "Handling"); - device.on_presence(presence).block_on(); - }) + let iter = + self.get::() + .into_iter() + .map(|(id, device)| async move { + trace!(id, "Handling"); + device.on_presence(presence).await; + }); + + join_all(iter).await; + } + Event::Ntfy(notification) => { + let iter = self + .get::() + .into_iter() + .map(|(id, device)| { + let notification = notification.clone(); + async move { + trace!(id, "Handling"); + device.on_notification(notification).await; + } + }); + + join_all(iter).await; } - Event::Ntfy(_) => {} } } diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 531077b..9784ce5 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -6,6 +6,7 @@ use tracing::{debug, error, trace, warn}; use crate::config::{self, CreateDevice, MqttDeviceConfig}; use crate::error::CreateDeviceError; +use crate::event::EventChannel; use crate::mqtt::{OnMqtt, RemoteAction, RemoteMessage}; use crate::presence::OnPresence; @@ -34,18 +35,20 @@ impl CreateDevice for AudioSetup { fn create( identifier: &str, config: Self::Config, - client: AsyncClient, + event_channel: &EventChannel, + client: &AsyncClient, presence_topic: &str, ) -> Result { trace!(id = identifier, "Setting up AudioSetup"); // Create the child devices let mixer_id = format!("{}.mixer", identifier); - let mixer = (*config.mixer).create(&mixer_id, client.clone(), presence_topic)?; + let mixer = (*config.mixer).create(&mixer_id, event_channel, client, presence_topic)?; let mixer = As::consume(mixer).ok_or(CreateDeviceError::OnOffExpected(mixer_id))?; let speakers_id = format!("{}.speakers", identifier); - let speakers = (*config.speakers).create(&speakers_id, client, presence_topic)?; + let speakers = + (*config.speakers).create(&speakers_id, event_channel, client, presence_topic)?; let speakers = As::consume(speakers).ok_or(CreateDeviceError::OnOffExpected(speakers_id))?; @@ -70,7 +73,7 @@ impl OnMqtt for AudioSetup { vec![&self.mqtt.topic] } - async fn on_mqtt(&mut self, message: &rumqttc::Publish) { + async fn on_mqtt(&mut self, message: rumqttc::Publish) { let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 7545270..2df5c17 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -9,8 +9,9 @@ use tracing::{debug, error, trace, warn}; use crate::{ config::{CreateDevice, MqttDeviceConfig}, error::{CreateDeviceError, MissingWildcard}, + event::EventChannel, mqtt::{ContactMessage, OnMqtt, PresenceMessage}, - presence::OnPresence, + presence::{self, OnPresence}, }; use super::Device; @@ -75,7 +76,8 @@ impl CreateDevice for ContactSensor { fn create( identifier: &str, config: Self::Config, - client: AsyncClient, + _event_channel: &EventChannel, + client: &AsyncClient, presence_topic: &str, ) -> Result { trace!(id = identifier, "Setting up ContactSensor"); @@ -89,8 +91,8 @@ impl CreateDevice for ContactSensor { identifier: identifier.to_owned(), mqtt: config.mqtt, presence, - client, - overall_presence: false, + client: client.clone(), + overall_presence: presence::DEFAULT, is_closed: true, handle: None, }) @@ -116,7 +118,7 @@ impl OnMqtt for ContactSensor { vec![&self.mqtt.topic] } - async fn on_mqtt(&mut self, message: &rumqttc::Publish) { + async fn on_mqtt(&mut self, message: rumqttc::Publish) { let is_closed = match ContactMessage::try_from(message) { Ok(state) => state.is_closed(), Err(err) => { diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index adc069a..5feb5a9 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -16,6 +16,7 @@ use tracing::{debug, error, trace, warn}; use crate::config::{CreateDevice, InfoConfig, MqttDeviceConfig}; use crate::devices::Device; use crate::error::CreateDeviceError; +use crate::event::EventChannel; use crate::mqtt::{OnMqtt, OnOffMessage}; use crate::presence::OnPresence; @@ -60,8 +61,9 @@ impl CreateDevice for IkeaOutlet { fn create( identifier: &str, config: Self::Config, - client: AsyncClient, - _presence_topic: &str, // Not a big fan of passing in the global config + _event_channel: &EventChannel, + client: &AsyncClient, + _presence_topic: &str, ) -> Result { trace!( id = identifier, @@ -76,7 +78,7 @@ impl CreateDevice for IkeaOutlet { mqtt: config.mqtt, outlet_type: config.outlet_type, timeout: config.timeout, - client, + client: client.clone(), last_known_state: false, handle: None, }) @@ -112,7 +114,7 @@ impl OnMqtt for IkeaOutlet { vec![&self.mqtt.topic] } - async fn on_mqtt(&mut self, message: &Publish) { + async fn on_mqtt(&mut self, message: Publish) { // Update the internal state based on what the device has reported let state = match OnOffMessage::try_from(message) { Ok(state) => state.state(), diff --git a/src/devices/kasa_outlet.rs b/src/devices/kasa_outlet.rs index 2bb4bf4..89034b2 100644 --- a/src/devices/kasa_outlet.rs +++ b/src/devices/kasa_outlet.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::trace; -use crate::{config::CreateDevice, error::CreateDeviceError}; +use crate::{config::CreateDevice, error::CreateDeviceError, event::EventChannel}; use super::Device; @@ -35,7 +35,8 @@ impl CreateDevice for KasaOutlet { fn create( identifier: &str, config: Self::Config, - _client: AsyncClient, + _event_channel: &EventChannel, + _client: &AsyncClient, _presence_topic: &str, ) -> Result { trace!(id = identifier, "Setting up KasaOutlet"); diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 90ddda9..e934698 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -16,6 +16,7 @@ use tracing::{debug, error, trace}; use crate::{ config::{CreateDevice, InfoConfig, MqttDeviceConfig}, error::CreateDeviceError, + event::EventChannel, mqtt::{ActivateMessage, OnMqtt}, }; @@ -51,7 +52,8 @@ impl CreateDevice for WakeOnLAN { fn create( identifier: &str, config: Self::Config, - _client: AsyncClient, + _event_channel: &EventChannel, + _client: &AsyncClient, _presence_topic: &str, ) -> Result { trace!( @@ -83,7 +85,7 @@ impl OnMqtt for WakeOnLAN { vec![&self.mqtt.topic] } - async fn on_mqtt(&mut self, message: &Publish) { + async fn on_mqtt(&mut self, message: Publish) { let activate = match ActivateMessage::try_from(message) { Ok(message) => message.activate(), Err(err) => { diff --git a/src/event.rs b/src/event.rs index 37b24de..3db82bc 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,9 +1,9 @@ use rumqttc::Publish; -use tokio::sync::broadcast; +use tokio::sync::mpsc; use crate::ntfy; -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum Event { MqttMessage(Publish), Darkness(bool), @@ -11,29 +11,19 @@ pub enum Event { Ntfy(ntfy::Notification), } -pub type Sender = broadcast::Sender; -pub type Receiver = broadcast::Receiver; +pub type Sender = mpsc::Sender; +pub type Receiver = mpsc::Receiver; pub struct EventChannel(Sender); impl EventChannel { - pub fn new() -> Self { - let (tx, _) = broadcast::channel(100); + pub fn new() -> (Self, Receiver) { + let (tx, rx) = mpsc::channel(100); - Self(tx) - } - - pub fn get_rx(&self) -> Receiver { - self.0.subscribe() + (Self(tx), rx) } pub fn get_tx(&self) -> Sender { self.0.clone() } } - -impl Default for EventChannel { - fn default() -> Self { - Self::new() - } -} diff --git a/src/hue_bridge.rs b/src/hue_bridge.rs index 39e5a0a..983aeb7 100644 --- a/src/hue_bridge.rs +++ b/src/hue_bridge.rs @@ -1,10 +1,12 @@ use std::net::{Ipv4Addr, SocketAddr}; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tracing::{error, trace, warn}; -use crate::event::{Event, EventChannel}; +use crate::{devices::Device, light_sensor::OnDarkness, presence::OnPresence}; +#[derive(Debug)] pub enum Flag { Presence, Darkness, @@ -23,7 +25,8 @@ pub struct HueBridgeConfig { pub flags: FlagIDs, } -struct HueBridge { +#[derive(Debug)] +pub struct HueBridge { addr: SocketAddr, login: String, flag_ids: FlagIDs, @@ -35,14 +38,6 @@ struct FlagMessage { } impl HueBridge { - pub fn new(config: HueBridgeConfig) -> Self { - Self { - addr: (config.ip, 80).into(), - login: config.login, - flag_ids: config.flags, - } - } - pub async fn set_flag(&self, flag: Flag, value: bool) { let flag_id = match flag { Flag::Presence => self.flag_ids.presence, @@ -53,6 +48,8 @@ impl HueBridge { "http://{}/api/{}/sensors/{flag_id}/state", self.addr, self.login ); + + trace!(?flag, flag_id, value, "Sending request to change flag"); let res = reqwest::Client::new() .put(url) .json(&FlagMessage { flag: value }) @@ -73,25 +70,34 @@ impl HueBridge { } } -pub fn start(config: HueBridgeConfig, event_channel: &EventChannel) { - let hue_bridge = HueBridge::new(config); - - let mut rx = event_channel.get_rx(); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(Event::Presence(presence)) => { - trace!("Bridging presence to hue"); - hue_bridge.set_flag(Flag::Presence, presence).await; - } - Ok(Event::Darkness(dark)) => { - trace!("Bridging darkness to hue"); - hue_bridge.set_flag(Flag::Darkness, dark).await; - } - Ok(_) => {} - Err(_) => todo!("Handle errors with the event channel properly"), - } +impl HueBridge { + pub fn new(config: HueBridgeConfig) -> Self { + Self { + addr: (config.ip, 80).into(), + login: config.login, + flag_ids: config.flags, } - }); + } +} + +impl Device for HueBridge { + fn get_id(&self) -> &str { + "hue_bridge" + } +} + +#[async_trait] +impl OnPresence for HueBridge { + async fn on_presence(&mut self, presence: bool) { + trace!("Bridging presence to hue"); + self.set_flag(Flag::Presence, presence).await; + } +} + +#[async_trait] +impl OnDarkness for HueBridge { + async fn on_darkness(&mut self, dark: bool) { + trace!("Bridging darkness to hue"); + self.set_flag(Flag::Darkness, dark).await; + } } diff --git a/src/light_sensor.rs b/src/light_sensor.rs index a02e568..ffc6e4d 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -1,13 +1,13 @@ use async_trait::async_trait; -use rumqttc::{matches, AsyncClient}; +use rumqttc::Publish; use serde::Deserialize; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use crate::{ config::MqttDeviceConfig, - error::LightSensorError, - event::{Event, EventChannel}, - mqtt::BrightnessMessage, + devices::Device, + event::{self, Event, EventChannel}, + mqtt::{BrightnessMessage, OnMqtt}, }; #[async_trait] @@ -23,72 +23,74 @@ pub struct LightSensorConfig { pub max: isize, } -const DEFAULT: bool = false; +pub const DEFAULT: bool = false; -pub async fn start( - config: LightSensorConfig, - event_channel: &EventChannel, - client: AsyncClient, -) -> Result<(), LightSensorError> { - // Subscrive to the mqtt topic - client - .subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; +#[derive(Debug)] +pub struct LightSensor { + tx: event::Sender, + mqtt: MqttDeviceConfig, + min: isize, + max: isize, + is_dark: bool, +} - // Create the channels - let mut rx = event_channel.get_rx(); - let tx = event_channel.get_tx(); +impl LightSensor { + pub fn new(config: LightSensorConfig, event_channel: &EventChannel) -> Self { + Self { + tx: event_channel.get_tx(), + mqtt: config.mqtt, + min: config.min, + max: config.max, + is_dark: DEFAULT, + } + } +} - // Setup default value, this is needed for hysteresis - let mut current_is_dark = DEFAULT; +impl Device for LightSensor { + fn get_id(&self) -> &str { + "light_sensor" + } +} - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(Event::MqttMessage(message)) => { - if !matches(&message.topic, &config.mqtt.topic) { - continue; - } +#[async_trait] +impl OnMqtt for LightSensor { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] + } - let illuminance = match BrightnessMessage::try_from(message) { - Ok(state) => state.illuminance(), - Err(err) => { - error!("Failed to parse message: {err}"); - continue; - } - }; + async fn on_mqtt(&mut self, message: Publish) { + let illuminance = match BrightnessMessage::try_from(message) { + Ok(state) => state.illuminance(), + Err(err) => { + warn!("Failed to parse message: {err}"); + return; + } + }; - debug!("Illuminance: {illuminance}"); - let is_dark = if illuminance <= config.min { - trace!("It is dark"); - true - } else if illuminance >= config.max { - trace!("It is light"); - false - } else { - trace!( - "In between min ({}) and max ({}) value, keeping current state: {}", - config.min, - config.max, - current_is_dark - ); - current_is_dark - }; + debug!("Illuminance: {illuminance}"); + let is_dark = if illuminance <= self.min { + trace!("It is dark"); + true + } else if illuminance >= self.max { + trace!("It is light"); + false + } else { + trace!( + "In between min ({}) and max ({}) value, keeping current state: {}", + self.min, + self.max, + self.is_dark + ); + self.is_dark + }; - if is_dark != current_is_dark { - debug!("Dark state has changed: {is_dark}"); - current_is_dark = is_dark; + if is_dark != self.is_dark { + debug!("Dark state has changed: {is_dark}"); + self.is_dark = is_dark; - if tx.send(Event::Darkness(is_dark)).is_err() { - warn!("There are no receivers on the event channel"); - } - } - } - Ok(_) => {} - Err(_) => todo!("Handle errors with the event channel properly"), + if self.tx.send(Event::Darkness(is_dark)).await.is_err() { + warn!("There are no receivers on the event channel"); } } - }); - - Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index d4fad56..1922e91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,10 +8,14 @@ use axum::{ use automation::{ auth::{OpenIDConfig, User}, config::Config, - debug_bridge, devices, + debug_bridge::DebugBridge, + devices, error::ApiError, - event::EventChannel, - hue_bridge, light_sensor, mqtt, ntfy, presence, + hue_bridge::HueBridge, + light_sensor::LightSensor, + mqtt, + ntfy::Ntfy, + presence::Presence, }; use dotenvy::dotenv; use futures::future::join_all; @@ -55,41 +59,55 @@ async fn app() -> anyhow::Result<()> { std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned()); let config = Config::parse_file(&config_filename)?; - let event_channel = EventChannel::new(); - // Create a mqtt client let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 10); - let presence_topic = config.presence.mqtt.topic.to_owned(); - presence::start(config.presence, &event_channel, client.clone()).await?; - light_sensor::start(config.light_sensor, &event_channel, client.clone()).await?; + // Setup the device handler + let (device_handler, event_channel) = devices::start(client.clone()); - // Start the ntfy service if it is configured - if let Some(config) = config.ntfy { - ntfy::start(config, &event_channel); + // Create all the devices specified in the config + let mut devices = config + .devices + .into_iter() + .map(|(identifier, device_config)| { + device_config.create( + &identifier, + &event_channel, + &client, + &config.presence.mqtt.topic, + ) + }) + .collect::, _>>()?; + + // Create and add the light sensor + { + let light_sensor = LightSensor::new(config.light_sensor, &event_channel); + devices.push(Box::new(light_sensor)); } - // Start the hue bridge if it is configured + // Create and add the presence system + { + let presence = Presence::new(config.presence, &event_channel); + devices.push(Box::new(presence)); + } + + // If configured, create and add the hue bridge if let Some(config) = config.hue_bridge { - hue_bridge::start(config, &event_channel); + let hue_bridge = HueBridge::new(config); + devices.push(Box::new(hue_bridge)); } // Start the debug bridge if it is configured if let Some(config) = config.debug_bridge { - debug_bridge::start(config, &event_channel, client.clone()); + let debug_bridge = DebugBridge::new(config, &client)?; + devices.push(Box::new(debug_bridge)); } - // Setup the device handler - let device_handler = devices::start(&event_channel, client.clone()); - - // Create all the devices specified in the config - let devices = config - .devices - .into_iter() - .map(|(identifier, device_config)| { - device_config.create(&identifier, client.clone(), &presence_topic) - }) - .collect::, _>>()?; + // Start the ntfy service if it is configured + if let Some(config) = config.ntfy { + let ntfy = Ntfy::new(config, &event_channel); + devices.push(Box::new(ntfy)); + } // Can even add some more devices here // devices.push(device) diff --git a/src/mqtt.rs b/src/mqtt.rs index aaf0db6..225c39e 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -14,7 +14,7 @@ use crate::event::{self, EventChannel}; #[impl_cast::device_trait] pub trait OnMqtt { fn topics(&self) -> Vec<&str>; - async fn on_mqtt(&mut self, message: &Publish); + async fn on_mqtt(&mut self, message: Publish); } #[derive(Debug, Error)] @@ -32,7 +32,7 @@ pub fn start(mut eventloop: EventLoop, event_channel: &EventChannel) { let notification = eventloop.poll().await; match notification { Ok(Event::Incoming(Incoming::Publish(p))) => { - tx.send(event::Event::MqttMessage(p)).ok(); + tx.send(event::Event::MqttMessage(p)).await.ok(); } Ok(..) => continue, Err(err) => { @@ -62,10 +62,10 @@ impl OnOffMessage { } } -impl TryFrom<&Publish> for OnOffMessage { +impl TryFrom for OnOffMessage { type Error = ParseError; - fn try_from(message: &Publish) -> Result { + fn try_from(message: Publish) -> Result { serde_json::from_slice(&message.payload) .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } @@ -82,10 +82,10 @@ impl ActivateMessage { } } -impl TryFrom<&Publish> for ActivateMessage { +impl TryFrom for ActivateMessage { type Error = ParseError; - fn try_from(message: &Publish) -> Result { + fn try_from(message: Publish) -> Result { serde_json::from_slice(&message.payload) .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } @@ -112,10 +112,10 @@ impl RemoteMessage { } } -impl TryFrom<&Publish> for RemoteMessage { +impl TryFrom for RemoteMessage { type Error = ParseError; - fn try_from(message: &Publish) -> Result { + fn try_from(message: Publish) -> Result { serde_json::from_slice(&message.payload) .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } @@ -185,10 +185,10 @@ impl ContactMessage { } } -impl TryFrom<&Publish> for ContactMessage { +impl TryFrom for ContactMessage { type Error = ParseError; - fn try_from(message: &Publish) -> Result { + fn try_from(message: Publish) -> Result { serde_json::from_slice(&message.payload) .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } @@ -218,10 +218,10 @@ impl DarknessMessage { } } -impl TryFrom<&Publish> for DarknessMessage { +impl TryFrom for DarknessMessage { type Error = ParseError; - fn try_from(message: &Publish) -> Result { + fn try_from(message: Publish) -> Result { serde_json::from_slice(&message.payload) .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } diff --git a/src/ntfy.rs b/src/ntfy.rs index 87ff7c0..1942367 100644 --- a/src/ntfy.rs +++ b/src/ntfy.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use async_trait::async_trait; +use impl_cast::device_trait; use serde::Serialize; use serde_repr::*; use tokio::sync::mpsc; @@ -7,18 +9,28 @@ use tracing::{debug, error, warn}; use crate::{ config::NtfyConfig, - event::{Event, EventChannel}, + devices::Device, + event::{self, Event, EventChannel}, + presence::OnPresence, }; pub type Sender = mpsc::Sender; pub type Receiver = mpsc::Receiver; -struct Ntfy { - base_url: String, - topic: String, +#[async_trait] +#[device_trait] +pub trait OnNotification { + async fn on_notification(&mut self, notification: Notification); } -#[derive(Serialize_repr, Clone, Copy)] +#[derive(Debug)] +pub struct Ntfy { + base_url: String, + topic: String, + tx: event::Sender, +} + +#[derive(Debug, Serialize_repr, Clone, Copy)] #[repr(u8)] pub enum Priority { Min = 1, @@ -28,7 +40,7 @@ pub enum Priority { Max, } -#[derive(Serialize, Clone)] +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "snake_case", tag = "action")] pub enum ActionType { Broadcast { @@ -39,7 +51,7 @@ pub enum ActionType { // Http } -#[derive(Serialize, Clone)] +#[derive(Debug, Serialize, Clone)] pub struct Action { #[serde(flatten)] action: ActionType, @@ -54,7 +66,7 @@ struct NotificationFinal { inner: Notification, } -#[derive(Serialize, Clone)] +#[derive(Debug, Serialize, Clone)] pub struct Notification { #[serde(skip_serializing_if = "Option::is_none")] title: Option, @@ -119,10 +131,11 @@ impl Default for Notification { } impl Ntfy { - fn new(base_url: &str, topic: &str) -> Self { + pub fn new(config: NtfyConfig, event_channel: &EventChannel) -> Self { Self { - base_url: base_url.to_owned(), - topic: topic.to_owned(), + base_url: config.url, + topic: config.topic, + tx: event_channel.get_tx(), } } @@ -148,45 +161,45 @@ impl Ntfy { } } -pub fn start(config: NtfyConfig, event_channel: &EventChannel) { - let mut rx = event_channel.get_rx(); - let tx = event_channel.get_tx(); - - let ntfy = Ntfy::new(&config.url, &config.topic); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(Event::Presence(presence)) => { - // Setup extras for the broadcast - let extras = HashMap::from([ - ("cmd".into(), "presence".into()), - ("state".into(), if presence { "0" } else { "1" }.into()), - ]); - - // Create broadcast action - let action = Action { - action: ActionType::Broadcast { extras }, - label: if presence { "Set away" } else { "Set home" }.to_owned(), - clear: Some(true), - }; - - // Create the notification - let notification = Notification::new() - .set_title("Presence") - .set_message(if presence { "Home" } else { "Away" }) - .add_tag("house") - .add_action(action) - .set_priority(Priority::Low); - - if tx.send(Event::Ntfy(notification)).is_err() { - warn!("There are no receivers on the event channel"); - } - } - Ok(Event::Ntfy(notification)) => ntfy.send(notification).await, - Ok(_) => {} - Err(_) => todo!("Handle errors with the event channel properly"), - } - } - }); +impl Device for Ntfy { + fn get_id(&self) -> &str { + "ntfy" + } +} + +#[async_trait] +impl OnPresence for Ntfy { + async fn on_presence(&mut self, presence: bool) { + // Setup extras for the broadcast + let extras = HashMap::from([ + ("cmd".into(), "presence".into()), + ("state".into(), if presence { "0" } else { "1" }.into()), + ]); + + // Create broadcast action + let action = Action { + action: ActionType::Broadcast { extras }, + label: if presence { "Set away" } else { "Set home" }.to_owned(), + clear: Some(true), + }; + + // Create the notification + let notification = Notification::new() + .set_title("Presence") + .set_message(if presence { "Home" } else { "Away" }) + .add_tag("house") + .add_action(action) + .set_priority(Priority::Low); + + if self.tx.send(Event::Ntfy(notification)).await.is_err() { + warn!("There are no receivers on the event channel"); + } + } +} + +#[async_trait] +impl OnNotification for Ntfy { + async fn on_notification(&mut self, notification: Notification) { + self.send(notification).await; + } } diff --git a/src/presence.rs b/src/presence.rs index a988415..a53577a 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -1,18 +1,15 @@ use std::collections::HashMap; use async_trait::async_trait; -use rumqttc::{has_wildcards, matches, AsyncClient}; +use rumqttc::Publish; use serde::Deserialize; use tracing::{debug, warn}; use crate::{ config::MqttDeviceConfig, - error::{MissingWildcard, PresenceError}, - event::{ - Event::{self, MqttMessage}, - EventChannel, - }, - mqtt::PresenceMessage, + devices::Device, + event::{self, Event, EventChannel}, + mqtt::{OnMqtt, PresenceMessage}, }; #[async_trait] @@ -26,73 +23,78 @@ pub struct PresenceConfig { pub mqtt: MqttDeviceConfig, } -const DEFAULT: bool = false; +pub const DEFAULT: bool = false; -pub async fn start( - config: PresenceConfig, - event_channel: &EventChannel, - client: AsyncClient, -) -> Result<(), PresenceError> { - if !has_wildcards(&config.mqtt.topic) { - return Err(MissingWildcard::new(&config.mqtt.topic).into()); +#[derive(Debug)] +pub struct Presence { + tx: event::Sender, + mqtt: MqttDeviceConfig, + devices: HashMap, + current_overall_presence: bool, +} + +impl Presence { + pub fn new(config: PresenceConfig, event_channel: &EventChannel) -> Self { + Self { + tx: event_channel.get_tx(), + mqtt: config.mqtt, + devices: HashMap::new(), + current_overall_presence: DEFAULT, + } + } +} + +impl Device for Presence { + fn get_id(&self) -> &str { + "presence" + } +} + +#[async_trait] +impl OnMqtt for Presence { + fn topics(&self) -> Vec<&str> { + vec![&self.mqtt.topic] } - // Subscribe to the relevant topics on mqtt - client - .subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) - .await?; + async fn on_mqtt(&mut self, message: Publish) { + let offset = self + .mqtt + .topic + .find('+') + .or(self.mqtt.topic.find('#')) + .expect("Presence::create fails if it does not contain wildcards"); + let device_name = message.topic[offset..].to_owned(); - let mut rx = event_channel.get_rx(); - let tx = event_channel.get_tx(); - - let mut devices = HashMap::::new(); - let mut current_overall_presence = DEFAULT; - - tokio::spawn(async move { - loop { - // TODO: Handle errors, warn if lagging - if let Ok(MqttMessage(message)) = rx.recv().await { - if !matches(&message.topic, &config.mqtt.topic) { - continue; + if message.payload.is_empty() { + // Remove the device from the map + debug!("State of device [{device_name}] has been removed"); + self.devices.remove(&device_name); + } else { + let present = match PresenceMessage::try_from(message) { + Ok(state) => state.present(), + Err(err) => { + warn!("Failed to parse message: {err}"); + return; } + }; - let offset = config - .mqtt - .topic - .find('+') - .or(config.mqtt.topic.find('#')) - .expect("Presence::new fails if it does not contain wildcards"); - let device_name = message.topic[offset..].to_owned(); + debug!("State of device [{device_name}] has changed: {}", present); + self.devices.insert(device_name, present); + } - if message.payload.is_empty() { - // Remove the device from the map - debug!("State of device [{device_name}] has been removed"); - devices.remove(&device_name); - } else { - let present = match PresenceMessage::try_from(message) { - Ok(state) => state.present(), - Err(err) => { - warn!("Failed to parse message: {err}"); - continue; - } - }; + let overall_presence = self.devices.iter().any(|(_, v)| *v); + if overall_presence != self.current_overall_presence { + debug!("Overall presence updated: {overall_presence}"); + self.current_overall_presence = overall_presence; - debug!("State of device [{device_name}] has changed: {}", present); - devices.insert(device_name, present); - } - - let overall_presence = devices.iter().any(|(_, v)| *v); - if overall_presence != current_overall_presence { - debug!("Overall presence updated: {overall_presence}"); - current_overall_presence = overall_presence; - - if tx.send(Event::Presence(overall_presence)).is_err() { - warn!("There are no receivers on the event channel"); - } - } + if self + .tx + .send(Event::Presence(overall_presence)) + .await + .is_err() + { + warn!("There are no receivers on the event channel"); } } - }); - - Ok(()) + } }