diff --git a/src/config.rs b/src/config.rs index 8fa069a..2068656 100644 --- a/src/config.rs +++ b/src/config.rs @@ -87,7 +87,7 @@ pub struct LightSensorConfig { pub max: isize, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct Flags { pub presence: isize, pub darkness: isize, diff --git a/src/debug_bridge.rs b/src/debug_bridge.rs index 653783d..b93d4c0 100644 --- a/src/debug_bridge.rs +++ b/src/debug_bridge.rs @@ -15,7 +15,7 @@ impl DebugBridge { } } -pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: DebugBridgeConfig, client: AsyncClient) { +pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &DebugBridgeConfig, client: AsyncClient) { let mut debug_bridge = DebugBridge::new(&config.topic, client); tokio::spawn(async move { diff --git a/src/hue_bridge.rs b/src/hue_bridge.rs index 138c126..dca99f6 100644 --- a/src/hue_bridge.rs +++ b/src/hue_bridge.rs @@ -54,8 +54,8 @@ impl HueBridge { } } -pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) { - let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags); +pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &HueBridgeConfig) { + let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone()); tokio::spawn(async move { loop { diff --git a/src/main.rs b/src/main.rs index fc8ba80..28b25f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ #![feature(async_closure)] -use std::{process, time::Duration}; +use std::{process, time::Duration, collections::HashMap}; use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse}; @@ -13,7 +13,7 @@ use automation::{ presence, error::ApiError, debug_bridge, }; use dotenvy::dotenv; -use rumqttc::{AsyncClient, MqttOptions, Transport}; +use rumqttc::{AsyncClient, MqttOptions, Transport, matches}; use tracing::{debug, error, info, metadata::LevelFilter}; use futures::future::join_all; @@ -75,6 +75,23 @@ async fn app() -> anyhow::Result<()> { let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?; let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?; + // Start the ntfy service if it is configured + let mut ntfy = None; + if let Some(config) = &config.ntfy { + ntfy = Some(ntfy::start(presence.clone(), config)); + } + let ntfy = ntfy; + + // Start the hue bridge if it is configured + if let Some(config) = &config.hue_bridge { + hue_bridge::start(presence.clone(), light_sensor.clone(), config); + } + + // Start the debug bridge if it is configured + if let Some(config) = &config.debug_bridge { + debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone()); + } + let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone()); join_all( config @@ -91,21 +108,6 @@ async fn app() -> anyhow::Result<()> { }) ).await.into_iter().collect::>()?; - // Start the ntfy service if it is configured - if let Some(config) = config.ntfy { - ntfy::start(presence.clone(), config); - } - - // Start the hue bridge if it is configured - if let Some(config) = config.hue_bridge { - hue_bridge::start(presence.clone(), light_sensor.clone(), config); - } - - // Start the debug bridge if it is configured - if let Some(config) = config.debug_bridge { - debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone()); - } - // Actually start listening for mqtt message, // we wait until all the setup is done, as otherwise we might miss some messages mqtt.start(); diff --git a/src/ntfy.rs b/src/ntfy.rs index 1c84204..18c2f1f 100644 --- a/src/ntfy.rs +++ b/src/ntfy.rs @@ -1,20 +1,25 @@ use std::collections::HashMap; use async_trait::async_trait; +use tokio::sync::mpsc; use tracing::{warn, error, debug}; use serde::Serialize; use serde_repr::*; use crate::{presence::{self, OnPresence}, config::NtfyConfig}; +pub type Sender = mpsc::Sender; +pub type Receiver = mpsc::Receiver; + struct Ntfy { base_url: String, - topic: String + topic: String, + tx: Sender, } #[derive(Serialize_repr)] #[repr(u8)] -enum Priority { +pub enum Priority { Min = 1, Low, Default, @@ -24,7 +29,7 @@ enum Priority { #[derive(Serialize)] #[serde(rename_all = "snake_case", tag = "action")] -enum ActionType { +pub enum ActionType { Broadcast { #[serde(skip_serializing_if = "HashMap::is_empty")] extras: HashMap @@ -34,7 +39,7 @@ enum ActionType { } #[derive(Serialize)] -struct Action { +pub struct Action { #[serde(flatten)] action: ActionType, label: String, @@ -42,8 +47,14 @@ struct Action { } #[derive(Serialize)] -struct Notification { +struct NotificationFinal { topic: String, + #[serde(flatten)] + inner: Notification, +} + +#[derive(Serialize)] +pub struct Notification { #[serde(skip_serializing_if = "Option::is_none")] title: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -57,53 +68,87 @@ struct Notification { } impl Notification { - fn new(topic: &str) -> Self { - Self { topic: topic.to_owned(), title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() } + pub fn new() -> Self { + Self { title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() } } - fn set_title(mut self, title: &str) -> Self { + pub fn set_title(mut self, title: &str) -> Self { self.title = Some(title.to_owned()); self } - fn set_message(mut self, message: &str) -> Self { + pub fn set_message(mut self, message: &str) -> Self { self.message = Some(message.to_owned()); self } - fn add_tag(mut self, tag: &str) -> Self { + pub fn add_tag(mut self, tag: &str) -> Self { self.tags.push(tag.to_owned()); self } - fn set_priority(mut self, priority: Priority) -> Self { + pub fn set_priority(mut self, priority: Priority) -> Self { self.priority = Some(priority); self } - fn add_action(mut self, action: Action) -> Self { + pub fn add_action(mut self, action: Action) -> Self { self.actions.push(action); self } + + fn finalize(self, topic: &str) -> NotificationFinal { + NotificationFinal { topic: topic.to_owned(), inner: self } + } } impl Ntfy { - fn new(base_url: &str, topic: &str) -> Self { - Self { base_url: base_url.to_owned(), topic: topic.to_owned() } + fn new(base_url: &str, topic: &str, tx: Sender) -> Self { + Self { base_url: base_url.to_owned(), topic: topic.to_owned(), tx } + } + + async fn send(&self, notification: Notification) { + let notification = notification.finalize(&self.topic); + debug!("Sending notfication"); + + // Create the request + let res = reqwest::Client::new() + .post(self.base_url.clone()) + .json(¬ification) + .send() + .await; + + if let Err(err) = res { + error!("Something went wrong while sending the notifcation: {err}"); + } else if let Ok(res) = res { + let status = res.status(); + if !status.is_success() { + warn!("Received status {status} when sending notification"); + } + } } } -pub fn start(mut rx: presence::Receiver, config: NtfyConfig) { - let mut ntfy = Ntfy::new(&config.url, &config.topic); +pub fn start(mut presence_rx: presence::Receiver, config: &NtfyConfig) -> Sender { + let (tx, mut rx) = mpsc::channel(10); + + let mut ntfy = Ntfy::new(&config.url, &config.topic, tx.clone()); tokio::spawn(async move { - while rx.changed().await.is_ok() { - let presence = *rx.borrow(); - ntfy.on_presence(presence).await; + loop { + tokio::select! { + Ok(_) = presence_rx.changed() => { + let presence = *presence_rx.borrow(); + ntfy.on_presence(presence).await; + }, + Some(notifcation) = rx.recv() => { + ntfy.send(notifcation).await; + } + } } - - unreachable!("Did not expect this"); }); + + return tx; } #[async_trait] @@ -123,29 +168,13 @@ impl OnPresence for Ntfy { }; // Create the notification - let notification = Notification::new(&self.topic) + let notification = Notification::new() .set_title("Presence") .set_message(if presence { "Home" } else { "Away" }) .add_tag("house") .add_action(action) .set_priority(Priority::Low); - debug!("Notifying presence as {presence}"); - - // Create the request - let res = reqwest::Client::new() - .post(self.base_url.clone()) - .json(¬ification) - .send() - .await; - - if let Err(err) = res { - error!("Something went wrong while sending the notifcation: {err}"); - } else if let Ok(res) = res { - let status = res.status(); - if !status.is_success() { - warn!("Received status {status} when sending notification"); - } - } + self.tx.send(notification).await.ok(); } }