diff --git a/config/zeus.dev.toml b/config/zeus.dev.toml index fded465..7976b62 100644 --- a/config/zeus.dev.toml +++ b/config/zeus.dev.toml @@ -44,3 +44,9 @@ type = "AudioSetup" topic = "zigbee2mqtt/living/remote" mixer = "10.0.0.49" speakers = "10.0.0.182" + +[devices.hallway_frontdoor] +type = "ContactSensor" +topic = "zigbee2mqtt/hallway/frontdoor" +# @TODO This should be automatically constructed from the identifier and presence topic +presence = { topic = "automation_dev/presence/frontdoor", timeout = 10 } diff --git a/src/config.rs b/src/config.rs index ee32f85..6d3bccf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,7 +4,7 @@ use tracing::{debug, trace}; use rumqttc::AsyncClient; use serde::Deserialize; -use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup}; +use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor}; // @TODO Configure more defaults @@ -94,6 +94,15 @@ pub struct KettleConfig { pub timeout: Option, // Timeout in seconds } +#[derive(Debug, Deserialize)] +pub struct PresenceDeviceConfig { + #[serde(flatten)] + pub mqtt: MqttDeviceConfig, + // @TODO Maybe make this an option? That way if no timeout is set it will immediately turn the + // device off again? + pub timeout: u64 // Timeout in seconds +} + #[derive(Debug, Deserialize)] #[serde(tag = "type")] pub enum Device { @@ -116,6 +125,11 @@ pub enum Device { mqtt: MqttDeviceConfig, mixer: Ipv4Addr, speakers: Ipv4Addr, + }, + ContactSensor { + #[serde(flatten)] + mqtt: MqttDeviceConfig, + presence: Option, } } @@ -147,6 +161,10 @@ impl Device { trace!(id = identifier, "AudioSetup [{}]", identifier); Box::new(AudioSetup::new(identifier, mqtt, mixer, speakers, client)) }, + Device::ContactSensor { mqtt, presence } => { + trace!(id = identifier, "ContactSensor [{}]", identifier); + Box::new(ContactSensor::new(identifier, mqtt, presence, client)) + }, } } } diff --git a/src/devices.rs b/src/devices.rs index fb78d2a..4adac0a 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -7,6 +7,9 @@ pub use self::wake_on_lan::WakeOnLAN; mod audio_setup; pub use self::audio_setup::AudioSetup; +mod contact_sensor; +pub use self::contact_sensor::ContactSensor; + use std::collections::HashMap; use google_home::{GoogleHomeDevice, traits::OnOff}; diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs new file mode 100644 index 0000000..719918d --- /dev/null +++ b/src/devices/contact_sensor.rs @@ -0,0 +1,94 @@ +use std::time::Duration; + +use pollster::FutureExt; +use rumqttc::AsyncClient; +use tokio::task::JoinHandle; +use tracing::{error, debug}; + +use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}}; + +use super::Device; + +pub struct ContactSensor { + identifier: String, + mqtt: MqttDeviceConfig, + presence: Option, + + client: AsyncClient, + is_closed: bool, + handle: Option>, +} + +impl ContactSensor { + pub fn new(identifier: String, mqtt: MqttDeviceConfig, presence: Option, client: AsyncClient) -> Self { + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).block_on().unwrap(); + + Self { + identifier, + mqtt, + presence, + client, + is_closed: true, + handle: None, + } + } +} + +impl Device for ContactSensor { + fn get_id(&self) -> String { + self.identifier.clone() + } +} + +impl OnMqtt for ContactSensor { + fn on_mqtt(&mut self, message: &rumqttc::Publish) { + if message.topic != self.mqtt.topic { + return; + } + + let is_closed = match ContactMessage::try_from(message) { + Ok(state) => state.is_closed(), + Err(err) => { + error!(id = self.identifier, "Failed to parse message: {err}"); + return; + }, + }; + + if is_closed == self.is_closed { + return; + } + + debug!(id = self.identifier, "Updating state to {is_closed}"); + self.is_closed = is_closed; + + // Check if this contact sensor works as a presence device + // If not we are done here + let presence = match &self.presence { + Some(presence) => presence, + None => return, + }; + + if !is_closed { + // Activate presence and stop any timeout once we open the door + if let Some(handle) = self.handle.take() { + handle.abort(); + } + + self.client.publish(presence.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()).block_on().unwrap(); + } else { + // Once the door is closed again we start a timeout for removing the presence + let client = self.client.clone(); + let topic = presence.mqtt.topic.clone(); + let id = self.identifier.clone(); + let timeout = Duration::from_secs(presence.timeout); + self.handle = Some( + tokio::spawn(async move { + debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); + tokio::time::sleep(timeout).await; + debug!(id, "Removing door device!"); + client.publish(topic, rumqttc::QoS::AtLeastOnce, false, "").await.unwrap(); + }) + ); + } + } +} diff --git a/src/mqtt.rs b/src/mqtt.rs index bde9aec..caf10aa 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -137,12 +137,16 @@ impl TryFrom<&Publish> for RemoteMessage { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct PresenceMessage { state: bool } impl PresenceMessage { + pub fn new(state: bool) -> Self { + Self { state } + } + pub fn present(&self) -> bool { self.state } @@ -177,3 +181,23 @@ impl TryFrom<&Publish> for BrightnessMessage { } } +#[derive(Debug, Deserialize)] +pub struct ContactMessage { + contact: bool, +} + +impl ContactMessage { + pub fn is_closed(&self) -> bool { + self.contact + } +} + +impl TryFrom<&Publish> for ContactMessage { + type Error = anyhow::Error; + + fn try_from(message: &Publish) -> Result { + serde_json::from_slice(&message.payload) + .or(Err(anyhow::anyhow!("Invalid message payload received: {:?}", message.payload))) + } +} +