diff --git a/src/devices.rs b/src/devices.rs index bed37ba..e16fd12 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -58,10 +58,11 @@ enum Command { Fullfillment { google_home: GoogleHome, payload: google_home::Request, - tx: oneshot::Sender + tx: oneshot::Sender, }, AddDevice { device: DeviceBox, + tx: oneshot::Sender<()> } } @@ -81,7 +82,9 @@ impl DeviceHandle { } pub async fn add_device(&self, device: DeviceBox) { - self.tx.send(Command::AddDevice { device }).await.unwrap(); + let (tx, rx) = oneshot::channel(); + self.tx.send(Command::AddDevice { device, tx }).await.unwrap(); + rx.await.ok(); } } @@ -138,7 +141,11 @@ impl Devices { let result = google_home.handle_request(payload, &mut self.as_google_home_devices()).unwrap(); tx.send(result).ok(); }, - Command::AddDevice { device } => self.add_device(device), + Command::AddDevice { device, tx } => { + self.add_device(device); + + tx.send(()).ok(); + }, } } diff --git a/src/main.rs b/src/main.rs index 74f8e5c..1e4ebd4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use automation::{ config::{Config, OpenIDConfig}, devices, hue_bridge::HueBridge, - light_sensor, mqtt, + light_sensor, mqtt::{self, Mqtt}, ntfy::Ntfy, presence, }; @@ -41,14 +41,14 @@ async fn main() { tracing_subscriber::fmt().with_env_filter(filter).init(); + info!("Starting automation_rs..."); + let config = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned()); let config = Config::build(&config).unwrap_or_else(|err| { error!("Failed to load config: {err}"); process::exit(1); }); - info!("Starting automation_rs..."); - // Configure MQTT let mqtt = config.mqtt.clone(); let mut mqttoptions = MqttOptions::new("rust-test", mqtt.host, mqtt.port); @@ -58,11 +58,11 @@ async fn main() { // Create a mqtt client and wrap the eventloop let (client, eventloop) = AsyncClient::new(mqttoptions, 10); - let mqtt = mqtt::start(eventloop); - let presence = presence::start(mqtt.clone(), config.presence.clone(), client.clone()).await; - let light_sensor = light_sensor::start(mqtt.clone(), config.light_sensor.clone(), client.clone()).await; + let mqtt = Mqtt::new(eventloop); + let presence = presence::start(mqtt.subscribe(), config.presence.clone(), client.clone()).await; + let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await; - let devices = devices::start(mqtt, presence.clone(), light_sensor.clone()); + let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone()); join_all( config .devices @@ -86,15 +86,19 @@ async fn main() { HueBridge::create(presence.clone(), light_sensor.clone(), hue_bridge_config); } + // Actually start listening for mqtt message, + // we wait until all the setup is done, as otherwise we might miss some messages + mqtt.start(); + // Create google home fullfillment route let fullfillment = Router::new().route( "/google_home", post(async move |user: User, Json(payload): Json| { - debug!(username = user.preferred_username, "{payload:?}"); + debug!(username = user.preferred_username, "{payload:#?}"); let gc = GoogleHome::new(&user.preferred_username); let result = devices.fullfillment(gc, payload).await.unwrap(); - debug!(username = user.preferred_username, "{result:?}"); + debug!(username = user.preferred_username, "{result:#?}"); return (StatusCode::OK, Json(result)); }), diff --git a/src/mqtt.rs b/src/mqtt.rs index 2c36ce8..52ecd82 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -11,29 +11,43 @@ pub trait OnMqtt { } pub type Receiver = watch::Receiver>; +type Sender = watch::Sender>; -pub fn start(mut eventloop: EventLoop) -> Receiver { - let (tx, rx) = watch::channel(None); - tokio::spawn(async move { - debug!("Listening for MQTT events"); - loop { - let notification = eventloop.poll().await; - match notification { - Ok(Event::Incoming(Incoming::Publish(p))) => { - tx.send(Some(p)).ok(); - }, - Ok(..) => continue, - Err(err) => { - error!("{}", err); - break - }, +pub struct Mqtt { + tx: Sender, + eventloop: EventLoop, +} + +impl Mqtt { + pub fn new(eventloop: EventLoop) -> Self { + let (tx, _rx) = watch::channel(None); + Self { tx, eventloop } + } + + pub fn subscribe(&self) -> Receiver { + self.tx.subscribe() + } + + pub fn start(mut self) { + tokio::spawn(async move { + debug!("Listening for MQTT events"); + loop { + let notification = self.eventloop.poll().await; + match notification { + Ok(Event::Incoming(Incoming::Publish(p))) => { + self.tx.send(Some(p)).ok(); + }, + Ok(..) => continue, + Err(err) => { + error!("{}", err); + break + }, + } } - } - todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!"); - }); - - return rx; + todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!"); + }); + } } #[derive(Debug, Serialize, Deserialize)]