ntfy notifications are now send through a channel, allowing notifications to be send from other places in the program

This commit is contained in:
Dreaded_X 2023-01-24 19:00:38 +01:00
parent 6c8b73f60f
commit 18bca5abf4
Signed by: Dreaded_X
GPG Key ID: 76BDEC4E165D8AD9
5 changed files with 91 additions and 60 deletions

View File

@ -87,7 +87,7 @@ pub struct LightSensorConfig {
pub max: isize,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct Flags {
pub presence: isize,
pub darkness: isize,

View File

@ -15,7 +15,7 @@ impl DebugBridge {
}
}
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: DebugBridgeConfig, client: AsyncClient) {
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &DebugBridgeConfig, client: AsyncClient) {
let mut debug_bridge = DebugBridge::new(&config.topic, client);
tokio::spawn(async move {

View File

@ -54,8 +54,8 @@ impl HueBridge {
}
}
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) {
let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags);
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &HueBridgeConfig) {
let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone());
tokio::spawn(async move {
loop {

View File

@ -1,5 +1,5 @@
#![feature(async_closure)]
use std::{process, time::Duration};
use std::{process, time::Duration, collections::HashMap};
use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse};
@ -13,7 +13,7 @@ use automation::{
presence, error::ApiError, debug_bridge,
};
use dotenvy::dotenv;
use rumqttc::{AsyncClient, MqttOptions, Transport};
use rumqttc::{AsyncClient, MqttOptions, Transport, matches};
use tracing::{debug, error, info, metadata::LevelFilter};
use futures::future::join_all;
@ -75,6 +75,23 @@ async fn app() -> anyhow::Result<()> {
let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?;
let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?;
// Start the ntfy service if it is configured
let mut ntfy = None;
if let Some(config) = &config.ntfy {
ntfy = Some(ntfy::start(presence.clone(), config));
}
let ntfy = ntfy;
// Start the hue bridge if it is configured
if let Some(config) = &config.hue_bridge {
hue_bridge::start(presence.clone(), light_sensor.clone(), config);
}
// Start the debug bridge if it is configured
if let Some(config) = &config.debug_bridge {
debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone());
}
let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone());
join_all(
config
@ -91,21 +108,6 @@ async fn app() -> anyhow::Result<()> {
})
).await.into_iter().collect::<Result<_, _>>()?;
// Start the ntfy service if it is configured
if let Some(config) = config.ntfy {
ntfy::start(presence.clone(), config);
}
// Start the hue bridge if it is configured
if let Some(config) = config.hue_bridge {
hue_bridge::start(presence.clone(), light_sensor.clone(), config);
}
// Start the debug bridge if it is configured
if let Some(config) = config.debug_bridge {
debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone());
}
// Actually start listening for mqtt message,
// we wait until all the setup is done, as otherwise we might miss some messages
mqtt.start();

View File

