From 13f5c87c03757262eb2963a29ddb19916b116696 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Thu, 12 Jan 2023 02:01:14 +0100 Subject: [PATCH] Improved error handling --- Cargo.lock | 18 +++ Cargo.toml | 1 + google-home/src/fullfillment.rs | 2 +- src/auth.rs | 20 ++-- src/config.rs | 92 +++++++-------- src/devices.rs | 18 +-- src/devices/audio_setup.rs | 19 ++- src/devices/contact_sensor.rs | 16 +-- src/devices/ikea_outlet.rs | 11 +- src/devices/kasa_outlet.rs | 5 +- src/devices/wake_on_lan.rs | 11 +- src/error.rs | 203 ++++++++++++++++++++++++++++++++ src/hue_bridge.rs | 64 +++++----- src/lib.rs | 1 + src/light_sensor.rs | 21 ++-- src/main.rs | 57 +++++---- src/ntfy.rs | 27 +++-- src/presence.rs | 31 +++-- 18 files changed, 445 insertions(+), 172 deletions(-) create mode 100644 src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 25860e0..76dae15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,7 @@ dependencies = [ "axum", "bytes", "dotenvy", + "eui48", "futures", "google-home", "impl_cast", @@ -189,6 +190,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "eui48" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "887418ac5e8d57c2e66e04bdc2fe15f9a5407be20b54a82c86bd0e368b709701" +dependencies = [ + "regex", + "rustc-serialize", + "serde", +] + [[package]] name = "flume" version = "0.10.14" @@ -772,6 +784,12 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "rustc-serialize" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" + [[package]] name = "rustls" version = "0.20.7" diff --git a/Cargo.toml b/Cargo.toml index 4ef6c81..94ba631 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ regex = "1.7.0" async-trait = "0.1.61" async-recursion = "1.0.0" futures = "0.3.25" +eui48 = { version = "1.1.0", features = ["disp_hexstring", "serde"] } [profile.release] lto=true diff --git a/google-home/src/fullfillment.rs b/google-home/src/fullfillment.rs index 1c109c9..e5650ca 100644 --- a/google-home/src/fullfillment.rs +++ b/google-home/src/fullfillment.rs @@ -27,7 +27,7 @@ impl GoogleHome { match payload { Some(payload) => Ok(Response::new(request.request_id, payload)), - _ => Err(anyhow::anyhow!("Something went wrong, expected at least ResponsePayload")), + _ => Err(anyhow::anyhow!("Expected at least one ResponsePayload")), } } diff --git a/src/auth.rs b/src/auth.rs index 16a0bbd..8ee6393 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -2,11 +2,10 @@ use axum::{ async_trait, extract::{FromRequestParts, FromRef}, http::{StatusCode, request::Parts}, - response::{IntoResponse, Response}, }; use serde::Deserialize; -use crate::config::OpenIDConfig; +use crate::{config::OpenIDConfig, error::{ApiError, ApiErrorJson}}; #[derive(Debug, Deserialize)] pub struct User { @@ -19,7 +18,7 @@ where OpenIDConfig: FromRef, S: Send + Sync, { - type Rejection = Response; + type Rejection = ApiError; async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { // Get the state @@ -38,23 +37,26 @@ where // Send the request let res = req.send() .await - .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; + .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; // If the request is success full the auth token is valid and we are given userinfo let status = res.status(); if status.is_success() { let user = res.json() .await - .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; + .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; return Ok(user); } else { - let err = res - .text() + let err: ApiErrorJson = res + .json() .await - .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; + .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; - return Err((status, err).into_response()); + let err = ApiError::try_from(err) + .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; + + Err(err) } } } diff --git a/src/config.rs b/src/config.rs index ea31135..be1731f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,14 +1,13 @@ -use std::{fs, error::Error, net::{Ipv4Addr, SocketAddr}, collections::HashMap}; +use std::{fs, net::{Ipv4Addr, SocketAddr}, collections::HashMap}; use async_recursion::async_recursion; use regex::{Regex, Captures}; -use tracing::{debug, trace, error}; +use tracing::{debug, trace}; use rumqttc::{AsyncClient, has_wildcards}; use serde::Deserialize; +use eui48::MacAddress; -use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet, AsOnOff}; - -// @TODO Configure more defaults +use crate::{devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet}, error::{FailedToParseConfig, MissingEnv, MissingWildcard, Error, FailedToCreateDevice}}; #[derive(Debug, Deserialize)] pub struct Config { @@ -124,14 +123,20 @@ pub struct PresenceDeviceConfig { impl PresenceDeviceConfig { /// Set the mqtt topic to an appropriate value if it is not already set - fn generate_topic(&mut self, class: &str, identifier: &str, config: &Config) { + fn generate_topic(&mut self, class: &str, identifier: &str, config: &Config) -> Result<(), MissingWildcard> { if self.mqtt.is_none() { + if !has_wildcards(&config.presence.topic) { + return Err(MissingWildcard::new(&config.presence.topic).into()); + } + // @TODO This is not perfect, if the topic is some/+/thing/# this will fail let offset = config.presence.topic.find('+').or(config.presence.topic.find('#')).unwrap(); let topic = config.presence.topic[..offset].to_owned() + class + "/" + identifier; trace!("Setting presence mqtt topic: {topic}"); self.mqtt = Some(MqttDeviceConfig { topic }); } + + Ok(()) } } @@ -150,7 +155,7 @@ pub enum Device { info: InfoConfig, #[serde(flatten)] mqtt: MqttDeviceConfig, - mac_address: String, + mac_address: MacAddress, }, KasaOutlet { ip: Ipv4Addr, @@ -169,39 +174,31 @@ pub enum Device { } impl Config { - pub fn build(filename: &str) -> Result> { + pub fn parse_file(filename: &str) -> Result { debug!("Loading config: {filename}"); - let file = fs::read_to_string(filename)?; + let file = fs::read_to_string(filename) + .map_err(|err| FailedToParseConfig::new(filename, err.into()))?; // Substitute in environment variables let re = Regex::new(r"\$\{(.*)\}").unwrap(); - let mut failure = false; + let mut missing = MissingEnv::new(); let file = re.replace_all(&file, |caps: &Captures| { let key = caps.get(1).unwrap().as_str(); debug!("Substituting '{key}' in config"); match std::env::var(key) { Ok(value) => value, Err(_) => { - failure = true; - error!("Environment variable '{key}' is not set"); + missing.add_missing(key); "".to_string() } } }); - if failure { - return Err("Missing environment variables".into()); - } + missing.has_missing() + .map_err(|err| FailedToParseConfig::new(filename, err.into()))?; - let config: Config = toml::from_str(&file)?; - - // Some extra config validation - if !has_wildcards(&config.presence.topic) { - return Err(format!("Invalid presence topic '{}', needs to contain a wildcard (+/#) in order to listen to presence devices", config.presence.topic).into()); - } - - // @TODO It would be nice it was possible to add validation to serde, - // that way we can check that the provided mqtt topics are actually valid + let config: Config = toml::from_str(&file) + .map_err(|err| FailedToParseConfig::new(filename, err.into()))?; Ok(config) } @@ -209,48 +206,51 @@ impl Config { impl Device { #[async_recursion] - pub async fn into(self, identifier: String, config: &Config, client: AsyncClient) -> DeviceBox { - let device: DeviceBox = match self { + pub async fn create(self, identifier: String, config: &Config, client: AsyncClient) -> Result { + let device: Result = match self { Device::IkeaOutlet { info, mqtt, kettle } => { trace!(id = identifier, "IkeaOutlet [{} in {:?}]", info.name, info.room); - Box::new(IkeaOutlet::new(identifier, info, mqtt, kettle, client).await) + match IkeaOutlet::build(&identifier, info, mqtt, kettle, client).await { + Ok(device) => Ok(Box::new(device)), + Err(err) => Err(err), + } }, Device::WakeOnLAN { info, mqtt, mac_address } => { trace!(id = identifier, "WakeOnLan [{} in {:?}]", info.name, info.room); - Box::new(WakeOnLAN::new(identifier, info, mqtt, mac_address, client).await) + match WakeOnLAN::build(&identifier, info, mqtt, mac_address, client).await { + Ok(device) => Ok(Box::new(device)), + Err(err) => Err(err), + } }, Device::KasaOutlet { ip } => { trace!(id = identifier, "KasaOutlet [{}]", identifier); - Box::new(KasaOutlet::new(identifier, ip)) + Ok(Box::new(KasaOutlet::new(&identifier, ip))) } Device::AudioSetup { mqtt, mixer, speakers } => { trace!(id = identifier, "AudioSetup [{}]", identifier); // Create the child devices - let mixer = (*mixer).into(identifier.clone() + ".mixer", config, client.clone()).await; - let speakers = (*speakers).into(identifier.clone() + ".speakers", config, client.clone()).await; + let mixer = (*mixer).create(identifier.clone() + ".mixer", config, client.clone()).await?; + let speakers = (*speakers).create(identifier.clone() + ".speakers", config, client.clone()).await?; - // The AudioSetup expects the children to be something that implements the OnOff trait - // So let's convert the children and make sure OnOff is implemented - let mixer = match AsOnOff::consume(mixer) { - Some(mixer) => mixer, - None => todo!("Handle this properly"), - }; - let speakers = match AsOnOff::consume(speakers) { - Some(speakers) => speakers, - None => todo!("Handle this properly"), - }; - - Box::new(AudioSetup::new(identifier, mqtt, mixer, speakers, client).await) + match AudioSetup::build(&identifier, mqtt, mixer, speakers, client).await { + Ok(device) => Ok(Box::new(device)), + Err(err) => Err(err), + } }, Device::ContactSensor { mqtt, mut presence } => { trace!(id = identifier, "ContactSensor [{}]", identifier); if let Some(presence) = &mut presence { - presence.generate_topic("contact", &identifier, &config); + presence.generate_topic("contact", &identifier, &config) + .map_err(|err| FailedToCreateDevice::new(&identifier, err.into()))?; + } + + match ContactSensor::build(&identifier, mqtt, presence, client).await { + Ok(device) => Ok(Box::new(device)), + Err(err) => Err(err), } - Box::new(ContactSensor::new(identifier, mqtt, presence, client).await) }, }; - return device; + return device.map_err(|err| FailedToCreateDevice::new(&identifier, err)); } } diff --git a/src/devices.rs b/src/devices.rs index e2a8cd4..c1a977c 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -18,7 +18,7 @@ use pollster::FutureExt; use tokio::sync::{oneshot, mpsc}; use tracing::{trace, debug, span, Level}; -use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}}; +use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}, error}; impl_cast::impl_cast!(Device, OnMqtt); impl_cast::impl_cast!(Device, OnPresence); @@ -58,7 +58,7 @@ enum Command { Fullfillment { google_home: GoogleHome, payload: google_home::Request, - tx: oneshot::Sender, + tx: oneshot::Sender>, }, AddDevice { device: DeviceBox, @@ -75,16 +75,16 @@ pub struct DeviceHandle { impl DeviceHandle { // @TODO Improve error type - pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> Result { + pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> anyhow::Result { let (tx, rx) = oneshot::channel(); - self.tx.send(Command::Fullfillment { google_home, payload, tx }).await.unwrap(); - rx.await + self.tx.send(Command::Fullfillment { google_home, payload, tx }).await?; + rx.await? } - pub async fn add_device(&self, device: DeviceBox) { + pub async fn add_device(&self, device: DeviceBox) -> error::Result<()> { let (tx, rx) = oneshot::channel(); - self.tx.send(Command::AddDevice { device, tx }).await.unwrap(); - rx.await.ok(); + self.tx.send(Command::AddDevice { device, tx }).await?; + Ok(rx.await?) } } @@ -123,7 +123,7 @@ impl Devices { fn handle_cmd(&mut self, cmd: Command) { match cmd { Command::Fullfillment { google_home, payload, tx } => { - let result = google_home.handle_request(payload, &mut self.as_google_home_devices()).unwrap(); + let result = google_home.handle_request(payload, &mut self.as_google_home_devices()); tx.send(result).ok(); }, Command::AddDevice { device, tx } => { diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index cf739fc..06e5137 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -4,10 +4,11 @@ use rumqttc::{AsyncClient, matches}; use tracing::{error, warn, debug}; use crate::config::MqttDeviceConfig; +use crate::error; use crate::mqtt::{OnMqtt, RemoteMessage, RemoteAction}; use crate::presence::OnPresence; -use super::Device; +use super::{Device, DeviceBox, AsOnOff}; // @TODO Ideally we store am Arc to the childern devices, // that way they hook into everything just like all other devices @@ -20,10 +21,20 @@ pub struct AudioSetup { } impl AudioSetup { - pub async fn new(identifier: String, mqtt: MqttDeviceConfig, mixer: Box, speakers: Box, client: AsyncClient) -> Self { - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); + pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, mixer: DeviceBox, speakers: DeviceBox, client: AsyncClient) -> error::Result { + // We expect the children devices to implement the OnOff trait + let mixer = match AsOnOff::consume(mixer) { + Some(mixer) => mixer, + None => Err(error::ExpectedOnOff::new(&(identifier.to_owned() + ".mixer")))?, + }; + let speakers = match AsOnOff::consume(speakers) { + Some(speakers) => speakers, + None => Err(error::ExpectedOnOff::new(&(identifier.to_owned() + ".speakers")))?, + }; - Self { identifier, mqtt, mixer, speakers } + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + + Ok(Self { identifier: identifier.to_owned(), mqtt, mixer, speakers }) } } diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 330294c..c1954a3 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -5,7 +5,7 @@ use rumqttc::{AsyncClient, matches}; use tokio::task::JoinHandle; use tracing::{error, debug, warn}; -use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence}; +use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence, error}; use super::Device; @@ -22,18 +22,18 @@ pub struct ContactSensor { } impl ContactSensor { - pub async fn new(identifier: String, mqtt: MqttDeviceConfig, presence: Option, client: AsyncClient) -> Self { - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); + pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, presence: Option, client: AsyncClient) -> error::Result { + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; - Self { - identifier, + Ok(Self { + identifier: identifier.to_owned(), mqtt, presence, client, overall_presence: false, is_closed: true, handle: None, - } + }) } } @@ -97,7 +97,7 @@ impl OnMqtt for ContactSensor { // This is to prevent the house from being marked as present for however long the // timeout is set when leaving the house if !self.overall_presence { - self.client.publish(topic, rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()).await.unwrap(); + self.client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()).await.map_err(|err| warn!("Failed to publish presence on {topic}: {err}")).ok(); } } else { // Once the door is closed again we start a timeout for removing the presence @@ -109,7 +109,7 @@ impl OnMqtt for ContactSensor { debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); tokio::time::sleep(timeout).await; debug!(id, "Removing door device!"); - client.publish(topic, rumqttc::QoS::AtLeastOnce, false, "").await.unwrap(); + client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "").await.map_err(|err| warn!("Failed to publish presence on {topic}: {err}")).ok(); }) ); } diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index b2b07c8..b2ef5d2 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -4,12 +4,13 @@ use async_trait::async_trait; use google_home::errors::ErrorCode; use google_home::{GoogleHomeDevice, device, types::Type, traits::{self, OnOff}}; use rumqttc::{AsyncClient, Publish, matches}; -use tracing::{debug, trace, error}; +use tracing::{debug, trace, error, warn}; use tokio::task::JoinHandle; use pollster::FutureExt as _; use crate::config::{KettleConfig, InfoConfig, MqttDeviceConfig}; use crate::devices::Device; +use crate::error; use crate::mqtt::{OnMqtt, OnOffMessage}; use crate::presence::OnPresence; @@ -26,11 +27,11 @@ pub struct IkeaOutlet { } impl IkeaOutlet { - pub async fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, kettle: Option, client: AsyncClient) -> Self { + pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, kettle: Option, client: AsyncClient) -> error::Result { // @TODO Handle potential errors here - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; - Self{ identifier, info, mqtt, kettle, client, last_known_state: false, handle: None } + Ok(Self{ identifier: identifier.to_owned(), info, mqtt, kettle, client, last_known_state: false, handle: None }) } } @@ -38,7 +39,7 @@ async fn set_on(client: AsyncClient, topic: String, on: bool) { let message = OnOffMessage::new(on); // @TODO Handle potential errors here - client.publish(topic + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).await.unwrap(); + client.publish(topic.clone() + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).await.map_err(|err| warn!("Failed to update state on {topic}: {err}")).ok(); } impl Device for IkeaOutlet { diff --git a/src/devices/kasa_outlet.rs b/src/devices/kasa_outlet.rs index abbe956..bb3f9a5 100644 --- a/src/devices/kasa_outlet.rs +++ b/src/devices/kasa_outlet.rs @@ -13,9 +13,8 @@ pub struct KasaOutlet { } impl KasaOutlet { - pub fn new(identifier: String, ip: Ipv4Addr) -> Self { - // @TODO Get the current state of the outlet - Self { identifier, addr: (ip, 9999).into() } + pub fn new(identifier: &str, ip: Ipv4Addr) -> Self { + Self { identifier: identifier.to_owned(), addr: (ip, 9999).into() } } } diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 493fdac..baeffe8 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -3,8 +3,9 @@ use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, use tracing::{debug, error}; use rumqttc::{AsyncClient, Publish, matches}; use pollster::FutureExt as _; +use eui48::MacAddress; -use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}}; +use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}, error}; use super::Device; @@ -13,15 +14,15 @@ pub struct WakeOnLAN { identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, - mac_address: String, + mac_address: MacAddress, } impl WakeOnLAN { - pub async fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: String, client: AsyncClient) -> Self { + pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: MacAddress, client: AsyncClient) -> error::Result { // @TODO Handle potential errors here - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; - Self { identifier, info, mqtt, mac_address } + Ok(Self { identifier: identifier.to_owned(), info, mqtt, mac_address }) } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..a7290b9 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,203 @@ +use std::{fmt, error, result}; + +use axum::{response::IntoResponse, http::status::InvalidStatusCode}; +use serde::{Serialize, Deserialize}; + +pub type Error = Box; +pub type Result = result::Result; + +#[derive(Debug, Clone)] +pub struct MissingEnv { + keys: Vec +} + +// @TODO Would be nice to somehow get the line number of the missing keys +impl MissingEnv { + pub fn new() -> Self { + Self { keys: Vec::new() } + } + + pub fn add_missing(&mut self, key: &str) { + self.keys.push(key.to_owned()); + } + + pub fn has_missing(self) -> result::Result<(), Self> { + if self.keys.len() > 0 { + Err(self) + } else { + Ok(()) + } + } +} + +impl fmt::Display for MissingEnv { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Missing environment variable")?; + if self.keys.len() == 0 { + unreachable!("This error should only be returned if there are actually missing environment variables"); + } + if self.keys.len() == 1 { + write!(f, " '{}'", self.keys[0])?; + } else { + write!(f, "s '{}'", self.keys[0])?; + self.keys.iter().skip(1).map(|key| { + write!(f, ", '{key}'") + }).collect::()?; + } + + Ok(()) + } +} + +impl error::Error for MissingEnv {} + + +// @TODO Would be nice to somehow get the line number of the expected wildcard topic +#[derive(Debug, Clone)] +pub struct MissingWildcard { + topic: String +} + +impl MissingWildcard { + pub fn new(topic: &str) -> Self { + Self { topic: topic.to_owned() } + } +} + +impl fmt::Display for MissingWildcard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Topic '{}' is exptected to be a wildcard topic", self.topic) + } +} + +impl error::Error for MissingWildcard {} + + +#[derive(Debug)] +pub struct FailedToParseConfig { + config: String, + cause: Error, +} + +impl FailedToParseConfig { + pub fn new(config: &str, cause: Error) -> Self { + Self { config: config.to_owned(), cause } + } +} + +impl fmt::Display for FailedToParseConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Failed to parse config '{}'", self.config) + } +} + +impl error::Error for FailedToParseConfig { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + Some(self.cause.as_ref()) + } +} + + +#[derive(Debug)] +pub struct FailedToCreateDevice { + device: String, + cause: Error, +} + +impl FailedToCreateDevice { + pub fn new(device: &str, cause: Error) -> Self { + Self { device: device.to_owned(), cause } + } +} + +impl fmt::Display for FailedToCreateDevice { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Failed to create device '{}'", self.device) + } +} + +impl error::Error for FailedToCreateDevice { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + Some(self.cause.as_ref()) + } +} + + +#[derive(Debug, Clone)] +pub struct ExpectedOnOff { + device: String +} + +impl ExpectedOnOff { + pub fn new(device: &str) -> Self { + Self { device: device.to_owned() } + } +} + +impl fmt::Display for ExpectedOnOff { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Expected device '{}' to implement OnOff trait", self.device) + } +} + +impl error::Error for ExpectedOnOff {} + + +#[derive(Debug)] +pub struct ApiError { + status_code: axum::http::StatusCode, + error: Error, +} + +impl ApiError { + pub fn new(status_code: axum::http::StatusCode, error: Error) -> Self { + Self { status_code, error } + } + + pub fn prepare_for_json(&self) -> ApiErrorJson { + let error = ApiErrorJsonError { + code: self.status_code.as_u16(), + status: self.status_code.to_string(), + reason: self.error.to_string(), + }; + + ApiErrorJson { error } + } +} + +impl fmt::Display for ApiError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.error.fmt(f) + } +} + +impl error::Error for ApiError {} + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + (self.status_code, serde_json::to_string(&self.prepare_for_json()).unwrap()).into_response() + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct ApiErrorJsonError { + code: u16, + status: String, + reason: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ApiErrorJson { + error: ApiErrorJsonError, +} + +impl TryFrom for ApiError { + type Error = InvalidStatusCode; + + fn try_from(value: ApiErrorJson) -> result::Result { + let status_code = axum::http::StatusCode::from_u16(value.error.code)?; + let error = value.error.reason.into(); + + Ok(Self { status_code, error }) + } +} diff --git a/src/hue_bridge.rs b/src/hue_bridge.rs index 06ab8f5..138c126 100644 --- a/src/hue_bridge.rs +++ b/src/hue_bridge.rs @@ -11,7 +11,7 @@ pub enum Flag { Darkness, } -pub struct HueBridge { +struct HueBridge { addr: SocketAddr, login: String, flags: Flags, @@ -23,37 +23,8 @@ struct FlagMessage { } impl HueBridge { - pub fn create(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) { - let mut hue_bridge = Self { - addr: (config.ip, 80).into(), - login: config.login, - flags: config.flags, - }; - - tokio::spawn(async move { - loop { - tokio::select! { - res = presence_rx.changed() => { - if !res.is_ok() { - break; - } - - let presence = *presence_rx.borrow(); - hue_bridge.on_presence(presence).await; - } - res = light_sensor_rx.changed() => { - if !res.is_ok() { - break; - } - - let darkness = *light_sensor_rx.borrow(); - hue_bridge.on_darkness(darkness).await; - } - } - } - - unreachable!("Did not expect this"); - }); + pub fn new(addr: SocketAddr, login: &str, flags: Flags) -> Self { + Self { addr, login: login.to_owned(), flags } } pub async fn set_flag(&self, flag: Flag, value: bool) { @@ -83,6 +54,35 @@ impl HueBridge { } } +pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) { + let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags); + + tokio::spawn(async move { + loop { + tokio::select! { + res = presence_rx.changed() => { + if !res.is_ok() { + break; + } + + let presence = *presence_rx.borrow(); + hue_bridge.on_presence(presence).await; + } + res = light_sensor_rx.changed() => { + if !res.is_ok() { + break; + } + + let darkness = *light_sensor_rx.borrow(); + hue_bridge.on_darkness(darkness).await; + } + } + } + + unreachable!("Did not expect this"); + }); +} + #[async_trait] impl OnPresence for HueBridge { async fn on_presence(&mut self, presence: bool) { diff --git a/src/lib.rs b/src/lib.rs index 770d471..134620e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,4 @@ pub mod ntfy; pub mod light_sensor; pub mod hue_bridge; pub mod auth; +pub mod error; diff --git a/src/light_sensor.rs b/src/light_sensor.rs index 50e6827..75ea3cf 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -3,7 +3,7 @@ use rumqttc::{matches, AsyncClient}; use tokio::sync::watch; use tracing::{error, trace, debug}; -use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}}; +use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}, error}; #[async_trait] pub trait OnDarkness { @@ -14,18 +14,25 @@ pub type Receiver = watch::Receiver; type Sender = watch::Sender; struct LightSensor { - is_dark: Receiver, mqtt: MqttDeviceConfig, min: isize, max: isize, tx: Sender, + is_dark: Receiver, } -pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> Receiver { - client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); +impl LightSensor { + fn new(mqtt: MqttDeviceConfig, min: isize, max: isize) -> Self { + let (tx, is_dark) = watch::channel(false); + Self { is_dark: is_dark.clone(), mqtt, min, max, tx } + } +} - let (tx, is_dark) = watch::channel(false); - let mut light_sensor = LightSensor { is_dark: is_dark.clone(), mqtt: config.mqtt, min: config.min, max: config.max, tx }; +pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> error::Result { + client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + + let mut light_sensor = LightSensor::new(config.mqtt, config.min, config.max); + let is_dark = light_sensor.is_dark.clone(); tokio::spawn(async move { loop { @@ -36,7 +43,7 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, clien } }); - return is_dark; + Ok(is_dark) } #[async_trait] diff --git a/src/main.rs b/src/main.rs index a26d4ed..6ec9024 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,16 @@ #![feature(async_closure)] use std::{process, time::Duration}; -use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router}; +use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse}; use automation::{ auth::User, config::{Config, OpenIDConfig}, devices, - hue_bridge::HueBridge, + hue_bridge, light_sensor, mqtt::Mqtt, - ntfy::Ntfy, - presence, + ntfy, + presence, error::ApiError, }; use dotenvy::dotenv; use rumqttc::{AsyncClient, MqttOptions, Transport}; @@ -33,6 +33,19 @@ impl FromRef for automation::config::OpenIDConfig { #[tokio::main] async fn main() { + if let Err(err) = app().await { + error!("Error: {err}"); + let mut cause = err.source(); + while let Some(c) = cause { + error!("Cause: {c}"); + cause = c.source(); + } + process::exit(1); + } +} + + +async fn app() -> Result<(), Box> { dotenv().ok(); let filter = EnvFilter::builder() @@ -44,10 +57,7 @@ async fn main() { info!("Starting automation_rs..."); let config = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned()); - let config = Config::build(&config).unwrap_or_else(|err| { - error!("Failed to load config: {err}"); - process::exit(1); - }); + let config = Config::parse_file(&config)?; // Configure MQTT let mqtt = config.mqtt.clone(); @@ -59,8 +69,8 @@ async fn main() { // Create a mqtt client and wrap the eventloop let (client, eventloop) = AsyncClient::new(mqttoptions, 10); let mqtt = Mqtt::new(eventloop); - let presence = presence::start(mqtt.subscribe(), config.presence.clone(), client.clone()).await; - let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await; + let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?; + let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?; let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone()); join_all( @@ -69,21 +79,20 @@ async fn main() { .clone() .into_iter() .map(|(identifier, device_config)| async { - // This can technically block, but this only happens during start-up, so should not be - // a problem - let device = device_config.into(identifier, &config, client.clone()).await; - devices.add_device(device).await; + let device = device_config.create(identifier, &config, client.clone()).await?; + devices.add_device(device).await?; + Ok::<(), Box>(()) }) - ).await; + ).await.into_iter().collect::>()?; // Start the ntfy service if it is configured if let Some(ntfy_config) = config.ntfy { - Ntfy::create(presence.clone(), ntfy_config); + ntfy::start(presence.clone(), &ntfy_config); } // Start he hue bridge if it is configured if let Some(hue_bridge_config) = config.hue_bridge { - HueBridge::create(presence.clone(), light_sensor.clone(), hue_bridge_config); + hue_bridge::start(presence.clone(), light_sensor.clone(), hue_bridge_config); } // Actually start listening for mqtt message, @@ -96,11 +105,14 @@ async fn main() { post(async move |user: User, Json(payload): Json| { debug!(username = user.preferred_username, "{payload:#?}"); let gc = GoogleHome::new(&user.preferred_username); - let result = devices.fullfillment(gc, payload).await.unwrap(); + let result = match devices.fullfillment(gc, payload).await { + Ok(result) => result, + Err(err) => return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()).into_response(), + }; debug!(username = user.preferred_username, "{result:#?}"); - return (StatusCode::OK, Json(result)); + return (StatusCode::OK, Json(result)).into_response(); }), ); @@ -114,8 +126,9 @@ async fn main() { // Start the web server let addr = config.fullfillment.into(); info!("Server started on http://{addr}"); - axum::Server::bind(&addr) + axum::Server::try_bind(&addr)? .serve(app.into_make_service()) - .await - .unwrap(); + .await?; + + Ok(()) } diff --git a/src/ntfy.rs b/src/ntfy.rs index f77bf19..6d06293 100644 --- a/src/ntfy.rs +++ b/src/ntfy.rs @@ -7,7 +7,7 @@ use serde_repr::*; use crate::{presence::{self, OnPresence}, config::NtfyConfig}; -pub struct Ntfy { +struct Ntfy { base_url: String, topic: String } @@ -88,19 +88,24 @@ impl Notification { } impl Ntfy { - pub fn create(mut rx: presence::Receiver, config: NtfyConfig) { - let mut ntfy = Self { base_url: config.url, topic: config.topic }; - tokio::spawn(async move { - while rx.changed().await.is_ok() { - let presence = *rx.borrow(); - ntfy.on_presence(presence).await; - } - - unreachable!("Did not expect this"); - }); + fn new(base_url: &str, topic: &str) -> Self { + Self { base_url: base_url.to_owned(), topic: topic.to_owned() } } } +pub fn start(mut rx: presence::Receiver, config: &NtfyConfig) { + let mut ntfy = Ntfy::new(&config.url, &config.topic); + + tokio::spawn(async move { + while rx.changed().await.is_ok() { + let presence = *rx.borrow(); + ntfy.on_presence(presence).await; + } + + unreachable!("Did not expect this"); + }); +} + #[async_trait] impl OnPresence for Ntfy { async fn on_presence(&mut self, presence: bool) { diff --git a/src/presence.rs b/src/presence.rs index 38aea3e..a5e0b30 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; use async_trait::async_trait; use tokio::sync::watch; use tracing::{debug, error}; -use rumqttc::{AsyncClient, matches}; +use rumqttc::{AsyncClient, matches, has_wildcards}; -use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig}; +use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig, error::{self, MissingWildcard}}; #[async_trait] pub trait OnPresence { @@ -17,17 +17,28 @@ type Sender = watch::Sender; struct Presence { devices: HashMap, - overall_presence: Receiver, mqtt: MqttDeviceConfig, tx: Sender, + overall_presence: Receiver, } -pub async fn start(mut mqtt_rx: mqtt::Receiver, mqtt: MqttDeviceConfig, client: AsyncClient) -> Receiver { - // Subscribe to the relevant topics on mqtt - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap(); +impl Presence { + fn build(mqtt: MqttDeviceConfig) -> Result { + if !has_wildcards(&mqtt.topic) { + return Err(MissingWildcard::new(&mqtt.topic).into()); + } - let (tx, overall_presence) = watch::channel(false); - let mut presence = Presence { devices: HashMap::new(), overall_presence: overall_presence.clone(), mqtt, tx }; + let (tx, overall_presence) = watch::channel(false); + Ok(Self { devices: HashMap::new(), overall_presence: overall_presence.clone(), mqtt, tx }) + } +} + +pub async fn start(mqtt: MqttDeviceConfig, mut mqtt_rx: mqtt::Receiver, client: AsyncClient) -> error::Result { + // Subscribe to the relevant topics on mqtt + client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + + let mut presence = Presence::build(mqtt)?; + let overall_presence = presence.overall_presence.clone(); tokio::spawn(async move { loop { @@ -38,7 +49,7 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, mqtt: MqttDeviceConfig, client: } }); - return overall_presence; + Ok(overall_presence) } #[async_trait] @@ -48,7 +59,7 @@ impl OnMqtt for Presence { return; } - let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).unwrap(); + let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).expect("Presence::new fails if it does not contain wildcards"); let device_name = &message.topic[offset..]; if message.payload.len() == 0 {