diff --git a/src/devices.rs b/src/devices.rs
index e16fd12..e2a8cd4 100644
--- a/src/devices.rs
+++ b/src/devices.rs
@@ -95,40 +95,25 @@ pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, m
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
+ // @TODO Handle error better
loop {
tokio::select! {
- res = mqtt_rx.changed() => {
- if !res.is_ok() {
- break;
- }
-
- // @TODO Not ideal that we have to clone here, but not sure how to work around that
- let message = mqtt_rx.borrow().clone();
- if let Some(message) = message {
- devices.on_mqtt(&message).await;
- }
+ Ok(message) = mqtt_rx.recv() => {
+ devices.on_mqtt(&message).await;
}
- res = presence_rx.changed() => {
- if !res.is_ok() {
- break;
- }
-
+ Ok(_) = presence_rx.changed() => {
let presence = *presence_rx.borrow();
devices.on_presence(presence).await;
}
- res = light_sensor_rx.changed() => {
- if !res.is_ok() {
- break;
- }
-
+ Ok(_) = light_sensor_rx.changed() => {
let darkness = *light_sensor_rx.borrow();
devices.on_darkness(darkness).await;
}
+ // @TODO Handle receiving None better, otherwise it might constantly run doing
+ // nothing
Some(cmd) = rx.recv() => devices.handle_cmd(cmd)
}
}
-
- unreachable!("Did not expect this");
});
return DeviceHandle { tx };
diff --git a/src/light_sensor.rs b/src/light_sensor.rs
index 90f0f36..50e6827 100644
--- a/src/light_sensor.rs
+++ b/src/light_sensor.rs
@@ -28,14 +28,12 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, clien
let mut light_sensor = LightSensor { is_dark: is_dark.clone(), mqtt: config.mqtt, min: config.min, max: config.max, tx };
tokio::spawn(async move {
- while mqtt_rx.changed().await.is_ok() {
- let message = mqtt_rx.borrow().clone();
- if let Some(message) = message {
+ loop {
+ // @TODO Handle errors, warn if lagging
+ if let Ok(message) = mqtt_rx.recv().await {
light_sensor.on_mqtt(&message).await;
}
}
-
- unreachable!("Did not expect this");
});
return is_dark;
diff --git a/src/main.rs b/src/main.rs
index 1e4ebd4..a26d4ed 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,7 +8,7 @@ use automation::{
config::{Config, OpenIDConfig},
devices,
hue_bridge::HueBridge,
- light_sensor, mqtt::{self, Mqtt},
+ light_sensor, mqtt::Mqtt,
ntfy::Ntfy,
presence,
};
diff --git a/src/mqtt.rs b/src/mqtt.rs
index 52ecd82..00eb91e 100644
--- a/src/mqtt.rs
+++ b/src/mqtt.rs
@@ -3,15 +3,15 @@ use serde::{Serialize, Deserialize};
use tracing::{error, debug};
use rumqttc::{Publish, Event, Incoming, EventLoop};
-use tokio::sync::watch;
+use tokio::sync::broadcast;
#[async_trait]
pub trait OnMqtt {
async fn on_mqtt(&mut self, message: &Publish);
}
-pub type Receiver = watch::Receiver