From e9d1cf554d495658bf21dd4324cdc5e2567a7f4f Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Tue, 10 Jan 2023 22:59:26 +0100 Subject: [PATCH] Use broadcast for mqtt message so we can have a queue --- src/devices.rs | 29 +++++++---------------------- src/light_sensor.rs | 8 +++----- src/main.rs | 2 +- src/mqtt.rs | 10 +++++----- src/presence.rs | 9 +++------ 5 files changed, 19 insertions(+), 39 deletions(-) diff --git a/src/devices.rs b/src/devices.rs index e16fd12..e2a8cd4 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -95,40 +95,25 @@ pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, m let (tx, mut rx) = mpsc::channel(100); tokio::spawn(async move { + // @TODO Handle error better loop { tokio::select! { - res = mqtt_rx.changed() => { - if !res.is_ok() { - break; - } - - // @TODO Not ideal that we have to clone here, but not sure how to work around that - let message = mqtt_rx.borrow().clone(); - if let Some(message) = message { - devices.on_mqtt(&message).await; - } + Ok(message) = mqtt_rx.recv() => { + devices.on_mqtt(&message).await; } - res = presence_rx.changed() => { - if !res.is_ok() { - break; - } - + Ok(_) = presence_rx.changed() => { let presence = *presence_rx.borrow(); devices.on_presence(presence).await; } - res = light_sensor_rx.changed() => { - if !res.is_ok() { - break; - } - + Ok(_) = light_sensor_rx.changed() => { let darkness = *light_sensor_rx.borrow(); devices.on_darkness(darkness).await; } + // @TODO Handle receiving None better, otherwise it might constantly run doing + // nothing Some(cmd) = rx.recv() => devices.handle_cmd(cmd) } } - - unreachable!("Did not expect this"); }); return DeviceHandle { tx }; diff --git a/src/light_sensor.rs b/src/light_sensor.rs index 90f0f36..50e6827 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -28,14 +28,12 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, clien let mut light_sensor = LightSensor { is_dark: is_dark.clone(), mqtt: config.mqtt, min: config.min, max: config.max, tx }; tokio::spawn(async move { - while mqtt_rx.changed().await.is_ok() { - let message = mqtt_rx.borrow().clone(); - if let Some(message) = message { + loop { + // @TODO Handle errors, warn if lagging + if let Ok(message) = mqtt_rx.recv().await { light_sensor.on_mqtt(&message).await; } } - - unreachable!("Did not expect this"); }); return is_dark; diff --git a/src/main.rs b/src/main.rs index 1e4ebd4..a26d4ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use automation::{ config::{Config, OpenIDConfig}, devices, hue_bridge::HueBridge, - light_sensor, mqtt::{self, Mqtt}, + light_sensor, mqtt::Mqtt, ntfy::Ntfy, presence, }; diff --git a/src/mqtt.rs b/src/mqtt.rs index 52ecd82..00eb91e 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -3,15 +3,15 @@ use serde::{Serialize, Deserialize}; use tracing::{error, debug}; use rumqttc::{Publish, Event, Incoming, EventLoop}; -use tokio::sync::watch; +use tokio::sync::broadcast; #[async_trait] pub trait OnMqtt { async fn on_mqtt(&mut self, message: &Publish); } -pub type Receiver = watch::Receiver>; -type Sender = watch::Sender>; +pub type Receiver = broadcast::Receiver; +type Sender = broadcast::Sender; pub struct Mqtt { tx: Sender, @@ -20,7 +20,7 @@ pub struct Mqtt { impl Mqtt { pub fn new(eventloop: EventLoop) -> Self { - let (tx, _rx) = watch::channel(None); + let (tx, _rx) = broadcast::channel(100); Self { tx, eventloop } } @@ -35,7 +35,7 @@ impl Mqtt { let notification = self.eventloop.poll().await; match notification { Ok(Event::Incoming(Incoming::Publish(p))) => { - self.tx.send(Some(p)).ok(); + self.tx.send(p).ok(); }, Ok(..) => continue, Err(err) => { diff --git a/src/presence.rs b/src/presence.rs index 54a01c4..38aea3e 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -30,15 +30,12 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, mqtt: MqttDeviceConfig, client: let mut presence = Presence { devices: HashMap::new(), overall_presence: overall_presence.clone(), mqtt, tx }; tokio::spawn(async move { - while mqtt_rx.changed().await.is_ok() { - // @TODO Not ideal that we have to clone here, but not sure how to work around that - let message = mqtt_rx.borrow().clone(); - if let Some(message) = message { + loop { + // @TODO Handle errors, warn if lagging + if let Ok(message) = mqtt_rx.recv().await { presence.on_mqtt(&message).await; } } - - unreachable!("Did not expect this"); }); return overall_presence;