From 3b6a5e4c077a29c708ab837ce305d0d8d2012fe9 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Wed, 28 Dec 2022 04:17:23 +0100 Subject: [PATCH] Moved some stuff from main into the libary --- src/config.rs | 20 +++++++++++++++++++- src/main.rs | 44 +++++++++++++++----------------------------- src/mqtt.rs | 42 +++++++++++++++++++++++++----------------- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/src/config.rs b/src/config.rs index f5c7406..1711604 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,8 +1,11 @@ use std::{fs, error::Error, collections::HashMap}; -use log::debug; +use log::{debug, trace}; +use rumqttc::AsyncClient; use serde::Deserialize; +use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN}; + #[derive(Debug, Deserialize)] pub struct Config { pub mqtt: MQTTConfig, @@ -67,3 +70,18 @@ impl Config { Ok(config) } } + +impl Device { + pub fn into(self, identifier: String, client: AsyncClient) -> DeviceBox { + match self { + Device::IkeaOutlet { info, mqtt, kettle } => { + trace!("\tIkeaOutlet [{} in {:?}]", info.name, info.room); + Box::new(IkeaOutlet::new(identifier, info, mqtt, kettle, client)) + }, + Device::WakeOnLAN { info, mqtt, mac_address } => { + trace!("\tWakeOnLan [{} in {:?}]", info.name, info.room); + Box::new(WakeOnLAN::new(identifier, info, mqtt, mac_address, client)) + }, + } + } +} diff --git a/src/main.rs b/src/main.rs index b73720f..a128b4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ use std::{time::Duration, sync::{Arc, RwLock}, process, net::SocketAddr}; -use automation::config::{Config, Device}; +use automation::config::Config; use dotenv::dotenv; use warp::Filter; use rumqttc::{MqttOptions, Transport, AsyncClient}; use env_logger::Builder; -use log::{error, info, debug, trace, LevelFilter}; +use log::{error, info, debug, LevelFilter}; -use automation::{devices::{Devices, IkeaOutlet, WakeOnLAN}, mqtt::Notifier}; +use automation::{devices::Devices, mqtt::Notifier}; use google_home::{GoogleHome, Request}; #[tokio::main] @@ -26,8 +26,6 @@ async fn main() { process::exit(1); }); - debug!("Config: {config:#?}"); - info!("Starting automation_rs..."); // Create device holder @@ -40,35 +38,23 @@ async fn main() { mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_transport(Transport::tls_with_default_config()); - // Create a notifier and move it to a new thread - // @TODO Maybe rename this to make it clear it has to do with mqtt - let mut notifier = Notifier::new(); + // Create a notifier and start it in a seperate task let (client, eventloop) = AsyncClient::new(mqttoptions, 10); + let mut notifier = Notifier::new(eventloop); notifier.add_listener(Arc::downgrade(&devices)); - tokio::spawn(async move { - info!("Connecting to MQTT broker"); - notifier.start(eventloop).await; - todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!"); - }); + notifier.start(); // Create devices based on config // @TODO Move out of main (config? or maybe devices?) - for (identifier, device_config) in config.devices { - debug!("Adding device {identifier}"); - - let device: automation::devices::DeviceBox = match device_config { - Device::IkeaOutlet { info, mqtt, kettle } => { - trace!("\tIkeaOutlet [{} in {:?}]", info.name, info.room); - Box::new(IkeaOutlet::new(identifier, info, mqtt, kettle, client.clone())) - }, - Device::WakeOnLAN { info, mqtt, mac_address } => { - trace!("\tWakeOnLan [{} in {:?}]", info.name, info.room); - Box::new(WakeOnLAN::new(identifier, info, mqtt, mac_address, client.clone())) - }, - }; - - devices.write().unwrap().add_device(device); - } + config.devices + .into_iter() + .map(|(identifier, device_config)| { + device_config.into(identifier, client.clone()) + }) + .for_each(|device| { + debug!("Adding device {}", device.get_id()); + devices.write().unwrap().add_device(device); + }); // Google Home fullfillments let fullfillment_google_home = warp::path("google_home") diff --git a/src/mqtt.rs b/src/mqtt.rs index 0322458..83301d7 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,20 +1,23 @@ use std::sync::{Weak, RwLock}; -use log::error; +use log::{error, debug}; use rumqttc::{Publish, Event, Incoming, EventLoop}; use log::trace; +use tokio::task::JoinHandle; pub trait Listener { fn notify(&mut self, message: &Publish); } +// @TODO Maybe rename this to make it clear it has to do with mqtt pub struct Notifier { listeners: Vec>>, + eventloop: EventLoop, } impl Notifier { - pub fn new() -> Self { - return Self { listeners: Vec::new() } + pub fn new(eventloop: EventLoop) -> Self { + return Self { listeners: Vec::new(), eventloop } } fn notify(&mut self, message: Publish) { @@ -32,20 +35,25 @@ impl Notifier { self.listeners.push(listener); } - pub async fn start(&mut self, mut eventloop: EventLoop) { - loop { - let notification = eventloop.poll().await; - match notification { - Ok(Event::Incoming(Incoming::Publish(p))) => { - trace!("{:?}", p); - self.notify(p); - }, - Ok(..) => continue, - Err(err) => { - error!("{}", err); - break - }, + pub fn start(mut self) -> JoinHandle<()> { + tokio::spawn(async move { + debug!("Listening for MQTT events"); + loop { + let notification = self.eventloop.poll().await; + match notification { + Ok(Event::Incoming(Incoming::Publish(p))) => { + trace!("{:?}", p); + self.notify(p); + }, + Ok(..) => continue, + Err(err) => { + error!("{}", err); + break + }, + } } - } + + todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!"); + }) } }