@ -1,20 +1,25 @@
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tracing::{warn, error, debug};
use serde::Serialize;
use serde_repr::*;
use crate::{presence::{self, OnPresence}, config::NtfyConfig};
pub type Sender = mpsc::Sender<Notification>;
pub type Receiver = mpsc::Receiver<Notification>;
struct Ntfy {
base_url: String,
topic: String
topic: String,
tx: Sender,
}
#[derive(Serialize_repr)]
#[repr(u8)]
enum Priority {
pub enum Priority {
Min = 1,
Low,
Default,
@ -24,7 +29,7 @@ enum Priority {
#[derive(Serialize)]
#[serde(rename_all = "snake_case", tag = "action")]
enum ActionType {
pub enum ActionType {
Broadcast {
#[serde(skip_serializing_if = "HashMap::is_empty")]
extras: HashMap<String, String>
@ -34,7 +39,7 @@ enum ActionType {
}
#[derive(Serialize)]
struct Action {
pub struct Action {
#[serde(flatten)]
action: ActionType,
label: String,
@ -42,8 +47,14 @@ struct Action {
}
#[derive(Serialize)]
struct Notification {
struct NotificationFinal {
topic: String,
#[serde(flatten)]
inner: Notification,
}
#[derive(Serialize)]
pub struct Notification {
#[serde(skip_serializing_if = "Option::is_none")]
title: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -57,53 +68,87 @@ struct Notification {
}
impl Notification {
fn new(topic: &str) -> Self {
Self { topic: topic.to_owned(), title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() }
pub fn new() -> Self {
Self { title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() }
}
fn set_title(mut self, title: &str) -> Self {
pub fn set_title(mut self, title: &str) -> Self {
self.title = Some(title.to_owned());
self
}
fn set_message(mut self, message: &str) -> Self {
pub fn set_message(mut self, message: &str) -> Self {
self.message = Some(message.to_owned());
self
}
fn add_tag(mut self, tag: &str) -> Self {
pub fn add_tag(mut self, tag: &str) -> Self {
self.tags.push(tag.to_owned());
self
}
fn set_priority(mut self, priority: Priority) -> Self {
pub fn set_priority(mut self, priority: Priority) -> Self {
self.priority = Some(priority);
self
}
fn add_action(mut self, action: Action) -> Self {
pub fn add_action(mut self, action: Action) -> Self {
self.actions.push(action);
self
}
fn finalize(self, topic: &str) -> NotificationFinal {
NotificationFinal { topic: topic.to_owned(), inner: self }
}
}
impl Ntfy {
fn new(base_url: &str, topic: &str) -> Self {
Self { base_url: base_url.to_owned(), topic: topic.to_owned() }
fn new(base_url: &str, topic: &str, tx: Sender) -> Self {
Self { base_url: base_url.to_owned(), topic: topic.to_owned(), tx }
}
async fn send(&self, notification: Notification) {
let notification = notification.finalize(&self.topic);
debug!("Sending notfication");
// Create the request
let res = reqwest::Client::new()
.post(self.base_url.clone())
.json(&notification)
.send()
.await;
if let Err(err) = res {
error!("Something went wrong while sending the notifcation: {err}");
} else if let Ok(res) = res {
let status = res.status();
if !status.is_success() {
warn!("Received status {status} when sending notification");
}
}
}
}
pub fn start(mut rx: presence::Receiver, config: NtfyConfig) {
let mut ntfy = Ntfy::new(&config.url, &config.topic);
pub fn start(mut presence_rx: presence::Receiver, config: &NtfyConfig) -> Sender {
let (tx, mut rx) = mpsc::channel(10);
let mut ntfy = Ntfy::new(&config.url, &config.topic, tx.clone());
tokio::spawn(async move {
while rx.changed().await.is_ok() {
let presence = *rx.borrow();
ntfy.on_presence(presence).await;
loop {
tokio::select! {
Ok(_) = presence_rx.changed() => {
let presence = *presence_rx.borrow();
ntfy.on_presence(presence).await;
},
Some(notifcation) = rx.recv() => {
ntfy.send(notifcation).await;
}
}
}
unreachable!("Did not expect this");
});
return tx;
}
#[async_trait]
@ -123,29 +168,13 @@ impl OnPresence for Ntfy {
};
// Create the notification
let notification = Notification::new(&self.topic)
let notification = Notification::new()
.set_title("Presence")
.set_message(if presence { "Home" } else { "Away" })
.add_tag("house")
.add_action(action)
.set_priority(Priority::Low);
debug!("Notifying presence as {presence}");
// Create the request
let res = reqwest::Client::new()
.post(self.base_url.clone())
.json(&notification)
.send()
.await;
if let Err(err) = res {
error!("Something went wrong while sending the notifcation: {err}");
} else if let Ok(res) = res {
let status = res.status();
if !status.is_success() {
warn!("Received status {status} when sending notification");
}
}
self.tx.send(notification).await.ok();
}
}