From 2cf4e40ad54322e11defd8c4fb6f044c342ca47a Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Thu, 25 Jul 2024 23:15:23 +0200 Subject: [PATCH] Devices are now clonable --- google_home/google_home/src/traits.rs | 12 +-- src/devices/air_filter.rs | 63 +++++++------ src/devices/audio_setup.rs | 19 ++-- src/devices/contact_sensor.rs | 128 +++++++++++++++----------- src/devices/debug_bridge.rs | 12 +-- src/devices/hue_bridge.rs | 12 +-- src/devices/hue_group.rs | 40 +++----- src/devices/ikea_outlet.rs | 98 +++++++++++--------- src/devices/kasa_outlet.rs | 10 +- src/devices/light_sensor.rs | 46 ++++++--- src/devices/mod.rs | 24 ++--- src/devices/ntfy.rs | 14 +-- src/devices/presence.rs | 49 +++++++--- src/devices/wake_on_lan.rs | 12 +-- src/devices/washer.rs | 46 ++++++--- src/event.rs | 8 +- src/traits.rs | 4 +- 17 files changed, 338 insertions(+), 259 deletions(-) diff --git a/google_home/google_home/src/traits.rs b/google_home/google_home/src/traits.rs index d7f7a4e..73cd517 100644 --- a/google_home/google_home/src/traits.rs +++ b/google_home/google_home/src/traits.rs @@ -12,35 +12,35 @@ traits! { command_only_on_off: Option, query_only_on_off: Option, async fn on(&self) -> Result, - "action.devices.commands.OnOff" => async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode>, + "action.devices.commands.OnOff" => async fn set_on(&self, on: bool) -> Result<(), ErrorCode>, }, "action.devices.traits.Scene" => trait Scene { scene_reversible: Option, - "action.devices.commands.ActivateScene" => async fn set_active(&mut self, deactivate: bool) -> Result<(), ErrorCode>, + "action.devices.commands.ActivateScene" => async fn set_active(&self, deactivate: bool) -> Result<(), ErrorCode>, }, "action.devices.traits.FanSpeed" => trait FanSpeed { reversible: Option, command_only_fan_speed: Option, available_fan_speeds: AvailableSpeeds, - fn current_fan_speed_setting(&self) -> Result, + async fn current_fan_speed_setting(&self) -> Result, // TODO: Figure out some syntax for optional command? // Probably better to just force the user to always implement commands? - "action.devices.commands.SetFanSpeed" => async fn set_fan_speed(&mut self, fan_speed: String) -> Result<(), ErrorCode>, + "action.devices.commands.SetFanSpeed" => async fn set_fan_speed(&self, fan_speed: String) -> Result<(), ErrorCode>, }, "action.devices.traits.HumiditySetting" => trait HumiditySetting { query_only_humidity_setting: Option, - fn humidity_ambient_percent(&self) -> Result, + async fn humidity_ambient_percent(&self) -> Result, }, "action.devices.traits.TemperatureControl" => trait TemperatureSetting { query_only_temperature_control: Option, // TODO: Add rename temperatureUnitForUX: TemperatureUnit, - fn temperature_ambient_celsius(&self) -> f32, + async fn temperature_ambient_celsius(&self) -> f32, } } diff --git a/src/devices/air_filter.rs b/src/devices/air_filter.rs index 49747c2..d5dc936 100644 --- a/src/devices/air_filter.rs +++ b/src/devices/air_filter.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use async_trait::async_trait; use automation_macro::LuaDeviceConfig; use google_home::device::Name; @@ -8,6 +10,7 @@ use google_home::traits::{ }; use google_home::types::Type; use rumqttc::Publish; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, error, trace, warn}; use super::LuaDeviceCreate; @@ -18,7 +21,7 @@ use crate::messages::{AirFilterFanState, AirFilterState, SetAirFilterFanState}; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct AirFilterConfig { +pub struct Config { #[device_config(flatten)] pub info: InfoConfig, #[device_config(flatten)] @@ -27,11 +30,10 @@ pub struct AirFilterConfig { pub client: WrappedAsyncClient, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AirFilter { - config: AirFilterConfig, - - last_known_state: AirFilterState, + config: Config, + state: Arc>, } impl AirFilter { @@ -52,11 +54,19 @@ impl AirFilter { .map_err(|err| warn!("Failed to update state on {topic}: {err}")) .ok(); } + + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } } #[async_trait] impl LuaDeviceCreate for AirFilter { - type Config = AirFilterConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -67,14 +77,14 @@ impl LuaDeviceCreate for AirFilter { .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { - config, - last_known_state: AirFilterState { - state: AirFilterFanState::Off, - humidity: 0.0, - temperature: 0.0, - }, - }) + let state = AirFilterState { + state: AirFilterFanState::Off, + humidity: 0.0, + temperature: 0.0, + }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -86,7 +96,7 @@ impl Device for AirFilter { #[async_trait] impl OnMqtt for AirFilter { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -99,13 +109,13 @@ impl OnMqtt for AirFilter { } }; - if state == self.last_known_state { + if state == *self.state().await { return; } debug!(id = Device::get_id(self), "Updating state to {state:?}"); - self.last_known_state = state; + *self.state_mut().await = state; } } @@ -138,10 +148,10 @@ impl google_home::Device for AirFilter { #[async_trait] impl OnOff for AirFilter { async fn on(&self) -> Result { - Ok(self.last_known_state.state != AirFilterFanState::Off) + Ok(self.state().await.state != AirFilterFanState::Off) } - async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { + async fn set_on(&self, on: bool) -> Result<(), ErrorCode> { debug!("Turning on air filter: {on}"); if on { @@ -192,8 +202,8 @@ impl FanSpeed for AirFilter { } } - fn current_fan_speed_setting(&self) -> Result { - let speed = match self.last_known_state.state { + async fn current_fan_speed_setting(&self) -> Result { + let speed = match self.state().await.state { AirFilterFanState::Off => "off", AirFilterFanState::Low => "low", AirFilterFanState::Medium => "medium", @@ -203,7 +213,7 @@ impl FanSpeed for AirFilter { Ok(speed.into()) } - async fn set_fan_speed(&mut self, fan_speed: String) -> Result<(), ErrorCode> { + async fn set_fan_speed(&self, fan_speed: String) -> Result<(), ErrorCode> { let fan_speed = fan_speed.as_str(); let state = if fan_speed == "off" { AirFilterFanState::Off @@ -229,11 +239,12 @@ impl HumiditySetting for AirFilter { Some(true) } - fn humidity_ambient_percent(&self) -> Result { - Ok(self.last_known_state.humidity.round() as isize) + async fn humidity_ambient_percent(&self) -> Result { + Ok(self.state().await.humidity.round() as isize) } } +#[async_trait] impl TemperatureSetting for AirFilter { fn query_only_temperature_control(&self) -> Option { Some(true) @@ -244,8 +255,8 @@ impl TemperatureSetting for AirFilter { TemperatureUnit::Celsius } - fn temperature_ambient_celsius(&self) -> f32 { + async fn temperature_ambient_celsius(&self) -> f32 { // HACK: Round to one decimal place - (10.0 * self.last_known_state.temperature).round() / 10.0 + (10.0 * self.state().await.temperature).round() / 10.0 } } diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 5ce5de1..8a01359 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -12,7 +12,7 @@ use crate::messages::{RemoteAction, RemoteMessage}; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct AudioSetupConfig { +pub struct Config { pub identifier: String, #[device_config(flatten)] pub mqtt: MqttDeviceConfig, @@ -24,14 +24,14 @@ pub struct AudioSetupConfig { pub client: WrappedAsyncClient, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AudioSetup { - config: AudioSetupConfig, + config: Config, } #[async_trait] impl LuaDeviceCreate for AudioSetup { - type Config = AudioSetupConfig; + type Config = Config; type Error = DeviceConfigError; async fn create(config: Self::Config) -> Result { @@ -68,7 +68,7 @@ impl Device for AudioSetup { #[async_trait] impl OnMqtt for AudioSetup { - async fn on_mqtt(&mut self, message: rumqttc::Publish) { + async fn on_mqtt(&self, message: rumqttc::Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -76,10 +76,7 @@ impl OnMqtt for AudioSetup { let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { - error!( - id = self.config.identifier, - "Failed to parse message: {err}" - ); + error!(id = self.get_id(), "Failed to parse message: {err}"); return; } }; @@ -118,7 +115,7 @@ impl OnMqtt for AudioSetup { #[async_trait] impl OnPresence for AudioSetup { - async fn on_presence(&mut self, presence: bool) { + async fn on_presence(&self, presence: bool) { let mut mixer = self.config.mixer.write().await; let mut speakers = self.config.speakers.write().await; @@ -128,7 +125,7 @@ impl OnPresence for AudioSetup { ) { // Turn off the audio setup when we leave the house if !presence { - debug!(id = self.config.identifier, "Turning devices off"); + debug!(id = self.get_id(), "Turning devices off"); speakers.set_on(false).await.unwrap(); mixer.set_on(false).await.unwrap(); } diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 6cf28b4..0daa13a 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -1,9 +1,10 @@ +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use automation_macro::LuaDeviceConfig; use google_home::traits::OnOff; -use mlua::FromLua; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tracing::{debug, error, trace, warn}; @@ -26,31 +27,16 @@ pub struct PresenceDeviceConfig { pub timeout: Duration, } -#[derive(Debug, Clone)] -struct TriggerDevicesHelper(Vec); - -impl<'lua> FromLua<'lua> for TriggerDevicesHelper { - fn from_lua(value: mlua::Value<'lua>, lua: &'lua mlua::Lua) -> mlua::Result { - Ok(TriggerDevicesHelper(mlua::FromLua::from_lua(value, lua)?)) - } -} - -impl From for Vec<(WrappedDevice, bool)> { - fn from(value: TriggerDevicesHelper) -> Self { - value.0.into_iter().map(|device| (device, false)).collect() - } -} - #[derive(Debug, Clone, LuaDeviceConfig)] pub struct TriggerConfig { - #[device_config(from_lua, from(TriggerDevicesHelper))] - pub devices: Vec<(WrappedDevice, bool)>, + #[device_config(from_lua)] + pub devices: Vec, #[device_config(default, with(|t: Option<_>| t.map(Duration::from_secs)))] pub timeout: Option, } #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct ContactSensorConfig { +pub struct Config { pub identifier: String, #[device_config(flatten)] pub mqtt: MqttDeviceConfig, @@ -63,25 +49,41 @@ pub struct ContactSensorConfig { } #[derive(Debug)] -pub struct ContactSensor { - config: ContactSensorConfig, - +struct State { overall_presence: bool, is_closed: bool, + previous: Vec, handle: Option>, } +#[derive(Debug, Clone)] +pub struct ContactSensor { + config: Config, + state: Arc>, +} + +impl ContactSensor { + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } +} + #[async_trait] impl LuaDeviceCreate for ContactSensor { - type Config = ContactSensorConfig; + type Config = Config; type Error = DeviceConfigError; async fn create(config: Self::Config) -> Result { trace!(id = config.identifier, "Setting up ContactSensor"); + let mut previous = Vec::new(); // Make sure the devices implement the required traits if let Some(trigger) = &config.trigger { - for (device, _) in &trigger.devices { + for device in &trigger.devices { { let device = device.read().await; let id = device.get_id().to_owned(); @@ -96,6 +98,7 @@ impl LuaDeviceCreate for ContactSensor { } } } + previous.resize(trigger.devices.len(), false); } config @@ -103,12 +106,15 @@ impl LuaDeviceCreate for ContactSensor { .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { - config: config.clone(), + let state = State { overall_presence: DEFAULT_PRESENCE, is_closed: true, + previous, handle: None, - }) + }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -120,14 +126,14 @@ impl Device for ContactSensor { #[async_trait] impl OnPresence for ContactSensor { - async fn on_presence(&mut self, presence: bool) { - self.overall_presence = presence; + async fn on_presence(&self, presence: bool) { + self.state_mut().await.overall_presence = presence; } } #[async_trait] impl OnMqtt for ContactSensor { - async fn on_mqtt(&mut self, message: rumqttc::Publish) { + async fn on_mqtt(&self, message: rumqttc::Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -135,24 +141,25 @@ impl OnMqtt for ContactSensor { let is_closed = match ContactMessage::try_from(message) { Ok(state) => state.is_closed(), Err(err) => { - error!( - id = self.config.identifier, - "Failed to parse message: {err}" - ); + error!(id = self.get_id(), "Failed to parse message: {err}"); return; } }; - if is_closed == self.is_closed { + if is_closed == self.state().await.is_closed { return; } - debug!(id = self.config.identifier, "Updating state to {is_closed}"); - self.is_closed = is_closed; + debug!(id = self.get_id(), "Updating state to {is_closed}"); + self.state_mut().await.is_closed = is_closed; - if let Some(trigger) = &mut self.config.trigger { - if !self.is_closed { - for (light, previous) in &mut trigger.devices { + if let Some(trigger) = &self.config.trigger { + if !is_closed { + for (light, previous) in trigger + .devices + .iter() + .zip(self.state_mut().await.previous.iter_mut()) + { let mut light = light.write().await; if let Some(light) = light.as_mut().cast_mut() as Option<&mut dyn OnOff> { *previous = light.on().await.unwrap(); @@ -160,7 +167,11 @@ impl OnMqtt for ContactSensor { } } } else { - for (light, previous) in &trigger.devices { + for (light, previous) in trigger + .devices + .iter() + .zip(self.state_mut().await.previous.iter()) + { let mut light = light.write().await; if !previous { // If the timeout is zero just turn the light off directly @@ -183,20 +194,20 @@ impl OnMqtt for ContactSensor { // Check if this contact sensor works as a presence device // If not we are done here let presence = match &self.config.presence { - Some(presence) => presence, + Some(presence) => presence.clone(), None => return, }; if !is_closed { // Activate presence and stop any timeout once we open the door - if let Some(handle) = self.handle.take() { + if let Some(handle) = self.state_mut().await.handle.take() { handle.abort(); } // Only use the door as an presence sensor if there the current presence is set false // This is to prevent the house from being marked as present for however long the // timeout is set when leaving the house - if !self.overall_presence { + if !self.state().await.overall_presence { self.config .client .publish( @@ -216,18 +227,25 @@ impl OnMqtt for ContactSensor { } } else { // Once the door is closed again we start a timeout for removing the presence - let client = self.config.client.clone(); - let id = self.config.identifier.clone(); - let timeout = presence.timeout; - let topic = presence.mqtt.topic.clone(); - self.handle = Some(tokio::spawn(async move { - debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); - tokio::time::sleep(timeout).await; - debug!(id, "Removing door device!"); - client - .publish(&topic, rumqttc::QoS::AtLeastOnce, false, "") + let device = self.clone(); + self.state_mut().await.handle = Some(tokio::spawn(async move { + debug!( + id = device.get_id(), + "Starting timeout ({:?}) for contact sensor...", presence.timeout + ); + tokio::time::sleep(presence.timeout).await; + debug!(id = device.get_id(), "Removing door device!"); + device + .config + .client + .publish(&presence.mqtt.topic, rumqttc::QoS::AtLeastOnce, false, "") .await - .map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) + .map_err(|err| { + warn!( + "Failed to publish presence on {}: {err}", + presence.mqtt.topic + ) + }) .ok(); })); } diff --git a/src/devices/debug_bridge.rs b/src/devices/debug_bridge.rs index 88b4677..ab45f2b 100644 --- a/src/devices/debug_bridge.rs +++ b/src/devices/debug_bridge.rs @@ -12,7 +12,7 @@ use crate::messages::{DarknessMessage, PresenceMessage}; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, LuaDeviceConfig, Clone)] -pub struct DebugBridgeConfig { +pub struct Config { pub identifier: String, #[device_config(flatten)] pub mqtt: MqttDeviceConfig, @@ -20,14 +20,14 @@ pub struct DebugBridgeConfig { pub client: WrappedAsyncClient, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DebugBridge { - config: DebugBridgeConfig, + config: Config, } #[async_trait] impl LuaDeviceCreate for DebugBridge { - type Config = DebugBridgeConfig; + type Config = Config; type Error = Infallible; async fn create(config: Self::Config) -> Result { @@ -44,7 +44,7 @@ impl Device for DebugBridge { #[async_trait] impl OnPresence for DebugBridge { - async fn on_presence(&mut self, presence: bool) { + async fn on_presence(&self, presence: bool) { let message = PresenceMessage::new(presence); let topic = format!("{}/presence", self.config.mqtt.topic); self.config @@ -68,7 +68,7 @@ impl OnPresence for DebugBridge { #[async_trait] impl OnDarkness for DebugBridge { - async fn on_darkness(&mut self, dark: bool) { + async fn on_darkness(&self, dark: bool) { let message = DarknessMessage::new(dark); let topic = format!("{}/darkness", self.config.mqtt.topic); self.config diff --git a/src/devices/hue_bridge.rs b/src/devices/hue_bridge.rs index 7e8fba8..d1829ac 100644 --- a/src/devices/hue_bridge.rs +++ b/src/devices/hue_bridge.rs @@ -23,7 +23,7 @@ pub struct FlagIDs { } #[derive(Debug, LuaDeviceConfig, Clone)] -pub struct HueBridgeConfig { +pub struct Config { pub identifier: String, #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))] pub addr: SocketAddr, @@ -31,9 +31,9 @@ pub struct HueBridgeConfig { pub flags: FlagIDs, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HueBridge { - config: HueBridgeConfig, + config: Config, } #[derive(Debug, Serialize)] @@ -43,7 +43,7 @@ struct FlagMessage { #[async_trait] impl LuaDeviceCreate for HueBridge { - type Config = HueBridgeConfig; + type Config = Config; type Error = Infallible; async fn create(config: Self::Config) -> Result { @@ -93,7 +93,7 @@ impl Device for HueBridge { #[async_trait] impl OnPresence for HueBridge { - async fn on_presence(&mut self, presence: bool) { + async fn on_presence(&self, presence: bool) { trace!("Bridging presence to hue"); self.set_flag(Flag::Presence, presence).await; } @@ -101,7 +101,7 @@ impl OnPresence for HueBridge { #[async_trait] impl OnDarkness for HueBridge { - async fn on_darkness(&mut self, dark: bool) { + async fn on_darkness(&self, dark: bool) { trace!("Bridging darkness to hue"); self.set_flag(Flag::Darkness, dark).await; } diff --git a/src/devices/hue_group.rs b/src/devices/hue_group.rs index ba23da6..d98b3cb 100644 --- a/src/devices/hue_group.rs +++ b/src/devices/hue_group.rs @@ -17,7 +17,7 @@ use crate::mqtt::WrappedAsyncClient; use crate::traits::Timeout; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct HueGroupConfig { +pub struct Config { pub identifier: String, #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))] pub addr: SocketAddr, @@ -31,15 +31,15 @@ pub struct HueGroupConfig { pub client: WrappedAsyncClient, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HueGroup { - config: HueGroupConfig, + config: Config, } // Couple of helper function to get the correct urls #[async_trait] impl LuaDeviceCreate for HueGroup { - type Config = HueGroupConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -85,7 +85,7 @@ impl Device for HueGroup { #[async_trait] impl OnMqtt for HueGroup { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !self .config .remotes @@ -98,10 +98,7 @@ impl OnMqtt for HueGroup { let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { - error!( - id = self.config.identifier, - "Failed to parse message: {err}" - ); + error!(id = self.get_id(), "Failed to parse message: {err}"); return; } }; @@ -120,7 +117,7 @@ impl OnMqtt for HueGroup { #[async_trait] impl OnOff for HueGroup { - async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { + async fn set_on(&self, on: bool) -> Result<(), ErrorCode> { // Abort any timer that is currently running self.stop_timeout().await.unwrap(); @@ -140,13 +137,10 @@ impl OnOff for HueGroup { Ok(res) => { let status = res.status(); if !status.is_success() { - warn!( - id = self.config.identifier, - "Status code is not success: {status}" - ); + warn!(id = self.get_id(), "Status code is not success: {status}"); } } - Err(err) => error!(id = self.config.identifier, "Error: {err}"), + Err(err) => error!(id = self.get_id(), "Error: {err}"), } Ok(()) @@ -162,19 +156,13 @@ impl OnOff for HueGroup { Ok(res) => { let status = res.status(); if !status.is_success() { - warn!( - id = self.config.identifier, - "Status code is not success: {status}" - ); + warn!(id = self.get_id(), "Status code is not success: {status}"); } let on = match res.json::().await { Ok(info) => info.any_on(), Err(err) => { - error!( - id = self.config.identifier, - "Failed to parse message: {err}" - ); + error!(id = self.get_id(), "Failed to parse message: {err}"); // TODO: Error code return Ok(false); } @@ -182,7 +170,7 @@ impl OnOff for HueGroup { return Ok(on); } - Err(err) => error!(id = self.config.identifier, "Error: {err}"), + Err(err) => error!(id = self.get_id(), "Error: {err}"), } Ok(false) @@ -191,7 +179,7 @@ impl OnOff for HueGroup { #[async_trait] impl Timeout for HueGroup { - async fn start_timeout(&mut self, timeout: Duration) -> Result<()> { + async fn start_timeout(&self, timeout: Duration) -> Result<()> { // Abort any timer that is currently running self.stop_timeout().await?; @@ -214,7 +202,7 @@ impl Timeout for HueGroup { Ok(()) } - async fn stop_timeout(&mut self) -> Result<()> { + async fn stop_timeout(&self) -> Result<()> { let message = message::Timeout::new(None); let res = reqwest::Client::new() .put(self.url_set_schedule()) diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index eb95b2b..862730f 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use anyhow::Result; @@ -9,6 +10,7 @@ use google_home::traits::{self, OnOff}; use google_home::types::Type; use rumqttc::{matches, Publish, SubscribeFilter}; use serde::Deserialize; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; use tracing::{debug, error, trace, warn}; @@ -29,7 +31,7 @@ pub enum OutletType { } #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct IkeaOutletConfig { +pub struct Config { #[device_config(flatten)] pub info: InfoConfig, #[device_config(flatten)] @@ -46,33 +48,31 @@ pub struct IkeaOutletConfig { } #[derive(Debug)] -pub struct IkeaOutlet { - config: IkeaOutletConfig, - +pub struct State { last_known_state: bool, handle: Option>, } -async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) { - let message = OnOffMessage::new(on); +#[derive(Debug, Clone)] +pub struct IkeaOutlet { + config: Config, - let topic = format!("{}/set", topic); - // TODO: Handle potential errors here - client - .publish( - &topic, - rumqttc::QoS::AtLeastOnce, - false, - serde_json::to_string(&message).unwrap(), - ) - .await - .map_err(|err| warn!("Failed to update state on {topic}: {err}")) - .ok(); + state: Arc>, +} + +impl IkeaOutlet { + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } } #[async_trait] impl LuaDeviceCreate for IkeaOutlet { - type Config = IkeaOutletConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -93,11 +93,13 @@ impl LuaDeviceCreate for IkeaOutlet { .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { - config, + let state = State { last_known_state: false, handle: None, - }) + }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -109,7 +111,7 @@ impl Device for IkeaOutlet { #[async_trait] impl OnMqtt for IkeaOutlet { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { // Check if the message is from the deviec itself or from a remote if matches(&message.topic, &self.config.mqtt.topic) { // Update the internal state based on what the device has reported @@ -122,7 +124,7 @@ impl OnMqtt for IkeaOutlet { }; // No need to do anything if the state has not changed - if state == self.last_known_state { + if state == self.state().await.last_known_state { return; } @@ -130,7 +132,7 @@ impl OnMqtt for IkeaOutlet { self.stop_timeout().await.unwrap(); debug!(id = Device::get_id(self), "Updating state to {state}"); - self.last_known_state = state; + self.state_mut().await.last_known_state = state; // If this is a kettle start a timeout for turning it of again if state && let Some(timeout) = self.config.timeout { @@ -162,7 +164,7 @@ impl OnMqtt for IkeaOutlet { #[async_trait] impl OnPresence for IkeaOutlet { - async fn on_presence(&mut self, presence: bool) { + async fn on_presence(&self, presence: bool) { // Turn off the outlet when we leave the house (Not if it is a battery charger) if !presence && self.config.outlet_type != OutletType::Charger { debug!(id = Device::get_id(self), "Turning device off"); @@ -206,11 +208,25 @@ impl google_home::Device for IkeaOutlet { #[async_trait] impl traits::OnOff for IkeaOutlet { async fn on(&self) -> Result { - Ok(self.last_known_state) + Ok(self.state().await.last_known_state) } - async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { - set_on(self.config.client.clone(), &self.config.mqtt.topic, on).await; + async fn set_on(&self, on: bool) -> Result<(), ErrorCode> { + let message = OnOffMessage::new(on); + + let topic = format!("{}/set", self.config.mqtt.topic); + // TODO: Handle potential errors here + self.config + .client + .publish( + &topic, + rumqttc::QoS::AtLeastOnce, + false, + serde_json::to_string(&message).unwrap(), + ) + .await + .map_err(|err| warn!("Failed to update state on {topic}: {err}")) + .ok(); Ok(()) } @@ -218,31 +234,23 @@ impl traits::OnOff for IkeaOutlet { #[async_trait] impl crate::traits::Timeout for IkeaOutlet { - async fn start_timeout(&mut self, timeout: Duration) -> Result<()> { + async fn start_timeout(&self, timeout: Duration) -> Result<()> { // Abort any timer that is currently running self.stop_timeout().await?; - // Turn the kettle of after the specified timeout - // TODO: Impl Drop for IkeaOutlet that will abort the handle if the IkeaOutlet - // get dropped - let client = self.config.client.clone(); - let topic = self.config.mqtt.topic.clone(); - let id = Device::get_id(self).clone(); - self.handle = Some(tokio::spawn(async move { - debug!(id, "Starting timeout ({timeout:?})..."); + let device = self.clone(); + self.state_mut().await.handle = Some(tokio::spawn(async move { + debug!(id = device.get_id(), "Starting timeout ({timeout:?})..."); tokio::time::sleep(timeout).await; - debug!(id, "Turning outlet off!"); - // TODO: Idealy we would call self.set_on(false), however since we want to do - // it after a timeout we have to put it in a separate task. - // I don't think we can really get around calling outside function - set_on(client, &topic, false).await; + debug!(id = device.get_id(), "Turning outlet off!"); + device.set_on(false).await.unwrap(); })); Ok(()) } - async fn stop_timeout(&mut self) -> Result<()> { - if let Some(handle) = self.handle.take() { + async fn stop_timeout(&self) -> Result<()> { + if let Some(handle) = self.state_mut().await.handle.take() { handle.abort(); } diff --git a/src/devices/kasa_outlet.rs b/src/devices/kasa_outlet.rs index 222af32..f510b7c 100644 --- a/src/devices/kasa_outlet.rs +++ b/src/devices/kasa_outlet.rs @@ -16,20 +16,20 @@ use tracing::trace; use super::{Device, LuaDeviceCreate}; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct KasaOutletConfig { +pub struct Config { pub identifier: String, #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 9999)))] pub addr: SocketAddr, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct KasaOutlet { - config: KasaOutletConfig, + config: Config, } #[async_trait] impl LuaDeviceCreate for KasaOutlet { - type Config = KasaOutletConfig; + type Config = Config; type Error = Infallible; async fn create(config: Self::Config) -> Result { @@ -241,7 +241,7 @@ impl traits::OnOff for KasaOutlet { .or(Err(DeviceError::TransientError.into())) } - async fn set_on(&mut self, on: bool) -> Result<(), errors::ErrorCode> { + async fn set_on(&self, on: bool) -> Result<(), errors::ErrorCode> { let mut stream = TcpStream::connect(self.config.addr) .await .or::(Err(DeviceError::DeviceOffline))?; diff --git a/src/devices/light_sensor.rs b/src/devices/light_sensor.rs index 99bfe3d..6c12941 100644 --- a/src/devices/light_sensor.rs +++ b/src/devices/light_sensor.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + use async_trait::async_trait; use automation_macro::LuaDeviceConfig; use rumqttc::Publish; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, trace, warn}; use super::LuaDeviceCreate; @@ -11,7 +14,7 @@ use crate::messages::BrightnessMessage; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct LightSensorConfig { +pub struct Config { pub identifier: String, #[device_config(flatten)] pub mqtt: MqttDeviceConfig, @@ -26,15 +29,29 @@ pub struct LightSensorConfig { const DEFAULT: bool = false; #[derive(Debug)] -pub struct LightSensor { - config: LightSensorConfig, - +pub struct State { is_dark: bool, } +#[derive(Debug, Clone)] +pub struct LightSensor { + config: Config, + state: Arc>, +} + +impl LightSensor { + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } +} + #[async_trait] impl LuaDeviceCreate for LightSensor { - type Config = LightSensorConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -45,10 +62,10 @@ impl LuaDeviceCreate for LightSensor { .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { - config, - is_dark: DEFAULT, - }) + let state = State { is_dark: DEFAULT }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -60,7 +77,7 @@ impl Device for LightSensor { #[async_trait] impl OnMqtt for LightSensor { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -81,18 +98,19 @@ impl OnMqtt for LightSensor { trace!("It is light"); false } else { + let is_dark = self.state().await.is_dark; trace!( "In between min ({}) and max ({}) value, keeping current state: {}", self.config.min, self.config.max, - self.is_dark + is_dark ); - self.is_dark + is_dark }; - if is_dark != self.is_dark { + if is_dark != self.state().await.is_dark { debug!("Dark state has changed: {is_dark}"); - self.is_dark = is_dark; + self.state_mut().await.is_dark = is_dark; if self.config.tx.send(Event::Darkness(is_dark)).await.is_err() { warn!("There are no receivers on the event channel"); diff --git a/src/devices/mod.rs b/src/devices/mod.rs index 81f2243..c7a794f 100644 --- a/src/devices/mod.rs +++ b/src/devices/mod.rs @@ -18,19 +18,19 @@ use async_trait::async_trait; use automation_cast::Cast; use google_home::traits::OnOff; -pub use self::air_filter::*; -pub use self::audio_setup::*; -pub use self::contact_sensor::*; -pub use self::debug_bridge::*; -pub use self::hue_bridge::*; -pub use self::hue_group::*; -pub use self::ikea_outlet::*; -pub use self::kasa_outlet::*; -pub use self::light_sensor::*; +pub use self::air_filter::AirFilter; +pub use self::audio_setup::AudioSetup; +pub use self::contact_sensor::ContactSensor; +pub use self::debug_bridge::DebugBridge; +pub use self::hue_bridge::HueBridge; +pub use self::hue_group::HueGroup; +pub use self::ikea_outlet::IkeaOutlet; +pub use self::kasa_outlet::KasaOutlet; +pub use self::light_sensor::LightSensor; pub use self::ntfy::{Notification, Ntfy}; -pub use self::presence::{Presence, PresenceConfig, DEFAULT_PRESENCE}; -pub use self::wake_on_lan::*; -pub use self::washer::*; +pub use self::presence::{Presence, DEFAULT_PRESENCE}; +pub use self::wake_on_lan::WakeOnLAN; +pub use self::washer::Washer; use crate::event::{OnDarkness, OnMqtt, OnNotification, OnPresence}; use crate::traits::Timeout; diff --git a/src/devices/ntfy.rs b/src/devices/ntfy.rs index ef96168..d3ee42b 100644 --- a/src/devices/ntfy.rs +++ b/src/devices/ntfy.rs @@ -111,8 +111,8 @@ impl Default for Notification { } } -#[derive(Debug, LuaDeviceConfig)] -pub struct NtfyConfig { +#[derive(Debug, Clone, LuaDeviceConfig)] +pub struct Config { #[device_config(default("https://ntfy.sh".into()))] pub url: String, pub topic: String, @@ -120,14 +120,14 @@ pub struct NtfyConfig { pub tx: event::Sender, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Ntfy { - config: NtfyConfig, + config: Config, } #[async_trait] impl LuaDeviceCreate for Ntfy { - type Config = NtfyConfig; + type Config = Config; type Error = Infallible; async fn create(config: Self::Config) -> Result { @@ -166,7 +166,7 @@ impl Ntfy { #[async_trait] impl OnPresence for Ntfy { - async fn on_presence(&mut self, presence: bool) { + async fn on_presence(&self, presence: bool) { // Setup extras for the broadcast let extras = HashMap::from([ ("cmd".into(), "presence".into()), @@ -202,7 +202,7 @@ impl OnPresence for Ntfy { #[async_trait] impl OnNotification for Ntfy { - async fn on_notification(&mut self, notification: Notification) { + async fn on_notification(&self, notification: Notification) { self.send(notification).await; } } diff --git a/src/devices/presence.rs b/src/devices/presence.rs index 672b909..21b8e55 100644 --- a/src/devices/presence.rs +++ b/src/devices/presence.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; use automation_macro::LuaDeviceConfig; use rumqttc::Publish; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, trace, warn}; use super::LuaDeviceCreate; @@ -12,8 +14,8 @@ use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::messages::PresenceMessage; use crate::mqtt::WrappedAsyncClient; -#[derive(Debug, LuaDeviceConfig)] -pub struct PresenceConfig { +#[derive(Debug, Clone, LuaDeviceConfig)] +pub struct Config { #[device_config(flatten)] pub mqtt: MqttDeviceConfig, #[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))] @@ -25,30 +27,47 @@ pub struct PresenceConfig { pub const DEFAULT_PRESENCE: bool = false; #[derive(Debug)] -pub struct Presence { - config: PresenceConfig, +pub struct State { devices: HashMap, current_overall_presence: bool, } +#[derive(Debug, Clone)] +pub struct Presence { + config: Config, + state: Arc>, +} + +impl Presence { + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } +} + #[async_trait] impl LuaDeviceCreate for Presence { - type Config = PresenceConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { - trace!(id = "ntfy", "Setting up Presence"); + trace!(id = "presence", "Setting up Presence"); config .client .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { - config, + let state = State { devices: HashMap::new(), current_overall_presence: DEFAULT_PRESENCE, - }) + }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -60,7 +79,7 @@ impl Device for Presence { #[async_trait] impl OnMqtt for Presence { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -77,7 +96,7 @@ impl OnMqtt for Presence { if message.payload.is_empty() { // Remove the device from the map debug!("State of device [{device_name}] has been removed"); - self.devices.remove(&device_name); + self.state_mut().await.devices.remove(&device_name); } else { let present = match PresenceMessage::try_from(message) { Ok(state) => state.presence(), @@ -88,13 +107,13 @@ impl OnMqtt for Presence { }; debug!("State of device [{device_name}] has changed: {}", present); - self.devices.insert(device_name, present); + self.state_mut().await.devices.insert(device_name, present); } - let overall_presence = self.devices.iter().any(|(_, v)| *v); - if overall_presence != self.current_overall_presence { + let overall_presence = self.state().await.devices.iter().any(|(_, v)| *v); + if overall_presence != self.state().await.current_overall_presence { debug!("Overall presence updated: {overall_presence}"); - self.current_overall_presence = overall_presence; + self.state_mut().await.current_overall_presence = overall_presence; if self .config diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index ea2eba0..16a5b90 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -17,7 +17,7 @@ use crate::messages::ActivateMessage; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct WakeOnLANConfig { +pub struct Config { #[device_config(flatten)] pub info: InfoConfig, #[device_config(flatten)] @@ -29,14 +29,14 @@ pub struct WakeOnLANConfig { pub client: WrappedAsyncClient, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct WakeOnLAN { - config: WakeOnLANConfig, + config: Config, } #[async_trait] impl LuaDeviceCreate for WakeOnLAN { - type Config = WakeOnLANConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -59,7 +59,7 @@ impl Device for WakeOnLAN { #[async_trait] impl OnMqtt for WakeOnLAN { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -103,7 +103,7 @@ impl google_home::Device for WakeOnLAN { #[async_trait] impl traits::Scene for WakeOnLAN { - async fn set_active(&mut self, deactivate: bool) -> Result<(), ErrorCode> { + async fn set_active(&self, deactivate: bool) -> Result<(), ErrorCode> { if deactivate { debug!( id = Device::get_id(self), diff --git a/src/devices/washer.rs b/src/devices/washer.rs index c9ee627..3f0f0f8 100644 --- a/src/devices/washer.rs +++ b/src/devices/washer.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + use async_trait::async_trait; use automation_macro::LuaDeviceConfig; use rumqttc::Publish; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, error, trace, warn}; use super::ntfy::Priority; @@ -11,7 +14,7 @@ use crate::messages::PowerMessage; use crate::mqtt::WrappedAsyncClient; #[derive(Debug, Clone, LuaDeviceConfig)] -pub struct WasherConfig { +pub struct Config { pub identifier: String, #[device_config(flatten)] pub mqtt: MqttDeviceConfig, @@ -23,17 +26,31 @@ pub struct WasherConfig { pub client: WrappedAsyncClient, } -// TODO: Add google home integration #[derive(Debug)] -pub struct Washer { - config: WasherConfig, - +pub struct State { running: isize, } +// TODO: Add google home integration +#[derive(Debug, Clone)] +pub struct Washer { + config: Config, + state: Arc>, +} + +impl Washer { + async fn state(&self) -> RwLockReadGuard { + self.state.read().await + } + + async fn state_mut(&self) -> RwLockWriteGuard { + self.state.write().await + } +} + #[async_trait] impl LuaDeviceCreate for Washer { - type Config = WasherConfig; + type Config = Config; type Error = rumqttc::ClientError; async fn create(config: Self::Config) -> Result { @@ -44,7 +61,10 @@ impl LuaDeviceCreate for Washer { .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .await?; - Ok(Self { config, running: 0 }) + let state = State { running: 0 }; + let state = Arc::new(RwLock::new(state)); + + Ok(Self { config, state }) } } @@ -61,7 +81,7 @@ const HYSTERESIS: isize = 10; #[async_trait] impl OnMqtt for Washer { - async fn on_mqtt(&mut self, message: Publish) { + async fn on_mqtt(&self, message: Publish) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { return; } @@ -79,7 +99,7 @@ impl OnMqtt for Washer { // debug!(id = self.identifier, power, "Washer state update"); - if power < self.config.threshold && self.running >= HYSTERESIS { + if power < self.config.threshold && self.state().await.running >= HYSTERESIS { // The washer is done running debug!( id = self.config.identifier, @@ -88,7 +108,7 @@ impl OnMqtt for Washer { "Washer is done" ); - self.running = 0; + self.state_mut().await.running = 0; let notification = Notification::new() .set_title("Laundy is done") .set_message("Don't forget to hang it!") @@ -106,8 +126,8 @@ impl OnMqtt for Washer { } } else if power < self.config.threshold { // Prevent false positives - self.running = 0; - } else if power >= self.config.threshold && self.running < HYSTERESIS { + self.state_mut().await.running = 0; + } else if power >= self.config.threshold && self.state().await.running < HYSTERESIS { // Washer could be starting debug!( id = self.config.identifier, @@ -116,7 +136,7 @@ impl OnMqtt for Washer { "Washer is starting" ); - self.running += 1; + self.state_mut().await.running += 1; } } } diff --git a/src/event.rs b/src/event.rs index 62a6f15..0065fbf 100644 --- a/src/event.rs +++ b/src/event.rs @@ -36,20 +36,20 @@ impl mlua::UserData for EventChannel {} #[async_trait] pub trait OnMqtt: Sync + Send { // fn topics(&self) -> Vec<&str>; - async fn on_mqtt(&mut self, message: Publish); + async fn on_mqtt(&self, message: Publish); } #[async_trait] pub trait OnPresence: Sync + Send { - async fn on_presence(&mut self, presence: bool); + async fn on_presence(&self, presence: bool); } #[async_trait] pub trait OnDarkness: Sync + Send { - async fn on_darkness(&mut self, dark: bool); + async fn on_darkness(&self, dark: bool); } #[async_trait] pub trait OnNotification: Sync + Send { - async fn on_notification(&mut self, notification: Notification); + async fn on_notification(&self, notification: Notification); } diff --git a/src/traits.rs b/src/traits.rs index eaeff9f..9e67051 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -5,6 +5,6 @@ use async_trait::async_trait; #[async_trait] pub trait Timeout: Sync + Send { - async fn start_timeout(&mut self, _timeout: Duration) -> Result<()>; - async fn stop_timeout(&mut self) -> Result<()>; + async fn start_timeout(&self, _timeout: Duration) -> Result<()>; + async fn stop_timeout(&self) -> Result<()>; }