diff --git a/google-home/src/device.rs b/google-home/src/device.rs index f3346b6..a9383ad 100644 --- a/google-home/src/device.rs +++ b/google-home/src/device.rs @@ -1,6 +1,12 @@ use serde::Serialize; -use crate::{response, types::Type, traits::{AsOnOff, Trait, AsScene}, errors::{DeviceError, ErrorCode}, request::execute::CommandType}; +use crate::{ + errors::{DeviceError, ErrorCode}, + request::execute::CommandType, + response, + traits::{AsOnOff, AsScene, Trait}, + types::Type, +}; pub trait GoogleHomeDevice: AsOnOff + AsScene { fn get_device_type(&self) -> Type; @@ -19,11 +25,10 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene { None } - - fn sync(&self) -> response::sync::Device { let name = self.get_device_name(); - let mut device = response::sync::Device::new(self.get_id(), &name.name, self.get_device_type()); + let mut device = + response::sync::Device::new(self.get_id(), &name.name, self.get_device_type()); device.name = name; device.will_report_state = self.will_report_state(); @@ -60,9 +65,7 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene { // OnOff if let Some(on_off) = AsOnOff::cast(self) { - device.state.on = on_off.is_on() - .map_err(|err| device.set_error(err)) - .ok(); + device.state.on = on_off.is_on().map_err(|err| device.set_error(err)).ok(); } device @@ -75,13 +78,13 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene { .ok_or::(DeviceError::ActionNotAvailable.into())?; on_off.set_on(*on)?; - }, + } CommandType::ActivateScene { deactivate } => { let scene = AsScene::cast_mut(self) .ok_or::(DeviceError::ActionNotAvailable.into())?; scene.set_active(!deactivate)?; - }, + } } Ok(()) @@ -100,7 +103,11 @@ pub struct Name { impl Name { pub fn new(name: &str) -> Self { - Self { default_names: Vec::new(), name: name.into(), nicknames: Vec::new() } + Self { + default_names: Vec::new(), + name: name.into(), + nicknames: Vec::new(), + } } pub fn add_default_name(&mut self, name: &str) { diff --git a/google-home/src/errors.rs b/google-home/src/errors.rs index 12aec14..1d4fa39 100644 --- a/google-home/src/errors.rs +++ b/google-home/src/errors.rs @@ -1,5 +1,5 @@ -use thiserror::Error; use serde::Serialize; +use thiserror::Error; #[derive(Debug, Hash, PartialEq, Eq, Copy, Clone, Serialize, Error)] #[serde(rename_all = "camelCase")] @@ -16,8 +16,7 @@ pub enum DeviceError { #[derive(Debug, Hash, PartialEq, Eq, Copy, Clone, Serialize, Error)] #[serde(rename_all = "camelCase")] -pub enum DeviceException { -} +pub enum DeviceException {} #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Error)] #[serde(untagged)] diff --git a/google-home/src/fullfillment.rs b/google-home/src/fullfillment.rs index 861e3b8..dcf6ae9 100644 --- a/google-home/src/fullfillment.rs +++ b/google-home/src/fullfillment.rs @@ -2,7 +2,12 @@ use std::collections::HashMap; use thiserror::Error; -use crate::{request::{Request, Intent, self}, device::GoogleHomeDevice, response::{sync, ResponsePayload, query, execute, Response, self, State}, errors::{DeviceError, ErrorCode}}; +use crate::{ + device::GoogleHomeDevice, + errors::{DeviceError, ErrorCode}, + request::{self, Intent, Request}, + response::{self, execute, query, sync, Response, ResponsePayload, State}, +}; #[derive(Debug)] pub struct GoogleHome { @@ -13,15 +18,21 @@ pub struct GoogleHome { #[derive(Debug, Error)] pub enum FullfillmentError { #[error("Expected at least one ResponsePayload")] - ExpectedOnePayload + ExpectedOnePayload, } impl GoogleHome { pub fn new(user_id: &str) -> Self { - Self { user_id: user_id.into() } + Self { + user_id: user_id.into(), + } } - pub fn handle_request(&self, request: Request, devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>) -> Result { + pub fn handle_request( + &self, + request: Request, + devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>, + ) -> Result { // TODO: What do we do if we actually get more then one thing in the input array, right now // we only respond to the first thing let payload = request @@ -30,8 +41,11 @@ impl GoogleHome { .map(|input| match input { Intent::Sync => ResponsePayload::Sync(self.sync(devices)), Intent::Query(payload) => ResponsePayload::Query(self.query(payload, devices)), - Intent::Execute(payload) => ResponsePayload::Execute(self.execute(payload, devices)), - }).next(); + Intent::Execute(payload) => { + ResponsePayload::Execute(self.execute(payload, devices)) + } + }) + .next(); payload .ok_or(FullfillmentError::ExpectedOnePayload) @@ -48,83 +62,111 @@ impl GoogleHome { resp_payload } - fn query(&self, payload: request::query::Payload, devices: &HashMap<&str, &mut dyn GoogleHomeDevice>) -> query::Payload { + fn query( + &self, + payload: request::query::Payload, + devices: &HashMap<&str, &mut dyn GoogleHomeDevice>, + ) -> query::Payload { let mut resp_payload = query::Payload::new(); - resp_payload.devices = payload.devices + resp_payload.devices = payload + .devices .into_iter() .map(|device| device.id) .map(|id| { - let device = devices.get(id.as_str()) - .map_or_else(|| { + let device = devices.get(id.as_str()).map_or_else( + || { let mut device = query::Device::new(); device.set_offline(); device.set_error(DeviceError::DeviceNotFound.into()); device - }, |device| device.query()); + }, + |device| device.query(), + ); (id, device) - }).collect(); + }) + .collect(); resp_payload - } - fn execute(&self, payload: request::execute::Payload, devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>) -> execute::Payload { + fn execute( + &self, + payload: request::execute::Payload, + devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>, + ) -> execute::Payload { let mut resp_payload = response::execute::Payload::new(); - payload.commands - .into_iter() - .for_each(|command| { - let mut success = response::execute::Command::new(execute::Status::Success); - success.states = Some(execute::States { online: true, state: State::default() }); - let mut offline = response::execute::Command::new(execute::Status::Offline); - offline.states = Some(execute::States { online: false, state: State::default() }); - let mut errors: HashMap = HashMap::new(); + payload.commands.into_iter().for_each(|command| { + let mut success = response::execute::Command::new(execute::Status::Success); + success.states = Some(execute::States { + online: true, + state: State::default(), + }); + let mut offline = response::execute::Command::new(execute::Status::Offline); + offline.states = Some(execute::States { + online: false, + state: State::default(), + }); + let mut errors: HashMap = HashMap::new(); - command.devices - .into_iter() - .map(|device| device.id) - .map(|id| { - devices.get_mut(id.as_str()) - .map_or((id.clone(), Err(DeviceError::DeviceNotFound.into())), |device| { - if !device.is_online() { - return (id, Ok(false)); - } + command + .devices + .into_iter() + .map(|device| device.id) + .map(|id| { + devices.get_mut(id.as_str()).map_or( + (id.clone(), Err(DeviceError::DeviceNotFound.into())), + |device| { + if !device.is_online() { + return (id, Ok(false)); + } - let results = command.execution.iter().map(|cmd| { + let results = command + .execution + .iter() + .map(|cmd| { // TODO: We should also return the state after update in the state // struct, however that will make things WAY more complicated device.execute(cmd) - }).collect::, ErrorCode>>(); + }) + .collect::, ErrorCode>>(); - // TODO: We only get one error not all errors - if let Err(err) = results { - (id, Err(err)) - } else { - (id, Ok(true)) + // TODO: We only get one error not all errors + if let Err(err) = results { + (id, Err(err)) + } else { + (id, Ok(true)) + } + }, + ) + }) + .for_each(|(id, state)| { + match state { + Ok(true) => success.add_id(&id), + Ok(false) => offline.add_id(&id), + Err(err) => errors + .entry(err) + .or_insert_with(|| match &err { + ErrorCode::DeviceError(_) => { + response::execute::Command::new(execute::Status::Error) + } + ErrorCode::DeviceException(_) => { + response::execute::Command::new(execute::Status::Exceptions) } }) - }).for_each(|(id, state)| { - match state { - Ok(true) => success.add_id(&id), - Ok(false) => offline.add_id(&id), - Err(err) => errors.entry(err).or_insert_with(|| { - match &err { - ErrorCode::DeviceError(_) => response::execute::Command::new(execute::Status::Error), - ErrorCode::DeviceException(_) => response::execute::Command::new(execute::Status::Exceptions), - } - }).add_id(&id), - }; - }); + .add_id(&id), + }; + }); - resp_payload.add_command(success); - resp_payload.add_command(offline); - for (error, mut cmd) in errors { - cmd.error_code = Some(error); - resp_payload.add_command(cmd); - } - }); + resp_payload.add_command(success); + resp_payload.add_command(offline); + for (error, mut cmd) in errors { + cmd.error_code = Some(error); + resp_payload.add_command(cmd); + } + }); resp_payload } @@ -133,7 +175,12 @@ impl GoogleHome { #[cfg(test)] mod tests { use super::*; - use crate::{request::Request, device::{GoogleHomeDevice, self}, types, traits, errors::ErrorCode}; + use crate::{ + device::{self, GoogleHomeDevice}, + errors::ErrorCode, + request::Request, + traits, types, + }; #[derive(Debug)] struct TestOutlet { @@ -143,7 +190,10 @@ mod tests { impl TestOutlet { fn new(name: &str) -> Self { - Self { name: name.into(), on: false } + Self { + name: name.into(), + on: false, + } } } diff --git a/google-home/src/lib.rs b/google-home/src/lib.rs index 322cd98..2a675e4 100644 --- a/google-home/src/lib.rs +++ b/google-home/src/lib.rs @@ -1,18 +1,18 @@ #![allow(incomplete_features)] #![feature(specialization)] -mod fullfillment; pub mod device; +mod fullfillment; mod request; mod response; -pub mod types; -pub mod traits; -pub mod errors; mod attributes; +pub mod errors; +pub mod traits; +pub mod types; -pub use fullfillment::GoogleHome; +pub use device::GoogleHomeDevice; pub use fullfillment::FullfillmentError; +pub use fullfillment::GoogleHome; pub use request::Request; pub use response::Response; -pub use device::GoogleHomeDevice; diff --git a/google-home/src/request.rs b/google-home/src/request.rs index b0cda09..2ad61d6 100644 --- a/google-home/src/request.rs +++ b/google-home/src/request.rs @@ -1,6 +1,6 @@ -pub mod sync; -pub mod query; pub mod execute; +pub mod query; +pub mod sync; use serde::Deserialize; diff --git a/google-home/src/request/execute.rs b/google-home/src/request/execute.rs index 5245252..6efb99c 100644 --- a/google-home/src/request/execute.rs +++ b/google-home/src/request/execute.rs @@ -10,7 +10,7 @@ pub struct Payload { #[serde(rename_all = "camelCase")] pub struct Command { pub devices: Vec, - pub execution: Vec + pub execution: Vec, } #[derive(Debug, Deserialize)] @@ -24,23 +24,18 @@ pub struct Device { #[serde(tag = "command", content = "params")] pub enum CommandType { #[serde(rename = "action.devices.commands.OnOff")] - OnOff { - on: bool - }, + OnOff { on: bool }, #[serde(rename = "action.devices.commands.ActivateScene")] - ActivateScene { - deactivate: bool - } + ActivateScene { deactivate: bool }, } #[cfg(test)] mod tests { use super::*; - use crate::request::{Request, Intent}; + use crate::request::{Intent, Request}; #[test] fn deserialize() { - let json = r#"{ "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", "inputs": [ @@ -86,7 +81,10 @@ mod tests { println!("{:?}", req); - assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()); + assert_eq!( + req.request_id, + "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned() + ); assert_eq!(req.inputs.len(), 1); match &req.inputs[0] { Intent::Execute(payload) => { @@ -96,11 +94,11 @@ mod tests { assert_eq!(payload.commands[0].devices[1].id, "456"); assert_eq!(payload.commands[0].execution.len(), 1); match payload.commands[0].execution[0] { - CommandType::OnOff{on} => assert!(on), - _ => panic!("Expected OnOff") + CommandType::OnOff { on } => assert!(on), + _ => panic!("Expected OnOff"), } - }, - _ => panic!("Expected Execute intent") + } + _ => panic!("Expected Execute intent"), }; } } diff --git a/google-home/src/request/query.rs b/google-home/src/request/query.rs index 93d8d5f..bb54d0b 100644 --- a/google-home/src/request/query.rs +++ b/google-home/src/request/query.rs @@ -15,11 +15,10 @@ pub struct Device { #[cfg(test)] mod tests { - use crate::request::{Request, Intent}; + use crate::request::{Intent, Request}; #[test] fn deserialize() { - let json = r#"{ "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", "inputs": [ @@ -53,15 +52,18 @@ mod tests { println!("{:?}", req); - assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()); + assert_eq!( + req.request_id, + "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned() + ); assert_eq!(req.inputs.len(), 1); match &req.inputs[0] { Intent::Query(payload) => { assert_eq!(payload.devices.len(), 2); assert_eq!(payload.devices[0].id, "123"); assert_eq!(payload.devices[1].id, "456"); - }, - _ => panic!("Expected Query intent") + } + _ => panic!("Expected Query intent"), }; } } diff --git a/google-home/src/request/sync.rs b/google-home/src/request/sync.rs index f610d67..8e8f0f9 100644 --- a/google-home/src/request/sync.rs +++ b/google-home/src/request/sync.rs @@ -1,10 +1,9 @@ #[cfg(test)] mod tests { - use crate::request::{Request, Intent}; + use crate::request::{Intent, Request}; #[test] fn deserialize() { - let json = r#"{ "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", "inputs": [ @@ -18,12 +17,14 @@ mod tests { println!("{:?}", req); - assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()); + assert_eq!( + req.request_id, + "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned() + ); assert_eq!(req.inputs.len(), 1); match req.inputs[0] { - Intent::Sync => {}, - _ => panic!("Expected Sync intent") + Intent::Sync => {} + _ => panic!("Expected Sync intent"), } } } - diff --git a/google-home/src/response.rs b/google-home/src/response.rs index 6029acb..28bd3f5 100644 --- a/google-home/src/response.rs +++ b/google-home/src/response.rs @@ -1,6 +1,6 @@ -pub mod sync; -pub mod query; pub mod execute; +pub mod query; +pub mod sync; use serde::Serialize; @@ -13,7 +13,10 @@ pub struct Response { impl Response { pub fn new(request_id: &str, payload: ResponsePayload) -> Self { - Self { request_id: request_id.to_owned(), payload } + Self { + request_id: request_id.to_owned(), + payload, + } } } diff --git a/google-home/src/response/execute.rs b/google-home/src/response/execute.rs index 0861781..4ef3a66 100644 --- a/google-home/src/response/execute.rs +++ b/google-home/src/response/execute.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::{response::State, errors::ErrorCode}; +use crate::{errors::ErrorCode, response::State}; #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] @@ -14,7 +14,11 @@ pub struct Payload { impl Payload { pub fn new() -> Self { - Self { error_code: None, debug_string: None, commands: Vec::new() } + Self { + error_code: None, + debug_string: None, + commands: Vec::new(), + } } pub fn add_command(&mut self, command: Command) { @@ -44,7 +48,12 @@ pub struct Command { impl Command { pub fn new(status: Status) -> Self { - Self { error_code: None, ids: Vec::new(), status, states: None } + Self { + error_code: None, + ids: Vec::new(), + status, + states: None, + } } pub fn add_id(&mut self, id: &str) { @@ -78,7 +87,10 @@ pub enum Status { #[cfg(test)] mod tests { use super::*; - use crate::{response::{Response, ResponsePayload, State}, errors::DeviceError}; + use crate::{ + errors::DeviceError, + response::{Response, ResponsePayload, State}, + }; #[test] fn serialize() { @@ -98,7 +110,10 @@ mod tests { command.ids.push("456".into()); execute_resp.add_command(command); - let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Execute(execute_resp)); + let resp = Response::new( + "ff36a3cc-ec34-11e6-b1a0-64510650abcf", + ResponsePayload::Execute(execute_resp), + ); let json = serde_json::to_string(&resp).unwrap(); diff --git a/google-home/src/response/query.rs b/google-home/src/response/query.rs index 00a4355..9ca03d6 100644 --- a/google-home/src/response/query.rs +++ b/google-home/src/response/query.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use serde::Serialize; -use crate::{response::State, errors::ErrorCode}; +use crate::{errors::ErrorCode, response::State}; #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] @@ -16,7 +16,11 @@ pub struct Payload { impl Payload { pub fn new() -> Self { - Self { error_code: None, debug_string: None, devices: HashMap::new() } + Self { + error_code: None, + debug_string: None, + devices: HashMap::new(), + } } pub fn add_device(&mut self, id: &str, device: Device) { @@ -53,7 +57,12 @@ pub struct Device { impl Device { pub fn new() -> Self { - Self { online: true, status: Status::Success, error_code: None, state: State::default() } + Self { + online: true, + status: Status::Success, + error_code: None, + state: State::default(), + } } pub fn set_offline(&mut self) { @@ -93,7 +102,10 @@ mod tests { device.state.on = Some(false); query_resp.add_device("456", device); - let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Query(query_resp)); + let resp = Response::new( + "ff36a3cc-ec34-11e6-b1a0-64510650abcf", + ResponsePayload::Query(query_resp), + ); let json = serde_json::to_string(&resp).unwrap(); diff --git a/google-home/src/response/sync.rs b/google-home/src/response/sync.rs index 1637176..7697061 100644 --- a/google-home/src/response/sync.rs +++ b/google-home/src/response/sync.rs @@ -3,8 +3,8 @@ use serde::Serialize; use crate::attributes::Attributes; use crate::device; use crate::errors::ErrorCode; -use crate::types::Type; use crate::traits::Trait; +use crate::types::Type; #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] @@ -19,7 +19,12 @@ pub struct Payload { impl Payload { pub fn new(agent_user_id: &str) -> Self { - Self { agent_user_id: agent_user_id.into(), error_code: None, debug_string: None, devices: Vec::new() } + Self { + agent_user_id: agent_user_id.into(), + error_code: None, + debug_string: None, + devices: Vec::new(), + } } pub fn add_device(&mut self, device: Device) { @@ -64,7 +69,11 @@ impl Device { #[cfg(test)] mod tests { use super::*; - use crate::{response::{Response, ResponsePayload}, types::Type, traits::Trait}; + use crate::{ + response::{Response, ResponsePayload}, + traits::Trait, + types::Type, + }; #[test] fn serialize() { @@ -85,7 +94,10 @@ mod tests { sync_resp.add_device(device); - let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Sync(sync_resp)); + let resp = Response::new( + "ff36a3cc-ec34-11e6-b1a0-64510650abcf", + ResponsePayload::Sync(sync_resp), + ); let json = serde_json::to_string(&resp).unwrap(); diff --git a/src/auth.rs b/src/auth.rs index 916f1dd..58b98bc 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,11 +1,14 @@ use axum::{ async_trait, - extract::{FromRequestParts, FromRef}, - http::{StatusCode, request::Parts}, + extract::{FromRef, FromRequestParts}, + http::{request::Parts, StatusCode}, }; use serde::Deserialize; -use crate::{config::OpenIDConfig, error::{ApiError, ApiErrorJson}}; +use crate::{ + config::OpenIDConfig, + error::{ApiError, ApiErrorJson}, +}; #[derive(Debug, Deserialize)] pub struct User { @@ -26,8 +29,7 @@ where // Create a request to the auth server // TODO: Do some discovery to find the correct url for this instead of assuming - let mut req = reqwest::Client::new() - .get(format!("{}/userinfo", openid.base_url)); + let mut req = reqwest::Client::new().get(format!("{}/userinfo", openid.base_url)); // Add auth header to the request if it exists if let Some(auth) = parts.headers.get(axum::http::header::AUTHORIZATION) { @@ -35,14 +37,16 @@ where } // Send the request - let res = req.send() + let res = req + .send() .await .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; // If the request is success full the auth token is valid and we are given userinfo let status = res.status(); if status.is_success() { - let user = res.json() + let user = res + .json() .await .map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?; diff --git a/src/config.rs b/src/config.rs index a7bdc28..701e339 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,13 +1,20 @@ -use std::{fs, net::{Ipv4Addr, SocketAddr}, collections::HashMap}; +use std::{ + collections::HashMap, + fs, + net::{Ipv4Addr, SocketAddr}, +}; use async_recursion::async_recursion; -use regex::{Regex, Captures}; -use tracing::{debug, trace}; -use rumqttc::{AsyncClient, has_wildcards}; -use serde::Deserialize; use eui48::MacAddress; +use regex::{Captures, Regex}; +use rumqttc::{has_wildcards, AsyncClient}; +use serde::Deserialize; +use tracing::{debug, trace}; -use crate::{devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet, self}, error::{MissingEnv, MissingWildcard, ConfigParseError, DeviceCreationError}}; +use crate::{ + devices::{self, AudioSetup, ContactSensor, DeviceBox, IkeaOutlet, KasaOutlet, WakeOnLAN}, + error::{ConfigParseError, DeviceCreationError, MissingEnv, MissingWildcard}, +}; #[derive(Debug, Deserialize)] pub struct Config { @@ -26,7 +33,7 @@ pub struct Config { #[derive(Debug, Clone, Deserialize)] pub struct OpenIDConfig { - pub base_url: String + pub base_url: String, } #[derive(Debug, Clone, Deserialize)] @@ -56,7 +63,10 @@ impl From for SocketAddr { impl Default for FullfillmentConfig { fn default() -> Self { - Self { ip: default_fullfillment_ip(), port: default_fullfillment_port() } + Self { + ip: default_fullfillment_ip(), + port: default_fullfillment_port(), + } } } @@ -134,20 +144,33 @@ pub struct PresenceDeviceConfig { pub mqtt: Option, // TODO: Maybe make this an option? That way if no timeout is set it will immediately turn the // device off again? - pub timeout: u64 // Timeout in seconds + pub timeout: u64, // Timeout in seconds } impl PresenceDeviceConfig { /// Set the mqtt topic to an appropriate value if it is not already set - fn generate_topic(mut self, class: &str, identifier: &str, config: &Config) -> Result { + fn generate_topic( + mut self, + class: &str, + identifier: &str, + config: &Config, + ) -> Result { if self.mqtt.is_none() { if !has_wildcards(&config.presence.topic) { return Err(MissingWildcard::new(&config.presence.topic)); } // TODO: This is not perfect, if the topic is some/+/thing/# this will fail - let offset = config.presence.topic.find('+').or(config.presence.topic.find('#')).unwrap(); - let topic = format!("{}/{class}/{identifier}", &config.presence.topic[..offset-1]); + let offset = config + .presence + .topic + .find('+') + .or(config.presence.topic.find('#')) + .unwrap(); + let topic = format!( + "{}/{class}/{identifier}", + &config.presence.topic[..offset - 1] + ); trace!("Setting presence mqtt topic: {topic}"); self.mqtt = Some(MqttDeviceConfig { topic }); } @@ -183,14 +206,14 @@ pub enum Device { AudioSetup { #[serde(flatten)] mqtt: MqttDeviceConfig, - mixer: Box::, - speakers: Box::, + mixer: Box, + speakers: Box, }, ContactSensor { #[serde(flatten)] mqtt: MqttDeviceConfig, presence: Option, - } + }, } fn default_outlet_type() -> OutletType { @@ -239,42 +262,77 @@ fn device_box(device: T) -> DeviceBox { impl Device { #[async_recursion] - pub async fn create(self, identifier: &str, config: &Config, client: AsyncClient) -> Result { + pub async fn create( + self, + identifier: &str, + config: &Config, + client: AsyncClient, + ) -> Result { let device = match self { - Device::IkeaOutlet { info, mqtt, outlet_type, timeout } => { - trace!(id = identifier, "IkeaOutlet [{} in {:?}]", info.name, info.room); - IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client).await + Device::IkeaOutlet { + info, + mqtt, + outlet_type, + timeout, + } => { + trace!( + id = identifier, + "IkeaOutlet [{} in {:?}]", + info.name, + info.room + ); + IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client) + .await .map(device_box)? - }, - Device::WakeOnLAN { info, mqtt, mac_address, broadcast_ip } => { - trace!(id = identifier, "WakeOnLan [{} in {:?}]", info.name, info.room); - WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client).await + } + Device::WakeOnLAN { + info, + mqtt, + mac_address, + broadcast_ip, + } => { + trace!( + id = identifier, + "WakeOnLan [{} in {:?}]", + info.name, + info.room + ); + WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client) + .await .map(device_box)? - }, + } Device::KasaOutlet { ip } => { trace!(id = identifier, "KasaOutlet [{}]", identifier); device_box(KasaOutlet::new(identifier, ip)) } - Device::AudioSetup { mqtt, mixer, speakers } => { + Device::AudioSetup { + mqtt, + mixer, + speakers, + } => { trace!(id = identifier, "AudioSetup [{}]", identifier); // Create the child devices let mixer_id = format!("{}.mixer", identifier); let mixer = (*mixer).create(&mixer_id, config, client.clone()).await?; let speakers_id = format!("{}.speakers", identifier); - let speakers = (*speakers).create(&speakers_id, config, client.clone()).await?; + let speakers = (*speakers) + .create(&speakers_id, config, client.clone()) + .await?; - AudioSetup::build(identifier, mqtt, mixer, speakers, client).await + AudioSetup::build(identifier, mqtt, mixer, speakers, client) + .await .map(device_box)? - }, + } Device::ContactSensor { mqtt, presence } => { trace!(id = identifier, "ContactSensor [{}]", identifier); let presence = presence .map(|p| p.generate_topic("contact", identifier, config)) .transpose()?; - ContactSensor::build(identifier, mqtt, presence, client).await + ContactSensor::build(identifier, mqtt, presence, client) + .await .map(device_box)? - }, + } }; Ok(device) diff --git a/src/debug_bridge.rs b/src/debug_bridge.rs index 62b26ff..d946132 100644 --- a/src/debug_bridge.rs +++ b/src/debug_bridge.rs @@ -2,7 +2,12 @@ use async_trait::async_trait; use rumqttc::AsyncClient; use tracing::warn; -use crate::{config::DebugBridgeConfig, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}, mqtt::{PresenceMessage, DarknessMessage}}; +use crate::{ + config::DebugBridgeConfig, + light_sensor::{self, OnDarkness}, + mqtt::{DarknessMessage, PresenceMessage}, + presence::{self, OnPresence}, +}; struct DebugBridge { topic: String, @@ -11,11 +16,19 @@ struct DebugBridge { impl DebugBridge { pub fn new(topic: &str, client: AsyncClient) -> Self { - Self { topic: topic.to_owned(), client } + Self { + topic: topic.to_owned(), + client, + } } } -pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &DebugBridgeConfig, client: AsyncClient) { +pub fn start( + mut presence_rx: presence::Receiver, + mut light_sensor_rx: light_sensor::Receiver, + config: &DebugBridgeConfig, + client: AsyncClient, +) { let mut debug_bridge = DebugBridge::new(&config.topic, client); tokio::spawn(async move { @@ -49,9 +62,20 @@ impl OnPresence for DebugBridge { async fn on_presence(&mut self, presence: bool) { let message = PresenceMessage::new(presence); let topic = format!("{}/presence", self.topic); - self.client.publish(topic, rumqttc::QoS::AtLeastOnce, true, serde_json::to_string(&message).unwrap()) + self.client + .publish( + topic, + rumqttc::QoS::AtLeastOnce, + true, + serde_json::to_string(&message).unwrap(), + ) .await - .map_err(|err| warn!("Failed to update presence on {}/presence: {err}", self.topic)) + .map_err(|err| { + warn!( + "Failed to update presence on {}/presence: {err}", + self.topic + ) + }) .ok(); } } @@ -61,9 +85,20 @@ impl OnDarkness for DebugBridge { async fn on_darkness(&mut self, dark: bool) { let message = DarknessMessage::new(dark); let topic = format!("{}/darkness", self.topic); - self.client.publish(topic, rumqttc::QoS::AtLeastOnce, true, serde_json::to_string(&message).unwrap()) + self.client + .publish( + topic, + rumqttc::QoS::AtLeastOnce, + true, + serde_json::to_string(&message).unwrap(), + ) .await - .map_err(|err| warn!("Failed to update presence on {}/presence: {err}", self.topic)) + .map_err(|err| { + warn!( + "Failed to update presence on {}/presence: {err}", + self.topic + ) + }) .ok(); } } diff --git a/src/devices.rs b/src/devices.rs index ce50faa..50abeb8 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -1,25 +1,29 @@ -mod ikea_outlet; -mod wake_on_lan; -mod kasa_outlet; mod audio_setup; mod contact_sensor; +mod ikea_outlet; +mod kasa_outlet; +mod wake_on_lan; -pub use self::ikea_outlet::IkeaOutlet; -pub use self::wake_on_lan::WakeOnLAN; -pub use self::kasa_outlet::KasaOutlet; pub use self::audio_setup::AudioSetup; pub use self::contact_sensor::ContactSensor; +pub use self::ikea_outlet::IkeaOutlet; +pub use self::kasa_outlet::KasaOutlet; +pub use self::wake_on_lan::WakeOnLAN; use std::collections::HashMap; -use thiserror::Error; use async_trait::async_trait; -use google_home::{GoogleHomeDevice, traits::OnOff, GoogleHome, FullfillmentError, }; +use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; use pollster::FutureExt; -use tokio::sync::{oneshot, mpsc}; -use tracing::{trace, debug, span, Level}; +use thiserror::Error; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, span, trace, Level}; -use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}}; +use crate::{ + light_sensor::{self, OnDarkness}, + mqtt::{self, OnMqtt}, + presence::{self, OnPresence}, +}; impl_cast::impl_cast!(Device, OnMqtt); impl_cast::impl_cast!(Device, OnPresence); @@ -27,7 +31,16 @@ impl_cast::impl_cast!(Device, OnDarkness); impl_cast::impl_cast!(Device, GoogleHomeDevice); impl_cast::impl_cast!(Device, OnOff); -pub trait Device: AsGoogleHomeDevice + AsOnMqtt + AsOnPresence + AsOnDarkness + AsOnOff + std::fmt::Debug + Sync + Send { +pub trait Device: + AsGoogleHomeDevice + + AsOnMqtt + + AsOnPresence + + AsOnDarkness + + AsOnOff + + std::fmt::Debug + + Sync + + Send +{ fn get_id(&self) -> &str; } @@ -61,15 +74,15 @@ pub enum Command { }, AddDevice { device: DeviceBox, - tx: oneshot::Sender<()> - } + tx: oneshot::Sender<()>, + }, } pub type DeviceBox = Box; #[derive(Clone)] pub struct DevicesHandle { - tx: mpsc::Sender + tx: mpsc::Sender, } #[derive(Debug, Error)] @@ -82,12 +95,21 @@ pub enum DevicesError { RecvError(#[from] tokio::sync::oneshot::error::RecvError), } - impl DevicesHandle { // TODO: Improve error type - pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> Result { + pub async fn fullfillment( + &self, + google_home: GoogleHome, + payload: google_home::Request, + ) -> Result { let (tx, rx) = oneshot::channel(); - self.tx.send(Command::Fullfillment { google_home, payload, tx }).await?; + self.tx + .send(Command::Fullfillment { + google_home, + payload, + tx, + }) + .await?; Ok(rx.await??) } @@ -98,9 +120,14 @@ impl DevicesHandle { } } -pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver) -> DevicesHandle { - - let mut devices = Devices { devices: HashMap::new() }; +pub fn start( + mut mqtt_rx: mqtt::Receiver, + mut presence_rx: presence::Receiver, + mut light_sensor_rx: light_sensor::Receiver, +) -> DevicesHandle { + let mut devices = Devices { + devices: HashMap::new(), + }; let (tx, mut rx) = mpsc::channel(100); @@ -132,15 +159,20 @@ pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, m impl Devices { fn handle_cmd(&mut self, cmd: Command) { match cmd { - Command::Fullfillment { google_home, payload, tx } => { - let result = google_home.handle_request(payload, &mut self.as_google_home_devices()); + Command::Fullfillment { + google_home, + payload, + tx, + } => { + let result = + google_home.handle_request(payload, &mut self.as_google_home_devices()); tx.send(result).ok(); - }, + } Command::AddDevice { device, tx } => { self.add_device(device); tx.send(()).ok(); - }, + } } } diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 52e1379..22fa805 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -1,14 +1,14 @@ use async_trait::async_trait; use google_home::traits; -use rumqttc::{AsyncClient, matches}; -use tracing::{error, warn, debug}; +use rumqttc::{matches, AsyncClient}; +use tracing::{debug, error, warn}; use crate::config::MqttDeviceConfig; use crate::error::DeviceError; -use crate::mqtt::{OnMqtt, RemoteMessage, RemoteAction}; +use crate::mqtt::{OnMqtt, RemoteAction, RemoteMessage}; use crate::presence::OnPresence; -use super::{Device, DeviceBox, AsOnOff}; +use super::{AsOnOff, Device, DeviceBox}; // TODO: Ideally we store am Arc to the childern devices, // that way they hook into everything just like all other devices @@ -21,19 +21,31 @@ pub struct AudioSetup { } impl AudioSetup { - pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, mixer: DeviceBox, speakers: DeviceBox, client: AsyncClient) -> Result { + pub async fn build( + identifier: &str, + mqtt: MqttDeviceConfig, + mixer: DeviceBox, + speakers: DeviceBox, + client: AsyncClient, + ) -> Result { // We expect the children devices to implement the OnOff trait let mixer_id = mixer.get_id().to_owned(); - let mixer = AsOnOff::consume(mixer) - .ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?; + let mixer = AsOnOff::consume(mixer).ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?; let speakers_id = speakers.get_id().to_owned(); - let speakers = AsOnOff::consume(speakers) - .ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?; + let speakers = + AsOnOff::consume(speakers).ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?; - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + client + .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; - Ok(Self { identifier: identifier.to_owned(), mqtt, mixer, speakers }) + Ok(Self { + identifier: identifier.to_owned(), + mqtt, + mixer, + speakers, + }) } } diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index a4f41e3..31036d5 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -1,11 +1,16 @@ use std::time::Duration; use async_trait::async_trait; -use rumqttc::{AsyncClient, matches}; +use rumqttc::{matches, AsyncClient}; use tokio::task::JoinHandle; -use tracing::{error, debug, warn}; +use tracing::{debug, error, warn}; -use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence, error::DeviceError}; +use crate::{ + config::{MqttDeviceConfig, PresenceDeviceConfig}, + error::DeviceError, + mqtt::{ContactMessage, OnMqtt, PresenceMessage}, + presence::OnPresence, +}; use super::Device; @@ -22,8 +27,15 @@ pub struct ContactSensor { } impl ContactSensor { - pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, presence: Option, client: AsyncClient) -> Result { - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + pub async fn build( + identifier: &str, + mqtt: MqttDeviceConfig, + presence: Option, + client: AsyncClient, + ) -> Result { + client + .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; Ok(Self { identifier: identifier.to_owned(), @@ -62,7 +74,7 @@ impl OnMqtt for ContactSensor { Err(err) => { error!(id = self.identifier, "Failed to parse message: {err}"); return; - }, + } }; if is_closed == self.is_closed { @@ -97,7 +109,13 @@ impl OnMqtt for ContactSensor { // This is to prevent the house from being marked as present for however long the // timeout is set when leaving the house if !self.overall_presence { - self.client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()) + self.client + .publish( + topic.clone(), + rumqttc::QoS::AtLeastOnce, + false, + serde_json::to_string(&PresenceMessage::new(true)).unwrap(), + ) .await .map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) .ok(); @@ -107,17 +125,16 @@ impl OnMqtt for ContactSensor { let client = self.client.clone(); let id = self.identifier.clone(); let timeout = Duration::from_secs(presence.timeout); - self.handle = Some( - tokio::spawn(async move { - debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); - tokio::time::sleep(timeout).await; - debug!(id, "Removing door device!"); - client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "") - .await - .map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) - .ok(); - }) - ); + self.handle = Some(tokio::spawn(async move { + debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); + tokio::time::sleep(timeout).await; + debug!(id, "Removing door device!"); + client + .publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "") + .await + .map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) + .ok(); + })); } } } diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index 1dd616f..99d1783 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -1,11 +1,16 @@ -use std::time::Duration; use async_trait::async_trait; use google_home::errors::ErrorCode; -use google_home::{GoogleHomeDevice, device, types::Type, traits::{self, OnOff}}; -use rumqttc::{AsyncClient, Publish, matches}; -use tracing::{debug, error, warn}; -use tokio::task::JoinHandle; +use google_home::{ + device, + traits::{self, OnOff}, + types::Type, + GoogleHomeDevice, +}; use pollster::FutureExt as _; +use rumqttc::{matches, AsyncClient, Publish}; +use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::{debug, error, warn}; use crate::config::{InfoConfig, MqttDeviceConfig, OutletType}; use crate::devices::Device; @@ -27,11 +32,29 @@ pub struct IkeaOutlet { } impl IkeaOutlet { - pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, outlet_type: OutletType, timeout: Option, client: AsyncClient) -> Result { + pub async fn build( + identifier: &str, + info: InfoConfig, + mqtt: MqttDeviceConfig, + outlet_type: OutletType, + timeout: Option, + client: AsyncClient, + ) -> Result { // TODO: Handle potential errors here - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + client + .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; - Ok(Self{ identifier: identifier.to_owned(), info, mqtt, outlet_type, timeout, client, last_known_state: false, handle: None }) + Ok(Self { + identifier: identifier.to_owned(), + info, + mqtt, + outlet_type, + timeout, + client, + last_known_state: false, + handle: None, + }) } } @@ -40,7 +63,13 @@ async fn set_on(client: AsyncClient, topic: &str, on: bool) { let topic = format!("{}/set", topic); // TODO: Handle potential errors here - client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()) + client + .publish( + topic.clone(), + rumqttc::QoS::AtLeastOnce, + false, + serde_json::to_string(&message).unwrap(), + ) .await .map_err(|err| warn!("Failed to update state on {topic}: {err}")) .ok(); @@ -94,17 +123,15 @@ impl OnMqtt for IkeaOutlet { let client = self.client.clone(); let topic = self.mqtt.topic.clone(); let id = self.identifier.clone(); - self.handle = Some( - tokio::spawn(async move { - debug!(id, "Starting timeout ({timeout:?}) for kettle..."); - tokio::time::sleep(timeout).await; - debug!(id, "Turning kettle off!"); - // TODO: Idealy we would call self.set_on(false), however since we want to do - // it after a timeout we have to put it in a seperate task. - // I don't think we can really get around calling outside function - set_on(client, &topic, false).await; - }) - ); + self.handle = Some(tokio::spawn(async move { + debug!(id, "Starting timeout ({timeout:?}) for kettle..."); + tokio::time::sleep(timeout).await; + debug!(id, "Turning kettle off!"); + // TODO: Idealy we would call self.set_on(false), however since we want to do + // it after a timeout we have to put it in a seperate task. + // I don't think we can really get around calling outside function + set_on(client, &topic, false).await; + })); } } } diff --git a/src/devices/kasa_outlet.rs b/src/devices/kasa_outlet.rs index 9773965..5a4f8f9 100644 --- a/src/devices/kasa_outlet.rs +++ b/src/devices/kasa_outlet.rs @@ -1,9 +1,16 @@ -use std::{net::{SocketAddr, Ipv4Addr, TcpStream}, io::{Write, Read}, str::Utf8Error}; +use std::{ + io::{Read, Write}, + net::{Ipv4Addr, SocketAddr, TcpStream}, + str::Utf8Error, +}; -use thiserror::Error; use bytes::{Buf, BufMut}; -use google_home::{traits, errors::{self, DeviceError}}; -use serde::{Serialize, Deserialize}; +use google_home::{ + errors::{self, DeviceError}, + traits, +}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; use super::Device; @@ -15,7 +22,10 @@ pub struct KasaOutlet { impl KasaOutlet { pub fn new(identifier: &str, ip: Ipv4Addr) -> Self { - Self { identifier: identifier.to_owned(), addr: (ip, 9999).into() } + Self { + identifier: identifier.to_owned(), + addr: (ip, 9999).into(), + } } } @@ -50,9 +60,9 @@ impl Request { fn get_sysinfo() -> Self { Self { system: RequestSystem { - get_sysinfo: Some(RequestSysinfo{}), - set_relay_state: None - } + get_sysinfo: Some(RequestSysinfo {}), + set_relay_state: None, + }, } } @@ -61,9 +71,9 @@ impl Request { system: RequestSystem { get_sysinfo: None, set_relay_state: Some(RequestRelayState { - state: if on { 1 } else { 0 } - }) - } + state: if on { 1 } else { 0 }, + }), + }, } } @@ -153,8 +163,7 @@ impl From for ResponseError { impl Response { fn get_current_relay_state(&self) -> Result { if let Some(sysinfo) = &self.system.get_sysinfo { - return sysinfo.err_code.ok() - .map(|_| sysinfo.relay_state == 1); + return sysinfo.err_code.ok().map(|_| sysinfo.relay_state == 1); } Err(ResponseError::SysinfoNotFound) @@ -189,15 +198,21 @@ impl Response { impl traits::OnOff for KasaOutlet { fn is_on(&self) -> Result { - let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline))?; + let mut stream = + TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline))?; let body = Request::get_sysinfo().encrypt(); - stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError))?; + stream + .write_all(&body) + .and(stream.flush()) + .or::(Err(DeviceError::TransientError))?; let mut received = Vec::new(); let mut rx_bytes = [0; 1024]; loop { - let read = stream.read(&mut rx_bytes).or::(Err(DeviceError::TransientError.into()))?; + let read = stream + .read(&mut rx_bytes) + .or::(Err(DeviceError::TransientError.into()))?; received.extend_from_slice(&rx_bytes[..read]); @@ -206,16 +221,22 @@ impl traits::OnOff for KasaOutlet { } } - let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; + let resp = Response::decrypt(received.into()) + .or::(Err(DeviceError::TransientError.into()))?; - resp.get_current_relay_state().or(Err(DeviceError::TransientError.into())) + resp.get_current_relay_state() + .or(Err(DeviceError::TransientError.into())) } fn set_on(&mut self, on: bool) -> Result<(), errors::ErrorCode> { - let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline))?; + let mut stream = + TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline))?; let body = Request::set_relay_state(on).encrypt(); - stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError))?; + stream + .write_all(&body) + .and(stream.flush()) + .or::(Err(DeviceError::TransientError))?; let mut received = Vec::new(); let mut rx_bytes = [0; 1024]; @@ -232,9 +253,10 @@ impl traits::OnOff for KasaOutlet { } } - let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; + let resp = Response::decrypt(received.into()) + .or::(Err(DeviceError::TransientError.into()))?; - resp.check_set_relay_success().or(Err(DeviceError::TransientError.into())) + resp.check_set_relay_success() + .or(Err(DeviceError::TransientError.into())) } } - diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index 25470b0..0493e2d 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -1,12 +1,22 @@ use std::net::Ipv4Addr; use async_trait::async_trait; -use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, errors::ErrorCode}; -use tracing::{debug, error}; -use rumqttc::{AsyncClient, Publish, matches}; use eui48::MacAddress; +use google_home::{ + device, + errors::ErrorCode, + traits::{self, Scene}, + types::Type, + GoogleHomeDevice, +}; +use rumqttc::{matches, AsyncClient, Publish}; +use tracing::{debug, error}; -use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}, error::DeviceError}; +use crate::{ + config::{InfoConfig, MqttDeviceConfig}, + error::DeviceError, + mqtt::{ActivateMessage, OnMqtt}, +}; use super::Device; @@ -20,11 +30,26 @@ pub struct WakeOnLAN { } impl WakeOnLAN { - pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: MacAddress, broadcast_ip: Ipv4Addr, client: AsyncClient) -> Result { + pub async fn build( + identifier: &str, + info: InfoConfig, + mqtt: MqttDeviceConfig, + mac_address: MacAddress, + broadcast_ip: Ipv4Addr, + client: AsyncClient, + ) -> Result { // TODO: Handle potential errors here - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + client + .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; - Ok(Self { identifier: identifier.to_owned(), info, mqtt, mac_address, broadcast_ip }) + Ok(Self { + identifier: identifier.to_owned(), + info, + mqtt, + mac_address, + broadcast_ip, + }) } } @@ -81,20 +106,31 @@ impl GoogleHomeDevice for WakeOnLAN { impl traits::Scene for WakeOnLAN { fn set_active(&self, activate: bool) -> Result<(), ErrorCode> { if activate { - debug!(id = self.identifier, "Activating Computer: {} (Sending to {})", self.mac_address, self.broadcast_ip); - let wol = wakey::WolPacket::from_bytes(&self.mac_address.to_array()).map_err(|err| { - error!(id = self.identifier, "invalid mac address: {err}"); - google_home::errors::DeviceError::TransientError - })?; + debug!( + id = self.identifier, + "Activating Computer: {} (Sending to {})", self.mac_address, self.broadcast_ip + ); + let wol = + wakey::WolPacket::from_bytes(&self.mac_address.to_array()).map_err(|err| { + error!(id = self.identifier, "invalid mac address: {err}"); + google_home::errors::DeviceError::TransientError + })?; - wol.send_magic_to((Ipv4Addr::new(0, 0, 0, 0), 0), (self.broadcast_ip, 9)).map_err(|err| { - error!(id = self.identifier, "Failed to activate computer: {err}"); - google_home::errors::DeviceError::TransientError.into() - }).map(|_| debug!(id = self.identifier, "Success!")) + wol.send_magic_to((Ipv4Addr::new(0, 0, 0, 0), 0), (self.broadcast_ip, 9)) + .map_err(|err| { + error!(id = self.identifier, "Failed to activate computer: {err}"); + google_home::errors::DeviceError::TransientError.into() + }) + .map(|_| debug!(id = self.identifier, "Success!")) } else { - debug!(id = self.identifier, "Trying to deactive computer, this is not currently supported"); + debug!( + id = self.identifier, + "Trying to deactive computer, this is not currently supported" + ); // We do not support deactivating this scene - Err(ErrorCode::DeviceError(google_home::errors::DeviceError::ActionNotAvailable)) + Err(ErrorCode::DeviceError( + google_home::errors::DeviceError::ActionNotAvailable, + )) } } } diff --git a/src/error.rs b/src/error.rs index 3775c54..d8910f6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,13 +1,13 @@ -use std::{fmt, error, result}; +use std::{error, fmt, result}; +use axum::{http::status::InvalidStatusCode, response::IntoResponse}; use rumqttc::ClientError; +use serde::{Deserialize, Serialize}; use thiserror::Error; -use axum::{response::IntoResponse, http::status::InvalidStatusCode}; -use serde::{Serialize, Deserialize}; #[derive(Debug, Clone)] pub struct MissingEnv { - keys: Vec + keys: Vec, } // TODO: Would be nice to somehow get the line number of the missing keys @@ -45,9 +45,10 @@ impl fmt::Display for MissingEnv { write!(f, " '{}'", self.keys[0])?; } else { write!(f, "s '{}'", self.keys[0])?; - self.keys.iter().skip(1).try_for_each(|key| { - write!(f, ", '{key}'") - })?; + self.keys + .iter() + .skip(1) + .try_for_each(|key| write!(f, ", '{key}'"))?; } Ok(()) @@ -63,19 +64,21 @@ pub enum ConfigParseError { #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] - DeserializeError(#[from] toml::de::Error) + DeserializeError(#[from] toml::de::Error), } // TODO: Would be nice to somehow get the line number of the expected wildcard topic #[derive(Debug, Error)] #[error("Topic '{topic}' is expected to be a wildcard topic")] pub struct MissingWildcard { - topic: String + topic: String, } impl MissingWildcard { pub fn new(topic: &str) -> Self { - Self { topic: topic.to_owned() } + Self { + topic: topic.to_owned(), + } } } @@ -84,7 +87,7 @@ pub enum DeviceError { #[error(transparent)] SubscribeError(#[from] ClientError), #[error("Expected device '{0}' to implement OnOff trait")] - OnOffExpected(String) + OnOffExpected(String), } #[derive(Debug, Error)] @@ -118,7 +121,10 @@ pub struct ApiError { impl ApiError { pub fn new(status_code: axum::http::StatusCode, source: Box) -> Self { - Self { status_code, source } + Self { + status_code, + source, + } } } @@ -136,7 +142,11 @@ impl From for ApiErrorJson { impl IntoResponse for ApiError { fn into_response(self) -> axum::response::Response { - (self.status_code, serde_json::to_string::(&self.into()).unwrap()).into_response() + ( + self.status_code, + serde_json::to_string::(&self.into()).unwrap(), + ) + .into_response() } } @@ -159,6 +169,9 @@ impl TryFrom for ApiError { let status_code = axum::http::StatusCode::from_u16(value.error.code)?; let source = value.error.reason.into(); - Ok(Self { status_code, source }) + Ok(Self { + status_code, + source, + }) } } diff --git a/src/hue_bridge.rs b/src/hue_bridge.rs index 867ade5..c5c7af7 100644 --- a/src/hue_bridge.rs +++ b/src/hue_bridge.rs @@ -2,9 +2,13 @@ use std::net::SocketAddr; use async_trait::async_trait; use serde::Serialize; -use tracing::{warn, error, trace}; +use tracing::{error, trace, warn}; -use crate::{config::{HueBridgeConfig, Flags}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}}; +use crate::{ + config::{Flags, HueBridgeConfig}, + light_sensor::{self, OnDarkness}, + presence::{self, OnPresence}, +}; pub enum Flag { Presence, @@ -19,12 +23,16 @@ struct HueBridge { #[derive(Debug, Serialize)] struct FlagMessage { - flag: bool + flag: bool, } impl HueBridge { pub fn new(addr: SocketAddr, login: &str, flags: Flags) -> Self { - Self { addr, login: login.to_owned(), flags } + Self { + addr, + login: login.to_owned(), + flags, + } } pub async fn set_flag(&self, flag: Flag, value: bool) { @@ -33,7 +41,10 @@ impl HueBridge { Flag::Darkness => self.flags.darkness, }; - let url = format!("http://{}/api/{}/sensors/{flag}/state", self.addr, self.login); + let url = format!( + "http://{}/api/{}/sensors/{flag}/state", + self.addr, self.login + ); let res = reqwest::Client::new() .put(url) .json(&FlagMessage { flag: value }) @@ -46,7 +57,7 @@ impl HueBridge { if !status.is_success() { warn!(flag, "Status code is not success: {status}"); } - }, + } Err(err) => { error!(flag, "Error: {err}"); } @@ -54,8 +65,13 @@ impl HueBridge { } } -pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &HueBridgeConfig) { - let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone()); +pub fn start( + mut presence_rx: presence::Receiver, + mut light_sensor_rx: light_sensor::Receiver, + config: &HueBridgeConfig, +) { + let mut hue_bridge = + HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone()); tokio::spawn(async move { loop { diff --git a/src/lib.rs b/src/lib.rs index b965398..6f49045 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,12 @@ #![allow(incomplete_features)] #![feature(specialization)] -pub mod devices; -pub mod mqtt; -pub mod config; -pub mod presence; -pub mod ntfy; -pub mod light_sensor; -pub mod hue_bridge; pub mod auth; -pub mod error; +pub mod config; pub mod debug_bridge; +pub mod devices; +pub mod error; +pub mod hue_bridge; +pub mod light_sensor; +pub mod mqtt; +pub mod ntfy; +pub mod presence; diff --git a/src/light_sensor.rs b/src/light_sensor.rs index 1d0c4e1..0201fa9 100644 --- a/src/light_sensor.rs +++ b/src/light_sensor.rs @@ -1,9 +1,13 @@ use async_trait::async_trait; use rumqttc::{matches, AsyncClient}; use tokio::sync::watch; -use tracing::{error, trace, debug}; +use tracing::{debug, error, trace}; -use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}, error::LightSensorError}; +use crate::{ + config::{LightSensorConfig, MqttDeviceConfig}, + error::LightSensorError, + mqtt::{self, BrightnessMessage, OnMqtt}, +}; #[async_trait] pub trait OnDarkness { @@ -24,12 +28,24 @@ struct LightSensor { impl LightSensor { fn new(mqtt: MqttDeviceConfig, min: isize, max: isize) -> Self { let (tx, is_dark) = watch::channel(false); - Self { is_dark, mqtt, min, max, tx } + Self { + is_dark, + mqtt, + min, + max, + tx, + } } } -pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> Result { - client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; +pub async fn start( + mut mqtt_rx: mqtt::Receiver, + config: LightSensorConfig, + client: AsyncClient, +) -> Result { + client + .subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; let mut light_sensor = LightSensor::new(config.mqtt, config.min, config.max); let is_dark = light_sensor.is_dark.clone(); @@ -69,7 +85,12 @@ impl OnMqtt for LightSensor { trace!("It is light"); false } else { - trace!("In between min ({}) and max ({}) value, keeping current state: {}", self.min, self.max, *self.is_dark.borrow()); + trace!( + "In between min ({}) and max ({}) value, keeping current state: {}", + self.min, + self.max, + *self.is_dark.borrow() + ); *self.is_dark.borrow() }; diff --git a/src/main.rs b/src/main.rs index d37e0b7..f2badcb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,24 @@ #![feature(async_closure)] -use std::{ process, time::Duration, collections::HashMap }; +use std::{collections::HashMap, process, time::Duration}; -use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse}; +use axum::{ + extract::FromRef, http::StatusCode, response::IntoResponse, routing::post, Json, Router, +}; use automation::{ auth::User, config::{Config, OpenIDConfig}, - devices, - hue_bridge, - light_sensor, mqtt::Mqtt, - ntfy, - presence, error::ApiError, debug_bridge, + debug_bridge, devices, + error::ApiError, + hue_bridge, light_sensor, + mqtt::Mqtt, + ntfy, presence, }; use dotenvy::dotenv; -use rumqttc::{AsyncClient, MqttOptions, Transport, matches}; +use futures::future::join_all; +use rumqttc::{matches, AsyncClient, MqttOptions, Transport}; use tokio::task::JoinHandle; use tracing::{debug, error, info, metadata::LevelFilter}; -use futures::future::join_all; use google_home::{GoogleHome, Request}; use tracing_subscriber::EnvFilter; @@ -45,7 +47,6 @@ async fn main() { } } - async fn app() -> anyhow::Result<()> { dotenv().ok(); @@ -73,8 +74,14 @@ async fn app() -> anyhow::Result<()> { // Create a mqtt client and wrap the eventloop let (client, eventloop) = AsyncClient::new(mqttoptions, 10); let mqtt = Mqtt::new(eventloop); - let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?; - let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?; + let presence = + presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?; + let light_sensor = light_sensor::start( + mqtt.subscribe(), + config.light_sensor.clone(), + client.clone(), + ) + .await?; // Start the ntfy service if it is configured let mut ntfy = None; @@ -90,14 +97,22 @@ async fn app() -> anyhow::Result<()> { // Start the debug bridge if it is configured if let Some(config) = &config.debug_bridge { - debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone()); + debug_bridge::start( + presence.clone(), + light_sensor.clone(), + config, + client.clone(), + ); } // Super hacky implementation for the washing machine, just for testing { let mut handle = None::>; let mut mqtt = mqtt.subscribe(); - client.subscribe("zigbee2mqtt/bathroom/washing", rumqttc::QoS::AtLeastOnce).await.unwrap(); + client + .subscribe("zigbee2mqtt/bathroom/washing", rumqttc::QoS::AtLeastOnce) + .await + .unwrap(); tokio::spawn(async move { if let Some(ntfy) = ntfy { loop { @@ -107,7 +122,8 @@ async fn app() -> anyhow::Result<()> { continue; } - let map: HashMap = serde_json::from_slice(&message.payload).unwrap(); + let map: HashMap = + serde_json::from_slice(&message.payload).unwrap(); debug!("Test: {:?}", map); let strength = match map.get("strength").map(|value| value.as_i64().unwrap()) { @@ -127,20 +143,18 @@ async fn app() -> anyhow::Result<()> { // Start a new timer, if the timer runs out we have not had an update of // more then 15 in the last timeout period, assume we are done, notify user let ntfy = ntfy.clone(); - handle = Some( - tokio::spawn(async move { - debug!("Starting timeout of 10 min for washing machine..."); - tokio::time::sleep(Duration::from_secs(10*60)).await; - debug!("Notifying user!"); + handle = Some(tokio::spawn(async move { + debug!("Starting timeout of 10 min for washing machine..."); + tokio::time::sleep(Duration::from_secs(10 * 60)).await; + debug!("Notifying user!"); - let notification = ntfy::Notification::new() - .set_title("Laundy is done") - .set_message("Do not forget to hang it!") - .set_priority(ntfy::Priority::High); + let notification = ntfy::Notification::new() + .set_title("Laundy is done") + .set_message("Do not forget to hang it!") + .set_priority(ntfy::Priority::High); - ntfy.send(notification).await.ok(); - }) - ); + ntfy.send(notification).await.ok(); + })); } } } @@ -156,12 +170,17 @@ async fn app() -> anyhow::Result<()> { .map(|(identifier, device_config)| async { // Force the async block to move identifier let identifier = identifier; - let device = device_config.create(&identifier, &config, client.clone()).await?; + let device = device_config + .create(&identifier, &config, client.clone()) + .await?; devices.add_device(device).await?; // We don't need a seperate error type in main anyhow::Ok(()) - }) - ).await.into_iter().collect::>()?; + }), + ) + .await + .into_iter() + .collect::>()?; // Actually start listening for mqtt message, // we wait until all the setup is done, as otherwise we might miss some messages @@ -175,7 +194,10 @@ async fn app() -> anyhow::Result<()> { let gc = GoogleHome::new(&user.preferred_username); let result = match devices.fullfillment(gc, payload).await { Ok(result) => result, - Err(err) => return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()).into_response(), + Err(err) => { + return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()) + .into_response() + } }; debug!(username = user.preferred_username, "{result:#?}"); diff --git a/src/mqtt.rs b/src/mqtt.rs index 557b5a0..2495666 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,12 +1,12 @@ -use std::time::{UNIX_EPOCH, SystemTime}; +use std::time::{SystemTime, UNIX_EPOCH}; -use bytes::Bytes; -use thiserror::Error; use async_trait::async_trait; -use serde::{Serialize, Deserialize}; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use thiserror::Error; use tracing::{debug, warn}; -use rumqttc::{Publish, Event, Incoming, EventLoop}; +use rumqttc::{Event, EventLoop, Incoming, Publish}; use tokio::sync::broadcast; #[async_trait] @@ -46,13 +46,13 @@ impl Mqtt { match notification { Ok(Event::Incoming(Incoming::Publish(p))) => { self.tx.send(p).ok(); - }, + } Ok(..) => continue, Err(err) => { // Something has gone wrong // We stay in the loop as that will attempt to reconnect warn!("{}", err); - }, + } } } }); @@ -61,12 +61,14 @@ impl Mqtt { #[derive(Debug, Serialize, Deserialize)] pub struct OnOffMessage { - state: String + state: String, } impl OnOffMessage { pub fn new(state: bool) -> Self { - Self { state: if state {"ON"} else {"OFF"}.into() } + Self { + state: if state { "ON" } else { "OFF" }.into(), + } } pub fn state(&self) -> bool { @@ -85,7 +87,7 @@ impl TryFrom<&Publish> for OnOffMessage { #[derive(Debug, Deserialize)] pub struct ActivateMessage { - activate: bool + activate: bool, } impl ActivateMessage { @@ -115,7 +117,7 @@ pub enum RemoteAction { #[derive(Debug, Deserialize)] pub struct RemoteMessage { - action: RemoteAction + action: RemoteAction, } impl RemoteMessage { @@ -141,7 +143,15 @@ pub struct PresenceMessage { impl PresenceMessage { pub fn new(state: bool) -> Self { - Self { state, updated: Some(SystemTime::now().duration_since(UNIX_EPOCH).expect("Time is after UNIX EPOCH").as_millis()) } + Self { + state, + updated: Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time is after UNIX EPOCH") + .as_millis(), + ), + } } pub fn present(&self) -> bool { @@ -206,7 +216,15 @@ pub struct DarknessMessage { impl DarknessMessage { pub fn new(state: bool) -> Self { - Self { state, updated: Some(SystemTime::now().duration_since(UNIX_EPOCH).expect("Time is after UNIX EPOCH").as_millis()) } + Self { + state, + updated: Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time is after UNIX EPOCH") + .as_millis(), + ), + } } pub fn present(&self) -> bool { @@ -222,4 +240,3 @@ impl TryFrom<&Publish> for DarknessMessage { .or(Err(ParseError::InvalidPayload(message.payload.clone()))) } } - diff --git a/src/ntfy.rs b/src/ntfy.rs index 3baa89d..f74154f 100644 --- a/src/ntfy.rs +++ b/src/ntfy.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; use async_trait::async_trait; -use tokio::sync::mpsc; -use tracing::{warn, error, debug}; use serde::Serialize; use serde_repr::*; +use tokio::sync::mpsc; +use tracing::{debug, error, warn}; -use crate::{presence::{self, OnPresence}, config::NtfyConfig}; +use crate::{ + config::NtfyConfig, + presence::{self, OnPresence}, +}; pub type Sender = mpsc::Sender; pub type Receiver = mpsc::Receiver; @@ -32,7 +35,7 @@ pub enum Priority { pub enum ActionType { Broadcast { #[serde(skip_serializing_if = "HashMap::is_empty")] - extras: HashMap + extras: HashMap, }, // View, // Http @@ -69,7 +72,13 @@ pub struct Notification { impl Notification { pub fn new() -> Self { - Self { title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() } + Self { + title: None, + message: None, + tags: Vec::new(), + priority: None, + actions: Vec::new(), + } } pub fn set_title(mut self, title: &str) -> Self { @@ -98,7 +107,10 @@ impl Notification { } fn finalize(self, topic: &str) -> NotificationFinal { - NotificationFinal { topic: topic.to_owned(), inner: self } + NotificationFinal { + topic: topic.to_owned(), + inner: self, + } } } @@ -110,7 +122,11 @@ impl Default for Notification { impl Ntfy { fn new(base_url: &str, topic: &str, tx: Sender) -> Self { - Self { base_url: base_url.to_owned(), topic: topic.to_owned(), tx } + Self { + base_url: base_url.to_owned(), + topic: topic.to_owned(), + tx, + } } async fn send(&self, notification: Notification) { @@ -170,7 +186,7 @@ impl OnPresence for Ntfy { let action = Action { action: ActionType::Broadcast { extras }, label: if presence { "Set away" } else { "Set home" }.to_owned(), - clear: Some(true) + clear: Some(true), }; // Create the notification diff --git a/src/presence.rs b/src/presence.rs index 9ed3f5f..03bdd53 100644 --- a/src/presence.rs +++ b/src/presence.rs @@ -1,11 +1,15 @@ use std::collections::HashMap; use async_trait::async_trait; +use rumqttc::{has_wildcards, matches, AsyncClient}; use tokio::sync::watch; use tracing::{debug, error}; -use rumqttc::{AsyncClient, matches, has_wildcards}; -use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig, error::{MissingWildcard, PresenceError}}; +use crate::{ + config::MqttDeviceConfig, + error::{MissingWildcard, PresenceError}, + mqtt::{self, OnMqtt, PresenceMessage}, +}; #[async_trait] pub trait OnPresence { @@ -29,13 +33,24 @@ impl Presence { } let (tx, overall_presence) = watch::channel(false); - Ok(Self { devices: HashMap::new(), overall_presence, mqtt, tx }) + Ok(Self { + devices: HashMap::new(), + overall_presence, + mqtt, + tx, + }) } } -pub async fn start(mqtt: MqttDeviceConfig, mut mqtt_rx: mqtt::Receiver, client: AsyncClient) -> Result { +pub async fn start( + mqtt: MqttDeviceConfig, + mut mqtt_rx: mqtt::Receiver, + client: AsyncClient, +) -> Result { // Subscribe to the relevant topics on mqtt - client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?; + client + .subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce) + .await?; let mut presence = Presence::build(mqtt)?; let overall_presence = presence.overall_presence.clone(); @@ -59,7 +74,12 @@ impl OnMqtt for Presence { return; } - let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).expect("Presence::new fails if it does not contain wildcards"); + let offset = self + .mqtt + .topic + .find('+') + .or(self.mqtt.topic.find('#')) + .expect("Presence::new fails if it does not contain wildcards"); let device_name = &message.topic[offset..]; if message.payload.is_empty() {