From 458c5e25a344ebb22c8b52d166455ecb18d41a4e Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Thu, 29 Dec 2022 00:50:16 +0100 Subject: [PATCH] Renamed mqtt system to make it more clear that it has to do with mqtt --- src/config.rs | 4 ++-- src/devices.rs | 22 ++++++++++++---------- src/devices/ikea_outlet.rs | 6 +++--- src/devices/wake_on_lan.rs | 6 +++--- src/main.rs | 30 ++++++++++++++++-------------- src/mqtt.rs | 14 +++++++------- src/presence.rs | 6 +++--- 7 files changed, 46 insertions(+), 42 deletions(-) diff --git a/src/config.rs b/src/config.rs index 179075d..efa4d81 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,7 +8,7 @@ use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN}; #[derive(Debug, Deserialize)] pub struct Config { - pub mqtt: MQTTConfig, + pub mqtt: MqttConfig, pub fullfillment: FullfillmentConfig, pub presence: MqttDeviceConfig, #[serde(default)] @@ -16,7 +16,7 @@ pub struct Config { } #[derive(Debug, Deserialize)] -pub struct MQTTConfig { +pub struct MqttConfig { pub host: String, pub port: u16, pub username: String, diff --git a/src/devices.rs b/src/devices.rs index 1a34879..336e16a 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -9,14 +9,14 @@ use std::collections::HashMap; use google_home::{GoogleHomeDevice, traits::OnOff}; use log::trace; -use crate::{mqtt::Listener, presence::OnPresence}; +use crate::{mqtt::OnMqtt, presence::OnPresence}; -impl_cast::impl_cast!(Device, Listener); +impl_cast::impl_cast!(Device, OnMqtt); impl_cast::impl_cast!(Device, OnPresence); impl_cast::impl_cast!(Device, GoogleHomeDevice); impl_cast::impl_cast!(Device, OnOff); -pub trait Device: AsGoogleHomeDevice + AsListener + AsOnPresence + AsOnOff { +pub trait Device: AsGoogleHomeDevice + AsOnMqtt + AsOnPresence + AsOnOff { fn get_id(&self) -> String; } @@ -54,7 +54,7 @@ impl Devices { self.devices.insert(device.get_id(), device); } - get_cast!(Listener); + get_cast!(OnMqtt); get_cast!(OnPresence); get_cast!(GoogleHomeDevice); get_cast!(OnOff); @@ -67,10 +67,12 @@ impl Devices { } } -impl Listener for Devices { - fn notify(&mut self, message: &rumqttc::Publish) { - self.as_listeners().iter_mut().for_each(|(_, listener)| { - listener.notify(message); +impl OnMqtt for Devices { + fn on_mqtt(&mut self, message: &rumqttc::Publish) { + trace!("OnMqtt for devices"); + self.as_on_mqtts().iter_mut().for_each(|(id, listener)| { + trace!("OnMqtt: {id}"); + listener.on_mqtt(message); }) } } @@ -78,8 +80,8 @@ impl Listener for Devices { impl OnPresence for Devices { fn on_presence(&mut self, presence: bool) { trace!("OnPresence for devices"); - self.as_on_presences().iter_mut().for_each(|(name, device)| { - trace!("OnPresence: {name}"); + self.as_on_presences().iter_mut().for_each(|(id, device)| { + trace!("OnPresence: {id}"); device.on_presence(presence); }) } diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index edef75e..3762c82 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -9,7 +9,7 @@ use tokio::task::JoinHandle; use crate::config::{KettleConfig, InfoConfig, MqttDeviceConfig}; use crate::devices::Device; -use crate::mqtt::Listener; +use crate::mqtt::OnMqtt; use crate::presence::OnPresence; pub struct IkeaOutlet { @@ -69,8 +69,8 @@ impl TryFrom<&Publish> for StateMessage { } } -impl Listener for IkeaOutlet { - fn notify(&mut self, message: &Publish) { +impl OnMqtt for IkeaOutlet { + fn on_mqtt(&mut self, message: &Publish) { // Update the internal state based on what the device has reported if message.topic != self.mqtt.topic { return; diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 7889bb4..75b28c8 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -3,7 +3,7 @@ use log::{debug, warn}; use rumqttc::{AsyncClient, Publish}; use serde::Deserialize; -use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::Listener}; +use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::OnMqtt}; use super::Device; @@ -46,8 +46,8 @@ impl TryFrom<&Publish> for StateMessage { } } -impl Listener for WakeOnLAN { - fn notify(&mut self, message: &Publish) { +impl OnMqtt for WakeOnLAN { + fn on_mqtt(&mut self, message: &Publish) { if message.topic != self.mqtt.topic { return; diff --git a/src/main.rs b/src/main.rs index 64a0da7..fdf7c3f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use rumqttc::{MqttOptions, Transport, AsyncClient}; use env_logger::Builder; use log::{error, info, debug, LevelFilter}; -use automation::{devices::Devices, mqtt::Notifier}; +use automation::{devices::Devices, mqtt::Mqtt}; use google_home::{GoogleHome, Request}; #[tokio::main] @@ -30,31 +30,33 @@ async fn main() { info!("Starting automation_rs..."); - // Create device holder - // @TODO Make this nices to work with, we devices.rs - let devices = Arc::new(RwLock::new(Devices::new())); - - // Setup MQTT + // Configure MQTT let mut mqttoptions = MqttOptions::new("rust-test", config.mqtt.host, config.mqtt.port); mqttoptions.set_credentials(config.mqtt.username, config.mqtt.password.unwrap()); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_transport(Transport::tls_with_default_config()); - // Create a notifier and start it in a seperate task + // Create a mqtt client and wrap the eventloop let (client, eventloop) = AsyncClient::new(mqttoptions, 10); - let mut notifier = Notifier::new(eventloop); + let mut mqtt = Mqtt::new(eventloop); - notifier.add_listener(Arc::downgrade(&devices)); + // Create device holder and register it as listener for mqtt + let devices = Arc::new(RwLock::new(Devices::new())); + mqtt.add_listener(Arc::downgrade(&devices)); + // Setup presence system let mut presence = Presence::new(config.presence, client.clone()); + // Register devices as presence listener presence.add_listener(Arc::downgrade(&devices)); + + // Register presence as mqtt listener let presence = Arc::new(RwLock::new(presence)); - notifier.add_listener(Arc::downgrade(&presence)); + mqtt.add_listener(Arc::downgrade(&presence)); - notifier.start(); + // Start mqtt, this spawns a seperate async task + mqtt.start(); - // Create devices based on config - // @TODO Move out of main (config? or maybe devices?) + // Turn the config into actual devices and add them config.devices .into_iter() .map(|(identifier, device_config)| { @@ -65,7 +67,7 @@ async fn main() { devices.write().unwrap().add_device(device); }); - // Fullfillments + // Create google home fullfillment route let fullfillment = Router::new() .route("/google_home", post(async move |Json(payload): Json| { // @TODO Verify that we are actually logged in diff --git a/src/mqtt.rs b/src/mqtt.rs index 7bafecd..7e8a9b6 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -5,17 +5,17 @@ use rumqttc::{Publish, Event, Incoming, EventLoop}; use log::trace; use tokio::task::JoinHandle; -pub trait Listener { - fn notify(&mut self, message: &Publish); +pub trait OnMqtt { + fn on_mqtt(&mut self, message: &Publish); } // @TODO Maybe rename this to make it clear it has to do with mqtt -pub struct Notifier { - listeners: Vec>>, +pub struct Mqtt { + listeners: Vec>>, eventloop: EventLoop, } -impl Notifier { +impl Mqtt { pub fn new(eventloop: EventLoop) -> Self { return Self { listeners: Vec::new(), eventloop } } @@ -25,7 +25,7 @@ impl Notifier { self.listeners.retain(|listener| { if let Some(listener) = listener.upgrade() { - listener.write().unwrap().notify(&message); + listener.write().unwrap().on_mqtt(&message); return true; } else { trace!("Removing listener..."); @@ -35,7 +35,7 @@ impl Notifier { }) } - pub fn add_listener(&mut self, listener: Weak>) { + pub fn add_listener(&mut self, listener: Weak>) { self.listeners.push(listener); } diff --git a/src/presence.rs b/src/presence.rs index a5a723c..d0acebb 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -4,7 +4,7 @@ use log::{debug, warn, trace}; use rumqttc::{AsyncClient, Publish}; use serde::{Serialize, Deserialize}; -use crate::{mqtt::Listener, config::MqttDeviceConfig}; +use crate::{mqtt::OnMqtt, config::MqttDeviceConfig}; pub trait OnPresence { fn on_presence(&mut self, presence: bool); @@ -47,8 +47,8 @@ impl TryFrom<&Publish> for StateMessage { } } -impl Listener for Presence { - fn notify(&mut self, message: &rumqttc::Publish) { +impl OnMqtt for Presence { + fn on_mqtt(&mut self, message: &rumqttc::Publish) { if message.topic.starts_with(&(self.mqtt.topic.clone() + "/")) { let device_name = message.topic.rsplit_once("/").unwrap().1;