Applied rust fmt
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Dreaded_X 2023-04-10 01:33:39 +02:00
parent de9203b8d5
commit 3645b53f7c
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
29 changed files with 842 additions and 375 deletions

View File

@ -1,6 +1,12 @@
use serde::Serialize;
use crate::{response, types::Type, traits::{AsOnOff, Trait, AsScene}, errors::{DeviceError, ErrorCode}, request::execute::CommandType};
use crate::{
errors::{DeviceError, ErrorCode},
request::execute::CommandType,
response,
traits::{AsOnOff, AsScene, Trait},
types::Type,
};
pub trait GoogleHomeDevice: AsOnOff + AsScene {
fn get_device_type(&self) -> Type;
@ -19,11 +25,10 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene {
None
}
fn sync(&self) -> response::sync::Device {
let name = self.get_device_name();
let mut device = response::sync::Device::new(self.get_id(), &name.name, self.get_device_type());
let mut device =
response::sync::Device::new(self.get_id(), &name.name, self.get_device_type());
device.name = name;
device.will_report_state = self.will_report_state();
@ -60,9 +65,7 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene {
// OnOff
if let Some(on_off) = AsOnOff::cast(self) {
device.state.on = on_off.is_on()
.map_err(|err| device.set_error(err))
.ok();
device.state.on = on_off.is_on().map_err(|err| device.set_error(err)).ok();
}
device
@ -75,13 +78,13 @@ pub trait GoogleHomeDevice: AsOnOff + AsScene {
.ok_or::<ErrorCode>(DeviceError::ActionNotAvailable.into())?;
on_off.set_on(*on)?;
},
}
CommandType::ActivateScene { deactivate } => {
let scene = AsScene::cast_mut(self)
.ok_or::<ErrorCode>(DeviceError::ActionNotAvailable.into())?;
scene.set_active(!deactivate)?;
},
}
}
Ok(())
@ -100,7 +103,11 @@ pub struct Name {
impl Name {
pub fn new(name: &str) -> Self {
Self { default_names: Vec::new(), name: name.into(), nicknames: Vec::new() }
Self {
default_names: Vec::new(),
name: name.into(),
nicknames: Vec::new(),
}
}
pub fn add_default_name(&mut self, name: &str) {

View File

@ -1,5 +1,5 @@
use thiserror::Error;
use serde::Serialize;
use thiserror::Error;
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone, Serialize, Error)]
#[serde(rename_all = "camelCase")]
@ -16,8 +16,7 @@ pub enum DeviceError {
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone, Serialize, Error)]
#[serde(rename_all = "camelCase")]
pub enum DeviceException {
}
pub enum DeviceException {}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Error)]
#[serde(untagged)]

View File

@ -2,7 +2,12 @@ use std::collections::HashMap;
use thiserror::Error;
use crate::{request::{Request, Intent, self}, device::GoogleHomeDevice, response::{sync, ResponsePayload, query, execute, Response, self, State}, errors::{DeviceError, ErrorCode}};
use crate::{
device::GoogleHomeDevice,
errors::{DeviceError, ErrorCode},
request::{self, Intent, Request},
response::{self, execute, query, sync, Response, ResponsePayload, State},
};
#[derive(Debug)]
pub struct GoogleHome {
@ -13,15 +18,21 @@ pub struct GoogleHome {
#[derive(Debug, Error)]
pub enum FullfillmentError {
#[error("Expected at least one ResponsePayload")]
ExpectedOnePayload
ExpectedOnePayload,
}
impl GoogleHome {
pub fn new(user_id: &str) -> Self {
Self { user_id: user_id.into() }
Self {
user_id: user_id.into(),
}
}
pub fn handle_request(&self, request: Request, devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>) -> Result<Response, FullfillmentError> {
pub fn handle_request(
&self,
request: Request,
devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>,
) -> Result<Response, FullfillmentError> {
// TODO: What do we do if we actually get more then one thing in the input array, right now
// we only respond to the first thing
let payload = request
@ -30,8 +41,11 @@ impl GoogleHome {
.map(|input| match input {
Intent::Sync => ResponsePayload::Sync(self.sync(devices)),
Intent::Query(payload) => ResponsePayload::Query(self.query(payload, devices)),
Intent::Execute(payload) => ResponsePayload::Execute(self.execute(payload, devices)),
}).next();
Intent::Execute(payload) => {
ResponsePayload::Execute(self.execute(payload, devices))
}
})
.next();
payload
.ok_or(FullfillmentError::ExpectedOnePayload)
@ -48,83 +62,111 @@ impl GoogleHome {
resp_payload
}
fn query(&self, payload: request::query::Payload, devices: &HashMap<&str, &mut dyn GoogleHomeDevice>) -> query::Payload {
fn query(
&self,
payload: request::query::Payload,
devices: &HashMap<&str, &mut dyn GoogleHomeDevice>,
) -> query::Payload {
let mut resp_payload = query::Payload::new();
resp_payload.devices = payload.devices
resp_payload.devices = payload
.devices
.into_iter()
.map(|device| device.id)
.map(|id| {
let device = devices.get(id.as_str())
.map_or_else(|| {
let device = devices.get(id.as_str()).map_or_else(
|| {
let mut device = query::Device::new();
device.set_offline();
device.set_error(DeviceError::DeviceNotFound.into());
device
}, |device| device.query());
},
|device| device.query(),
);
(id, device)
}).collect();
})
.collect();
resp_payload
}
fn execute(&self, payload: request::execute::Payload, devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>) -> execute::Payload {
fn execute(
&self,
payload: request::execute::Payload,
devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>,
) -> execute::Payload {
let mut resp_payload = response::execute::Payload::new();
payload.commands
.into_iter()
.for_each(|command| {
let mut success = response::execute::Command::new(execute::Status::Success);
success.states = Some(execute::States { online: true, state: State::default() });
let mut offline = response::execute::Command::new(execute::Status::Offline);
offline.states = Some(execute::States { online: false, state: State::default() });
let mut errors: HashMap<ErrorCode, response::execute::Command> = HashMap::new();
payload.commands.into_iter().for_each(|command| {
let mut success = response::execute::Command::new(execute::Status::Success);
success.states = Some(execute::States {
online: true,
state: State::default(),
});
let mut offline = response::execute::Command::new(execute::Status::Offline);
offline.states = Some(execute::States {
online: false,
state: State::default(),
});
let mut errors: HashMap<ErrorCode, response::execute::Command> = HashMap::new();
command.devices
.into_iter()
.map(|device| device.id)
.map(|id| {
devices.get_mut(id.as_str())
.map_or((id.clone(), Err(DeviceError::DeviceNotFound.into())), |device| {
if !device.is_online() {
return (id, Ok(false));
}
command
.devices
.into_iter()
.map(|device| device.id)
.map(|id| {
devices.get_mut(id.as_str()).map_or(
(id.clone(), Err(DeviceError::DeviceNotFound.into())),
|device| {
if !device.is_online() {
return (id, Ok(false));
}
let results = command.execution.iter().map(|cmd| {
let results = command
.execution
.iter()
.map(|cmd| {
// TODO: We should also return the state after update in the state
// struct, however that will make things WAY more complicated
device.execute(cmd)
}).collect::<Result<Vec<_>, ErrorCode>>();
})
.collect::<Result<Vec<_>, ErrorCode>>();
// TODO: We only get one error not all errors
if let Err(err) = results {
(id, Err(err))
} else {
(id, Ok(true))
// TODO: We only get one error not all errors
if let Err(err) = results {
(id, Err(err))
} else {
(id, Ok(true))
}
},
)
})
.for_each(|(id, state)| {
match state {
Ok(true) => success.add_id(&id),
Ok(false) => offline.add_id(&id),
Err(err) => errors
.entry(err)
.or_insert_with(|| match &err {
ErrorCode::DeviceError(_) => {
response::execute::Command::new(execute::Status::Error)
}
ErrorCode::DeviceException(_) => {
response::execute::Command::new(execute::Status::Exceptions)
}
})
}).for_each(|(id, state)| {
match state {
Ok(true) => success.add_id(&id),
Ok(false) => offline.add_id(&id),
Err(err) => errors.entry(err).or_insert_with(|| {
match &err {
ErrorCode::DeviceError(_) => response::execute::Command::new(execute::Status::Error),
ErrorCode::DeviceException(_) => response::execute::Command::new(execute::Status::Exceptions),
}
}).add_id(&id),
};
});
.add_id(&id),
};
});
resp_payload.add_command(success);
resp_payload.add_command(offline);
for (error, mut cmd) in errors {
cmd.error_code = Some(error);
resp_payload.add_command(cmd);
}
});
resp_payload.add_command(success);
resp_payload.add_command(offline);
for (error, mut cmd) in errors {
cmd.error_code = Some(error);
resp_payload.add_command(cmd);
}
});
resp_payload
}
@ -133,7 +175,12 @@ impl GoogleHome {
#[cfg(test)]
mod tests {
use super::*;
use crate::{request::Request, device::{GoogleHomeDevice, self}, types, traits, errors::ErrorCode};
use crate::{
device::{self, GoogleHomeDevice},
errors::ErrorCode,
request::Request,
traits, types,
};
#[derive(Debug)]
struct TestOutlet {
@ -143,7 +190,10 @@ mod tests {
impl TestOutlet {
fn new(name: &str) -> Self {
Self { name: name.into(), on: false }
Self {
name: name.into(),
on: false,
}
}
}

View File

@ -1,18 +1,18 @@
#![allow(incomplete_features)]
#![feature(specialization)]
mod fullfillment;
pub mod device;
mod fullfillment;
mod request;
mod response;
pub mod types;
pub mod traits;
pub mod errors;
mod attributes;
pub mod errors;
pub mod traits;
pub mod types;
pub use fullfillment::GoogleHome;
pub use device::GoogleHomeDevice;
pub use fullfillment::FullfillmentError;
pub use fullfillment::GoogleHome;
pub use request::Request;
pub use response::Response;
pub use device::GoogleHomeDevice;

View File

@ -1,6 +1,6 @@
pub mod sync;
pub mod query;
pub mod execute;
pub mod query;
pub mod sync;
use serde::Deserialize;

View File

@ -10,7 +10,7 @@ pub struct Payload {
#[serde(rename_all = "camelCase")]
pub struct Command {
pub devices: Vec<Device>,
pub execution: Vec<CommandType>
pub execution: Vec<CommandType>,
}
#[derive(Debug, Deserialize)]
@ -24,23 +24,18 @@ pub struct Device {
#[serde(tag = "command", content = "params")]
pub enum CommandType {
#[serde(rename = "action.devices.commands.OnOff")]
OnOff {
on: bool
},
OnOff { on: bool },
#[serde(rename = "action.devices.commands.ActivateScene")]
ActivateScene {
deactivate: bool
}
ActivateScene { deactivate: bool },
}
#[cfg(test)]
mod tests {
use super::*;
use crate::request::{Request, Intent};
use crate::request::{Intent, Request};
#[test]
fn deserialize() {
let json = r#"{
"requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf",
"inputs": [
@ -86,7 +81,10 @@ mod tests {
println!("{:?}", req);
assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned());
assert_eq!(
req.request_id,
"ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()
);
assert_eq!(req.inputs.len(), 1);
match &req.inputs[0] {
Intent::Execute(payload) => {
@ -96,11 +94,11 @@ mod tests {
assert_eq!(payload.commands[0].devices[1].id, "456");
assert_eq!(payload.commands[0].execution.len(), 1);
match payload.commands[0].execution[0] {
CommandType::OnOff{on} => assert!(on),
_ => panic!("Expected OnOff")
CommandType::OnOff { on } => assert!(on),
_ => panic!("Expected OnOff"),
}
},
_ => panic!("Expected Execute intent")
}
_ => panic!("Expected Execute intent"),
};
}
}

View File

@ -15,11 +15,10 @@ pub struct Device {
#[cfg(test)]
mod tests {
use crate::request::{Request, Intent};
use crate::request::{Intent, Request};
#[test]
fn deserialize() {
let json = r#"{
"requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf",
"inputs": [
@ -53,15 +52,18 @@ mod tests {
println!("{:?}", req);
assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned());
assert_eq!(
req.request_id,
"ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()
);
assert_eq!(req.inputs.len(), 1);
match &req.inputs[0] {
Intent::Query(payload) => {
assert_eq!(payload.devices.len(), 2);
assert_eq!(payload.devices[0].id, "123");
assert_eq!(payload.devices[1].id, "456");
},
_ => panic!("Expected Query intent")
}
_ => panic!("Expected Query intent"),
};
}
}

View File

@ -1,10 +1,9 @@
#[cfg(test)]
mod tests {
use crate::request::{Request, Intent};
use crate::request::{Intent, Request};
#[test]
fn deserialize() {
let json = r#"{
"requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf",
"inputs": [
@ -18,12 +17,14 @@ mod tests {
println!("{:?}", req);
assert_eq!(req.request_id, "ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned());
assert_eq!(
req.request_id,
"ff36a3cc-ec34-11e6-b1a0-64510650abcf".to_owned()
);
assert_eq!(req.inputs.len(), 1);
match req.inputs[0] {
Intent::Sync => {},
_ => panic!("Expected Sync intent")
Intent::Sync => {}
_ => panic!("Expected Sync intent"),
}
}
}

View File

@ -1,6 +1,6 @@
pub mod sync;
pub mod query;
pub mod execute;
pub mod query;
pub mod sync;
use serde::Serialize;
@ -13,7 +13,10 @@ pub struct Response {
impl Response {
pub fn new(request_id: &str, payload: ResponsePayload) -> Self {
Self { request_id: request_id.to_owned(), payload }
Self {
request_id: request_id.to_owned(),
payload,
}
}
}

View File

@ -1,6 +1,6 @@
use serde::Serialize;
use crate::{response::State, errors::ErrorCode};
use crate::{errors::ErrorCode, response::State};
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
@ -14,7 +14,11 @@ pub struct Payload {
impl Payload {
pub fn new() -> Self {
Self { error_code: None, debug_string: None, commands: Vec::new() }
Self {
error_code: None,
debug_string: None,
commands: Vec::new(),
}
}
pub fn add_command(&mut self, command: Command) {
@ -44,7 +48,12 @@ pub struct Command {
impl Command {
pub fn new(status: Status) -> Self {
Self { error_code: None, ids: Vec::new(), status, states: None }
Self {
error_code: None,
ids: Vec::new(),
status,
states: None,
}
}
pub fn add_id(&mut self, id: &str) {
@ -78,7 +87,10 @@ pub enum Status {
#[cfg(test)]
mod tests {
use super::*;
use crate::{response::{Response, ResponsePayload, State}, errors::DeviceError};
use crate::{
errors::DeviceError,
response::{Response, ResponsePayload, State},
};
#[test]
fn serialize() {
@ -98,7 +110,10 @@ mod tests {
command.ids.push("456".into());
execute_resp.add_command(command);
let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Execute(execute_resp));
let resp = Response::new(
"ff36a3cc-ec34-11e6-b1a0-64510650abcf",
ResponsePayload::Execute(execute_resp),
);
let json = serde_json::to_string(&resp).unwrap();

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use serde::Serialize;
use crate::{response::State, errors::ErrorCode};
use crate::{errors::ErrorCode, response::State};
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
@ -16,7 +16,11 @@ pub struct Payload {
impl Payload {
pub fn new() -> Self {
Self { error_code: None, debug_string: None, devices: HashMap::new() }
Self {
error_code: None,
debug_string: None,
devices: HashMap::new(),
}
}
pub fn add_device(&mut self, id: &str, device: Device) {
@ -53,7 +57,12 @@ pub struct Device {
impl Device {
pub fn new() -> Self {
Self { online: true, status: Status::Success, error_code: None, state: State::default() }
Self {
online: true,
status: Status::Success,
error_code: None,
state: State::default(),
}
}
pub fn set_offline(&mut self) {
@ -93,7 +102,10 @@ mod tests {
device.state.on = Some(false);
query_resp.add_device("456", device);
let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Query(query_resp));
let resp = Response::new(
"ff36a3cc-ec34-11e6-b1a0-64510650abcf",
ResponsePayload::Query(query_resp),
);
let json = serde_json::to_string(&resp).unwrap();

View File

@ -3,8 +3,8 @@ use serde::Serialize;
use crate::attributes::Attributes;
use crate::device;
use crate::errors::ErrorCode;
use crate::types::Type;
use crate::traits::Trait;
use crate::types::Type;
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
@ -19,7 +19,12 @@ pub struct Payload {
impl Payload {
pub fn new(agent_user_id: &str) -> Self {
Self { agent_user_id: agent_user_id.into(), error_code: None, debug_string: None, devices: Vec::new() }
Self {
agent_user_id: agent_user_id.into(),
error_code: None,
debug_string: None,
devices: Vec::new(),
}
}
pub fn add_device(&mut self, device: Device) {
@ -64,7 +69,11 @@ impl Device {
#[cfg(test)]
mod tests {
use super::*;
use crate::{response::{Response, ResponsePayload}, types::Type, traits::Trait};
use crate::{
response::{Response, ResponsePayload},
traits::Trait,
types::Type,
};
#[test]
fn serialize() {
@ -85,7 +94,10 @@ mod tests {
sync_resp.add_device(device);
let resp = Response::new("ff36a3cc-ec34-11e6-b1a0-64510650abcf", ResponsePayload::Sync(sync_resp));
let resp = Response::new(
"ff36a3cc-ec34-11e6-b1a0-64510650abcf",
ResponsePayload::Sync(sync_resp),
);
let json = serde_json::to_string(&resp).unwrap();

View File

@ -1,11 +1,14 @@
use axum::{
async_trait,
extract::{FromRequestParts, FromRef},
http::{StatusCode, request::Parts},
extract::{FromRef, FromRequestParts},
http::{request::Parts, StatusCode},
};
use serde::Deserialize;
use crate::{config::OpenIDConfig, error::{ApiError, ApiErrorJson}};
use crate::{
config::OpenIDConfig,
error::{ApiError, ApiErrorJson},
};
#[derive(Debug, Deserialize)]
pub struct User {
@ -26,8 +29,7 @@ where
// Create a request to the auth server
// TODO: Do some discovery to find the correct url for this instead of assuming
let mut req = reqwest::Client::new()
.get(format!("{}/userinfo", openid.base_url));
let mut req = reqwest::Client::new().get(format!("{}/userinfo", openid.base_url));
// Add auth header to the request if it exists
if let Some(auth) = parts.headers.get(axum::http::header::AUTHORIZATION) {
@ -35,14 +37,16 @@ where
}
// Send the request
let res = req.send()
let res = req
.send()
.await
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;
// If the request is success full the auth token is valid and we are given userinfo
let status = res.status();
if status.is_success() {
let user = res.json()
let user = res
.json()
.await
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;

View File

@ -1,13 +1,20 @@
use std::{fs, net::{Ipv4Addr, SocketAddr}, collections::HashMap};
use std::{
collections::HashMap,
fs,
net::{Ipv4Addr, SocketAddr},
};
use async_recursion::async_recursion;
use regex::{Regex, Captures};
use tracing::{debug, trace};
use rumqttc::{AsyncClient, has_wildcards};
use serde::Deserialize;
use eui48::MacAddress;
use regex::{Captures, Regex};
use rumqttc::{has_wildcards, AsyncClient};
use serde::Deserialize;
use tracing::{debug, trace};
use crate::{devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet, self}, error::{MissingEnv, MissingWildcard, ConfigParseError, DeviceCreationError}};
use crate::{
devices::{self, AudioSetup, ContactSensor, DeviceBox, IkeaOutlet, KasaOutlet, WakeOnLAN},
error::{ConfigParseError, DeviceCreationError, MissingEnv, MissingWildcard},
};
#[derive(Debug, Deserialize)]
pub struct Config {
@ -26,7 +33,7 @@ pub struct Config {
#[derive(Debug, Clone, Deserialize)]
pub struct OpenIDConfig {
pub base_url: String
pub base_url: String,
}
#[derive(Debug, Clone, Deserialize)]
@ -56,7 +63,10 @@ impl From<FullfillmentConfig> for SocketAddr {
impl Default for FullfillmentConfig {
fn default() -> Self {
Self { ip: default_fullfillment_ip(), port: default_fullfillment_port() }
Self {
ip: default_fullfillment_ip(),
port: default_fullfillment_port(),
}
}
}
@ -134,20 +144,33 @@ pub struct PresenceDeviceConfig {
pub mqtt: Option<MqttDeviceConfig>,
// TODO: Maybe make this an option? That way if no timeout is set it will immediately turn the
// device off again?
pub timeout: u64 // Timeout in seconds
pub timeout: u64, // Timeout in seconds
}
impl PresenceDeviceConfig {
/// Set the mqtt topic to an appropriate value if it is not already set
fn generate_topic(mut self, class: &str, identifier: &str, config: &Config) -> Result<PresenceDeviceConfig, MissingWildcard> {
fn generate_topic(
mut self,
class: &str,
identifier: &str,
config: &Config,
) -> Result<PresenceDeviceConfig, MissingWildcard> {
if self.mqtt.is_none() {
if !has_wildcards(&config.presence.topic) {
return Err(MissingWildcard::new(&config.presence.topic));
}
// TODO: This is not perfect, if the topic is some/+/thing/# this will fail
let offset = config.presence.topic.find('+').or(config.presence.topic.find('#')).unwrap();
let topic = format!("{}/{class}/{identifier}", &config.presence.topic[..offset-1]);
let offset = config
.presence
.topic
.find('+')
.or(config.presence.topic.find('#'))
.unwrap();
let topic = format!(
"{}/{class}/{identifier}",
&config.presence.topic[..offset - 1]
);
trace!("Setting presence mqtt topic: {topic}");
self.mqtt = Some(MqttDeviceConfig { topic });
}
@ -183,14 +206,14 @@ pub enum Device {
AudioSetup {
#[serde(flatten)]
mqtt: MqttDeviceConfig,
mixer: Box::<Device>,
speakers: Box::<Device>,
mixer: Box<Device>,
speakers: Box<Device>,
},
ContactSensor {
#[serde(flatten)]
mqtt: MqttDeviceConfig,
presence: Option<PresenceDeviceConfig>,
}
},
}
fn default_outlet_type() -> OutletType {
@ -239,42 +262,77 @@ fn device_box<T: devices::Device + 'static>(device: T) -> DeviceBox {
impl Device {
#[async_recursion]
pub async fn create(self, identifier: &str, config: &Config, client: AsyncClient) -> Result<DeviceBox, DeviceCreationError> {
pub async fn create(
self,
identifier: &str,
config: &Config,
client: AsyncClient,
) -> Result<DeviceBox, DeviceCreationError> {
let device = match self {
Device::IkeaOutlet { info, mqtt, outlet_type, timeout } => {
trace!(id = identifier, "IkeaOutlet [{} in {:?}]", info.name, info.room);
IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client).await
Device::IkeaOutlet {
info,
mqtt,
outlet_type,
timeout,
} => {
trace!(
id = identifier,
"IkeaOutlet [{} in {:?}]",
info.name,
info.room
);
IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client)
.await
.map(device_box)?
},
Device::WakeOnLAN { info, mqtt, mac_address, broadcast_ip } => {
trace!(id = identifier, "WakeOnLan [{} in {:?}]", info.name, info.room);
WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client).await
}
Device::WakeOnLAN {
info,
mqtt,
mac_address,
broadcast_ip,
} => {
trace!(
id = identifier,
"WakeOnLan [{} in {:?}]",
info.name,
info.room
);
WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client)
.await
.map(device_box)?
},
}
Device::KasaOutlet { ip } => {
trace!(id = identifier, "KasaOutlet [{}]", identifier);
device_box(KasaOutlet::new(identifier, ip))
}
Device::AudioSetup { mqtt, mixer, speakers } => {
Device::AudioSetup {
mqtt,
mixer,
speakers,
} => {
trace!(id = identifier, "AudioSetup [{}]", identifier);
// Create the child devices
let mixer_id = format!("{}.mixer", identifier);
let mixer = (*mixer).create(&mixer_id, config, client.clone()).await?;
let speakers_id = format!("{}.speakers", identifier);
let speakers = (*speakers).create(&speakers_id, config, client.clone()).await?;
let speakers = (*speakers)
.create(&speakers_id, config, client.clone())
.await?;
AudioSetup::build(identifier, mqtt, mixer, speakers, client).await
AudioSetup::build(identifier, mqtt, mixer, speakers, client)
.await
.map(device_box)?
},
}
Device::ContactSensor { mqtt, presence } => {
trace!(id = identifier, "ContactSensor [{}]", identifier);
let presence = presence
.map(|p| p.generate_topic("contact", identifier, config))
.transpose()?;
ContactSensor::build(identifier, mqtt, presence, client).await
ContactSensor::build(identifier, mqtt, presence, client)
.await
.map(device_box)?
},
}
};
Ok(device)

View File

@ -2,7 +2,12 @@ use async_trait::async_trait;
use rumqttc::AsyncClient;
use tracing::warn;
use crate::{config::DebugBridgeConfig, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}, mqtt::{PresenceMessage, DarknessMessage}};
use crate::{
config::DebugBridgeConfig,
light_sensor::{self, OnDarkness},
mqtt::{DarknessMessage, PresenceMessage},
presence::{self, OnPresence},
};
struct DebugBridge {
topic: String,
@ -11,11 +16,19 @@ struct DebugBridge {
impl DebugBridge {
pub fn new(topic: &str, client: AsyncClient) -> Self {
Self { topic: topic.to_owned(), client }
Self {
topic: topic.to_owned(),
client,
}
}
}
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &DebugBridgeConfig, client: AsyncClient) {
pub fn start(
mut presence_rx: presence::Receiver,
mut light_sensor_rx: light_sensor::Receiver,
config: &DebugBridgeConfig,
client: AsyncClient,
) {
let mut debug_bridge = DebugBridge::new(&config.topic, client);
tokio::spawn(async move {
@ -49,9 +62,20 @@ impl OnPresence for DebugBridge {
async fn on_presence(&mut self, presence: bool) {
let message = PresenceMessage::new(presence);
let topic = format!("{}/presence", self.topic);
self.client.publish(topic, rumqttc::QoS::AtLeastOnce, true, serde_json::to_string(&message).unwrap())
self.client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| warn!("Failed to update presence on {}/presence: {err}", self.topic))
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
self.topic
)
})
.ok();
}
}
@ -61,9 +85,20 @@ impl OnDarkness for DebugBridge {
async fn on_darkness(&mut self, dark: bool) {
let message = DarknessMessage::new(dark);
let topic = format!("{}/darkness", self.topic);
self.client.publish(topic, rumqttc::QoS::AtLeastOnce, true, serde_json::to_string(&message).unwrap())
self.client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| warn!("Failed to update presence on {}/presence: {err}", self.topic))
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
self.topic
)
})
.ok();
}
}

View File

@ -1,25 +1,29 @@
mod ikea_outlet;
mod wake_on_lan;
mod kasa_outlet;
mod audio_setup;
mod contact_sensor;
mod ikea_outlet;
mod kasa_outlet;
mod wake_on_lan;
pub use self::ikea_outlet::IkeaOutlet;
pub use self::wake_on_lan::WakeOnLAN;
pub use self::kasa_outlet::KasaOutlet;
pub use self::audio_setup::AudioSetup;
pub use self::contact_sensor::ContactSensor;
pub use self::ikea_outlet::IkeaOutlet;
pub use self::kasa_outlet::KasaOutlet;
pub use self::wake_on_lan::WakeOnLAN;
use std::collections::HashMap;
use thiserror::Error;
use async_trait::async_trait;
use google_home::{GoogleHomeDevice, traits::OnOff, GoogleHome, FullfillmentError, };
use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice};
use pollster::FutureExt;
use tokio::sync::{oneshot, mpsc};
use tracing::{trace, debug, span, Level};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, span, trace, Level};
use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}};
use crate::{
light_sensor::{self, OnDarkness},
mqtt::{self, OnMqtt},
presence::{self, OnPresence},
};
impl_cast::impl_cast!(Device, OnMqtt);
impl_cast::impl_cast!(Device, OnPresence);
@ -27,7 +31,16 @@ impl_cast::impl_cast!(Device, OnDarkness);
impl_cast::impl_cast!(Device, GoogleHomeDevice);
impl_cast::impl_cast!(Device, OnOff);
pub trait Device: AsGoogleHomeDevice + AsOnMqtt + AsOnPresence + AsOnDarkness + AsOnOff + std::fmt::Debug + Sync + Send {
pub trait Device:
AsGoogleHomeDevice
+ AsOnMqtt
+ AsOnPresence
+ AsOnDarkness
+ AsOnOff
+ std::fmt::Debug
+ Sync
+ Send
{
fn get_id(&self) -> &str;
}
@ -61,15 +74,15 @@ pub enum Command {
},
AddDevice {
device: DeviceBox,
tx: oneshot::Sender<()>
}
tx: oneshot::Sender<()>,
},
}
pub type DeviceBox = Box<dyn Device>;
#[derive(Clone)]
pub struct DevicesHandle {
tx: mpsc::Sender<Command>
tx: mpsc::Sender<Command>,
}
#[derive(Debug, Error)]
@ -82,12 +95,21 @@ pub enum DevicesError {
RecvError(#[from] tokio::sync::oneshot::error::RecvError),
}
impl DevicesHandle {
// TODO: Improve error type
pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> Result<google_home::Response, DevicesError> {
pub async fn fullfillment(
&self,
google_home: GoogleHome,
payload: google_home::Request,
) -> Result<google_home::Response, DevicesError> {
let (tx, rx) = oneshot::channel();
self.tx.send(Command::Fullfillment { google_home, payload, tx }).await?;
self.tx
.send(Command::Fullfillment {
google_home,
payload,
tx,
})
.await?;
Ok(rx.await??)
}
@ -98,9 +120,14 @@ impl DevicesHandle {
}
}
pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver) -> DevicesHandle {
let mut devices = Devices { devices: HashMap::new() };
pub fn start(
mut mqtt_rx: mqtt::Receiver,
mut presence_rx: presence::Receiver,
mut light_sensor_rx: light_sensor::Receiver,
) -> DevicesHandle {
let mut devices = Devices {
devices: HashMap::new(),
};
let (tx, mut rx) = mpsc::channel(100);
@ -132,15 +159,20 @@ pub fn start(mut mqtt_rx: mqtt::Receiver, mut presence_rx: presence::Receiver, m
impl Devices {
fn handle_cmd(&mut self, cmd: Command) {
match cmd {
Command::Fullfillment { google_home, payload, tx } => {
let result = google_home.handle_request(payload, &mut self.as_google_home_devices());
Command::Fullfillment {
google_home,
payload,
tx,
} => {
let result =
google_home.handle_request(payload, &mut self.as_google_home_devices());
tx.send(result).ok();
},
}
Command::AddDevice { device, tx } => {
self.add_device(device);
tx.send(()).ok();
},
}
}
}

View File

@ -1,14 +1,14 @@
use async_trait::async_trait;
use google_home::traits;
use rumqttc::{AsyncClient, matches};
use tracing::{error, warn, debug};
use rumqttc::{matches, AsyncClient};
use tracing::{debug, error, warn};
use crate::config::MqttDeviceConfig;
use crate::error::DeviceError;
use crate::mqtt::{OnMqtt, RemoteMessage, RemoteAction};
use crate::mqtt::{OnMqtt, RemoteAction, RemoteMessage};
use crate::presence::OnPresence;
use super::{Device, DeviceBox, AsOnOff};
use super::{AsOnOff, Device, DeviceBox};
// TODO: Ideally we store am Arc to the childern devices,
// that way they hook into everything just like all other devices
@ -21,19 +21,31 @@ pub struct AudioSetup {
}
impl AudioSetup {
pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, mixer: DeviceBox, speakers: DeviceBox, client: AsyncClient) -> Result<Self, DeviceError> {
pub async fn build(
identifier: &str,
mqtt: MqttDeviceConfig,
mixer: DeviceBox,
speakers: DeviceBox,
client: AsyncClient,
) -> Result<Self, DeviceError> {
// We expect the children devices to implement the OnOff trait
let mixer_id = mixer.get_id().to_owned();
let mixer = AsOnOff::consume(mixer)
.ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?;
let mixer = AsOnOff::consume(mixer).ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?;
let speakers_id = speakers.get_id().to_owned();
let speakers = AsOnOff::consume(speakers)
.ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?;
let speakers =
AsOnOff::consume(speakers).ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?;
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { identifier: identifier.to_owned(), mqtt, mixer, speakers })
Ok(Self {
identifier: identifier.to_owned(),
mqtt,
mixer,
speakers,
})
}
}

View File

@ -1,11 +1,16 @@
use std::time::Duration;
use async_trait::async_trait;
use rumqttc::{AsyncClient, matches};
use rumqttc::{matches, AsyncClient};
use tokio::task::JoinHandle;
use tracing::{error, debug, warn};
use tracing::{debug, error, warn};
use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence, error::DeviceError};
use crate::{
config::{MqttDeviceConfig, PresenceDeviceConfig},
error::DeviceError,
mqtt::{ContactMessage, OnMqtt, PresenceMessage},
presence::OnPresence,
};
use super::Device;
@ -22,8 +27,15 @@ pub struct ContactSensor {
}
impl ContactSensor {
pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, presence: Option<PresenceDeviceConfig>, client: AsyncClient) -> Result<Self, DeviceError> {
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
pub async fn build(
identifier: &str,
mqtt: MqttDeviceConfig,
presence: Option<PresenceDeviceConfig>,
client: AsyncClient,
) -> Result<Self, DeviceError> {
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
identifier: identifier.to_owned(),
@ -62,7 +74,7 @@ impl OnMqtt for ContactSensor {
Err(err) => {
error!(id = self.identifier, "Failed to parse message: {err}");
return;
},
}
};
if is_closed == self.is_closed {
@ -97,7 +109,13 @@ impl OnMqtt for ContactSensor {
// This is to prevent the house from being marked as present for however long the
// timeout is set when leaving the house
if !self.overall_presence {
self.client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap())
self.client
.publish(
topic.clone(),
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&PresenceMessage::new(true)).unwrap(),
)
.await
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}"))
.ok();
@ -107,17 +125,16 @@ impl OnMqtt for ContactSensor {
let client = self.client.clone();
let id = self.identifier.clone();
let timeout = Duration::from_secs(presence.timeout);
self.handle = Some(
tokio::spawn(async move {
debug!(id, "Starting timeout ({timeout:?}) for contact sensor...");
tokio::time::sleep(timeout).await;
debug!(id, "Removing door device!");
client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "")
.await
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}"))
.ok();
})
);
self.handle = Some(tokio::spawn(async move {
debug!(id, "Starting timeout ({timeout:?}) for contact sensor...");
tokio::time::sleep(timeout).await;
debug!(id, "Removing door device!");
client
.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "")
.await
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}"))
.ok();
}));
}
}
}

View File

@ -1,11 +1,16 @@
use std::time::Duration;
use async_trait::async_trait;
use google_home::errors::ErrorCode;
use google_home::{GoogleHomeDevice, device, types::Type, traits::{self, OnOff}};
use rumqttc::{AsyncClient, Publish, matches};
use tracing::{debug, error, warn};
use tokio::task::JoinHandle;
use google_home::{
device,
traits::{self, OnOff},
types::Type,
GoogleHomeDevice,
};
use pollster::FutureExt as _;
use rumqttc::{matches, AsyncClient, Publish};
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};
use crate::config::{InfoConfig, MqttDeviceConfig, OutletType};
use crate::devices::Device;
@ -27,11 +32,29 @@ pub struct IkeaOutlet {
}
impl IkeaOutlet {
pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, outlet_type: OutletType, timeout: Option<u64>, client: AsyncClient) -> Result<Self, DeviceError> {
pub async fn build(
identifier: &str,
info: InfoConfig,
mqtt: MqttDeviceConfig,
outlet_type: OutletType,
timeout: Option<u64>,
client: AsyncClient,
) -> Result<Self, DeviceError> {
// TODO: Handle potential errors here
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self{ identifier: identifier.to_owned(), info, mqtt, outlet_type, timeout, client, last_known_state: false, handle: None })
Ok(Self {
identifier: identifier.to_owned(),
info,
mqtt,
outlet_type,
timeout,
client,
last_known_state: false,
handle: None,
})
}
}
@ -40,7 +63,13 @@ async fn set_on(client: AsyncClient, topic: &str, on: bool) {
let topic = format!("{}/set", topic);
// TODO: Handle potential errors here
client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap())
client
.publish(
topic.clone(),
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| warn!("Failed to update state on {topic}: {err}"))
.ok();
@ -94,17 +123,15 @@ impl OnMqtt for IkeaOutlet {
let client = self.client.clone();
let topic = self.mqtt.topic.clone();
let id = self.identifier.clone();
self.handle = Some(
tokio::spawn(async move {
debug!(id, "Starting timeout ({timeout:?}) for kettle...");
tokio::time::sleep(timeout).await;
debug!(id, "Turning kettle off!");
// TODO: Idealy we would call self.set_on(false), however since we want to do
// it after a timeout we have to put it in a seperate task.
// I don't think we can really get around calling outside function
set_on(client, &topic, false).await;
})
);
self.handle = Some(tokio::spawn(async move {
debug!(id, "Starting timeout ({timeout:?}) for kettle...");
tokio::time::sleep(timeout).await;
debug!(id, "Turning kettle off!");
// TODO: Idealy we would call self.set_on(false), however since we want to do
// it after a timeout we have to put it in a seperate task.
// I don't think we can really get around calling outside function
set_on(client, &topic, false).await;
}));
}
}
}

View File

@ -1,9 +1,16 @@
use std::{net::{SocketAddr, Ipv4Addr, TcpStream}, io::{Write, Read}, str::Utf8Error};
use std::{
io::{Read, Write},
net::{Ipv4Addr, SocketAddr, TcpStream},
str::Utf8Error,
};
use thiserror::Error;
use bytes::{Buf, BufMut};
use google_home::{traits, errors::{self, DeviceError}};
use serde::{Serialize, Deserialize};
use google_home::{
errors::{self, DeviceError},
traits,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::Device;
@ -15,7 +22,10 @@ pub struct KasaOutlet {
impl KasaOutlet {
pub fn new(identifier: &str, ip: Ipv4Addr) -> Self {
Self { identifier: identifier.to_owned(), addr: (ip, 9999).into() }
Self {
identifier: identifier.to_owned(),
addr: (ip, 9999).into(),
}
}
}
@ -50,9 +60,9 @@ impl Request {
fn get_sysinfo() -> Self {
Self {
system: RequestSystem {
get_sysinfo: Some(RequestSysinfo{}),
set_relay_state: None
}
get_sysinfo: Some(RequestSysinfo {}),
set_relay_state: None,
},
}
}
@ -61,9 +71,9 @@ impl Request {
system: RequestSystem {
get_sysinfo: None,
set_relay_state: Some(RequestRelayState {
state: if on { 1 } else { 0 }
})
}
state: if on { 1 } else { 0 },
}),
},
}
}
@ -153,8 +163,7 @@ impl From<serde_json::Error> for ResponseError {
impl Response {
fn get_current_relay_state(&self) -> Result<bool, ResponseError> {
if let Some(sysinfo) = &self.system.get_sysinfo {
return sysinfo.err_code.ok()
.map(|_| sysinfo.relay_state == 1);
return sysinfo.err_code.ok().map(|_| sysinfo.relay_state == 1);
}
Err(ResponseError::SysinfoNotFound)
@ -189,15 +198,21 @@ impl Response {
impl traits::OnOff for KasaOutlet {
fn is_on(&self) -> Result<bool, errors::ErrorCode> {
let mut stream = TcpStream::connect(self.addr).or::<DeviceError>(Err(DeviceError::DeviceOffline))?;
let mut stream =
TcpStream::connect(self.addr).or::<DeviceError>(Err(DeviceError::DeviceOffline))?;
let body = Request::get_sysinfo().encrypt();
stream.write_all(&body).and(stream.flush()).or::<DeviceError>(Err(DeviceError::TransientError))?;
stream
.write_all(&body)
.and(stream.flush())
.or::<DeviceError>(Err(DeviceError::TransientError))?;
let mut received = Vec::new();
let mut rx_bytes = [0; 1024];
loop {
let read = stream.read(&mut rx_bytes).or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
let read = stream
.read(&mut rx_bytes)
.or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
received.extend_from_slice(&rx_bytes[..read]);
@ -206,16 +221,22 @@ impl traits::OnOff for KasaOutlet {
}
}
let resp = Response::decrypt(received.into()).or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
let resp = Response::decrypt(received.into())
.or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
resp.get_current_relay_state().or(Err(DeviceError::TransientError.into()))
resp.get_current_relay_state()
.or(Err(DeviceError::TransientError.into()))
}
fn set_on(&mut self, on: bool) -> Result<(), errors::ErrorCode> {
let mut stream = TcpStream::connect(self.addr).or::<DeviceError>(Err(DeviceError::DeviceOffline))?;
let mut stream =
TcpStream::connect(self.addr).or::<DeviceError>(Err(DeviceError::DeviceOffline))?;
let body = Request::set_relay_state(on).encrypt();
stream.write_all(&body).and(stream.flush()).or::<DeviceError>(Err(DeviceError::TransientError))?;
stream
.write_all(&body)
.and(stream.flush())
.or::<DeviceError>(Err(DeviceError::TransientError))?;
let mut received = Vec::new();
let mut rx_bytes = [0; 1024];
@ -232,9 +253,10 @@ impl traits::OnOff for KasaOutlet {
}
}
let resp = Response::decrypt(received.into()).or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
let resp = Response::decrypt(received.into())
.or::<errors::ErrorCode>(Err(DeviceError::TransientError.into()))?;
resp.check_set_relay_success().or(Err(DeviceError::TransientError.into()))
resp.check_set_relay_success()
.or(Err(DeviceError::TransientError.into()))
}
}

View File

@ -1,12 +1,22 @@
use std::net::Ipv4Addr;
use async_trait::async_trait;
use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, errors::ErrorCode};
use tracing::{debug, error};
use rumqttc::{AsyncClient, Publish, matches};
use eui48::MacAddress;
use google_home::{
device,
errors::ErrorCode,
traits::{self, Scene},
types::Type,
GoogleHomeDevice,
};
use rumqttc::{matches, AsyncClient, Publish};
use tracing::{debug, error};
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}, error::DeviceError};
use crate::{
config::{InfoConfig, MqttDeviceConfig},
error::DeviceError,
mqtt::{ActivateMessage, OnMqtt},
};
use super::Device;
@ -20,11 +30,26 @@ pub struct WakeOnLAN {
}
impl WakeOnLAN {
pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: MacAddress, broadcast_ip: Ipv4Addr, client: AsyncClient) -> Result<Self, DeviceError> {
pub async fn build(
identifier: &str,
info: InfoConfig,
mqtt: MqttDeviceConfig,
mac_address: MacAddress,
broadcast_ip: Ipv4Addr,
client: AsyncClient,
) -> Result<Self, DeviceError> {
// TODO: Handle potential errors here
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { identifier: identifier.to_owned(), info, mqtt, mac_address, broadcast_ip })
Ok(Self {
identifier: identifier.to_owned(),
info,
mqtt,
mac_address,
broadcast_ip,
})
}
}
@ -81,20 +106,31 @@ impl GoogleHomeDevice for WakeOnLAN {
impl traits::Scene for WakeOnLAN {
fn set_active(&self, activate: bool) -> Result<(), ErrorCode> {
if activate {
debug!(id = self.identifier, "Activating Computer: {} (Sending to {})", self.mac_address, self.broadcast_ip);
let wol = wakey::WolPacket::from_bytes(&self.mac_address.to_array()).map_err(|err| {
error!(id = self.identifier, "invalid mac address: {err}");
google_home::errors::DeviceError::TransientError
})?;
debug!(
id = self.identifier,
"Activating Computer: {} (Sending to {})", self.mac_address, self.broadcast_ip
);
let wol =
wakey::WolPacket::from_bytes(&self.mac_address.to_array()).map_err(|err| {
error!(id = self.identifier, "invalid mac address: {err}");
google_home::errors::DeviceError::TransientError
})?;
wol.send_magic_to((Ipv4Addr::new(0, 0, 0, 0), 0), (self.broadcast_ip, 9)).map_err(|err| {
error!(id = self.identifier, "Failed to activate computer: {err}");
google_home::errors::DeviceError::TransientError.into()
}).map(|_| debug!(id = self.identifier, "Success!"))
wol.send_magic_to((Ipv4Addr::new(0, 0, 0, 0), 0), (self.broadcast_ip, 9))
.map_err(|err| {
error!(id = self.identifier, "Failed to activate computer: {err}");
google_home::errors::DeviceError::TransientError.into()
})
.map(|_| debug!(id = self.identifier, "Success!"))
} else {
debug!(id = self.identifier, "Trying to deactive computer, this is not currently supported");
debug!(
id = self.identifier,
"Trying to deactive computer, this is not currently supported"
);
// We do not support deactivating this scene
Err(ErrorCode::DeviceError(google_home::errors::DeviceError::ActionNotAvailable))
Err(ErrorCode::DeviceError(
google_home::errors::DeviceError::ActionNotAvailable,
))
}
}
}

View File

@ -1,13 +1,13 @@
use std::{fmt, error, result};
use std::{error, fmt, result};
use axum::{http::status::InvalidStatusCode, response::IntoResponse};
use rumqttc::ClientError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use axum::{response::IntoResponse, http::status::InvalidStatusCode};
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone)]
pub struct MissingEnv {
keys: Vec<String>
keys: Vec<String>,
}
// TODO: Would be nice to somehow get the line number of the missing keys
@ -45,9 +45,10 @@ impl fmt::Display for MissingEnv {
write!(f, " '{}'", self.keys[0])?;
} else {
write!(f, "s '{}'", self.keys[0])?;
self.keys.iter().skip(1).try_for_each(|key| {
write!(f, ", '{key}'")
})?;
self.keys
.iter()
.skip(1)
.try_for_each(|key| write!(f, ", '{key}'"))?;
}
Ok(())
@ -63,19 +64,21 @@ pub enum ConfigParseError {
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error(transparent)]
DeserializeError(#[from] toml::de::Error)
DeserializeError(#[from] toml::de::Error),
}
// TODO: Would be nice to somehow get the line number of the expected wildcard topic
#[derive(Debug, Error)]
#[error("Topic '{topic}' is expected to be a wildcard topic")]
pub struct MissingWildcard {
topic: String
topic: String,
}
impl MissingWildcard {
pub fn new(topic: &str) -> Self {
Self { topic: topic.to_owned() }
Self {
topic: topic.to_owned(),
}
}
}
@ -84,7 +87,7 @@ pub enum DeviceError {
#[error(transparent)]
SubscribeError(#[from] ClientError),
#[error("Expected device '{0}' to implement OnOff trait")]
OnOffExpected(String)
OnOffExpected(String),
}
#[derive(Debug, Error)]
@ -118,7 +121,10 @@ pub struct ApiError {
impl ApiError {
pub fn new(status_code: axum::http::StatusCode, source: Box<dyn std::error::Error>) -> Self {
Self { status_code, source }
Self {
status_code,
source,
}
}
}
@ -136,7 +142,11 @@ impl From<ApiError> for ApiErrorJson {
impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
(self.status_code, serde_json::to_string::<ApiErrorJson>(&self.into()).unwrap()).into_response()
(
self.status_code,
serde_json::to_string::<ApiErrorJson>(&self.into()).unwrap(),
)
.into_response()
}
}
@ -159,6 +169,9 @@ impl TryFrom<ApiErrorJson> for ApiError {
let status_code = axum::http::StatusCode::from_u16(value.error.code)?;
let source = value.error.reason.into();
Ok(Self { status_code, source })
Ok(Self {
status_code,
source,
})
}
}

View File

@ -2,9 +2,13 @@ use std::net::SocketAddr;
use async_trait::async_trait;
use serde::Serialize;
use tracing::{warn, error, trace};
use tracing::{error, trace, warn};
use crate::{config::{HueBridgeConfig, Flags}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}};
use crate::{
config::{Flags, HueBridgeConfig},
light_sensor::{self, OnDarkness},
presence::{self, OnPresence},
};
pub enum Flag {
Presence,
@ -19,12 +23,16 @@ struct HueBridge {
#[derive(Debug, Serialize)]
struct FlagMessage {
flag: bool
flag: bool,
}
impl HueBridge {
pub fn new(addr: SocketAddr, login: &str, flags: Flags) -> Self {
Self { addr, login: login.to_owned(), flags }
Self {
addr,
login: login.to_owned(),
flags,
}
}
pub async fn set_flag(&self, flag: Flag, value: bool) {
@ -33,7 +41,10 @@ impl HueBridge {
Flag::Darkness => self.flags.darkness,
};
let url = format!("http://{}/api/{}/sensors/{flag}/state", self.addr, self.login);
let url = format!(
"http://{}/api/{}/sensors/{flag}/state",
self.addr, self.login
);
let res = reqwest::Client::new()
.put(url)
.json(&FlagMessage { flag: value })
@ -46,7 +57,7 @@ impl HueBridge {
if !status.is_success() {
warn!(flag, "Status code is not success: {status}");
}
},
}
Err(err) => {
error!(flag, "Error: {err}");
}
@ -54,8 +65,13 @@ impl HueBridge {
}
}
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: &HueBridgeConfig) {
let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone());
pub fn start(
mut presence_rx: presence::Receiver,
mut light_sensor_rx: light_sensor::Receiver,
config: &HueBridgeConfig,
) {
let mut hue_bridge =
HueBridge::new((config.ip, 80).into(), &config.login, config.flags.clone());
tokio::spawn(async move {
loop {

View File

@ -1,12 +1,12 @@
#![allow(incomplete_features)]
#![feature(specialization)]
pub mod devices;
pub mod mqtt;
pub mod config;
pub mod presence;
pub mod ntfy;
pub mod light_sensor;
pub mod hue_bridge;
pub mod auth;
pub mod error;
pub mod config;
pub mod debug_bridge;
pub mod devices;
pub mod error;
pub mod hue_bridge;
pub mod light_sensor;
pub mod mqtt;
pub mod ntfy;
pub mod presence;

View File

@ -1,9 +1,13 @@
use async_trait::async_trait;
use rumqttc::{matches, AsyncClient};
use tokio::sync::watch;
use tracing::{error, trace, debug};
use tracing::{debug, error, trace};
use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}, error::LightSensorError};
use crate::{
config::{LightSensorConfig, MqttDeviceConfig},
error::LightSensorError,
mqtt::{self, BrightnessMessage, OnMqtt},
};
#[async_trait]
pub trait OnDarkness {
@ -24,12 +28,24 @@ struct LightSensor {
impl LightSensor {
fn new(mqtt: MqttDeviceConfig, min: isize, max: isize) -> Self {
let (tx, is_dark) = watch::channel(false);
Self { is_dark, mqtt, min, max, tx }
Self {
is_dark,
mqtt,
min,
max,
tx,
}
}
}
pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> Result<Receiver, LightSensorError> {
client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
pub async fn start(
mut mqtt_rx: mqtt::Receiver,
config: LightSensorConfig,
client: AsyncClient,
) -> Result<Receiver, LightSensorError> {
client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
let mut light_sensor = LightSensor::new(config.mqtt, config.min, config.max);
let is_dark = light_sensor.is_dark.clone();
@ -69,7 +85,12 @@ impl OnMqtt for LightSensor {
trace!("It is light");
false
} else {
trace!("In between min ({}) and max ({}) value, keeping current state: {}", self.min, self.max, *self.is_dark.borrow());
trace!(
"In between min ({}) and max ({}) value, keeping current state: {}",
self.min,
self.max,
*self.is_dark.borrow()
);
*self.is_dark.borrow()
};

View File

@ -1,22 +1,24 @@
#![feature(async_closure)]
use std::{ process, time::Duration, collections::HashMap };
use std::{collections::HashMap, process, time::Duration};
use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse};
use axum::{
extract::FromRef, http::StatusCode, response::IntoResponse, routing::post, Json, Router,
};
use automation::{
auth::User,
config::{Config, OpenIDConfig},
devices,
hue_bridge,
light_sensor, mqtt::Mqtt,
ntfy,
presence, error::ApiError, debug_bridge,
debug_bridge, devices,
error::ApiError,
hue_bridge, light_sensor,
mqtt::Mqtt,
ntfy, presence,
};
use dotenvy::dotenv;
use rumqttc::{AsyncClient, MqttOptions, Transport, matches};
use futures::future::join_all;
use rumqttc::{matches, AsyncClient, MqttOptions, Transport};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, metadata::LevelFilter};
use futures::future::join_all;
use google_home::{GoogleHome, Request};
use tracing_subscriber::EnvFilter;
@ -45,7 +47,6 @@ async fn main() {
}
}
async fn app() -> anyhow::Result<()> {
dotenv().ok();
@ -73,8 +74,14 @@ async fn app() -> anyhow::Result<()> {
// Create a mqtt client and wrap the eventloop
let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
let mqtt = Mqtt::new(eventloop);
let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?;
let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?;
let presence =
presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?;
let light_sensor = light_sensor::start(
mqtt.subscribe(),
config.light_sensor.clone(),
client.clone(),
)
.await?;
// Start the ntfy service if it is configured
let mut ntfy = None;
@ -90,14 +97,22 @@ async fn app() -> anyhow::Result<()> {
// Start the debug bridge if it is configured
if let Some(config) = &config.debug_bridge {
debug_bridge::start(presence.clone(), light_sensor.clone(), config, client.clone());
debug_bridge::start(
presence.clone(),
light_sensor.clone(),
config,
client.clone(),
);
}
// Super hacky implementation for the washing machine, just for testing
{
let mut handle = None::<JoinHandle<()>>;
let mut mqtt = mqtt.subscribe();
client.subscribe("zigbee2mqtt/bathroom/washing", rumqttc::QoS::AtLeastOnce).await.unwrap();
client
.subscribe("zigbee2mqtt/bathroom/washing", rumqttc::QoS::AtLeastOnce)
.await
.unwrap();
tokio::spawn(async move {
if let Some(ntfy) = ntfy {
loop {
@ -107,7 +122,8 @@ async fn app() -> anyhow::Result<()> {
continue;
}
let map: HashMap<String, serde_json::Value> = serde_json::from_slice(&message.payload).unwrap();
let map: HashMap<String, serde_json::Value> =
serde_json::from_slice(&message.payload).unwrap();
debug!("Test: {:?}", map);
let strength = match map.get("strength").map(|value| value.as_i64().unwrap()) {
@ -127,20 +143,18 @@ async fn app() -> anyhow::Result<()> {
// Start a new timer, if the timer runs out we have not had an update of
// more then 15 in the last timeout period, assume we are done, notify user
let ntfy = ntfy.clone();
handle = Some(
tokio::spawn(async move {
debug!("Starting timeout of 10 min for washing machine...");
tokio::time::sleep(Duration::from_secs(10*60)).await;
debug!("Notifying user!");
handle = Some(tokio::spawn(async move {
debug!("Starting timeout of 10 min for washing machine...");
tokio::time::sleep(Duration::from_secs(10 * 60)).await;
debug!("Notifying user!");
let notification = ntfy::Notification::new()
.set_title("Laundy is done")
.set_message("Do not forget to hang it!")
.set_priority(ntfy::Priority::High);
let notification = ntfy::Notification::new()
.set_title("Laundy is done")
.set_message("Do not forget to hang it!")
.set_priority(ntfy::Priority::High);
ntfy.send(notification).await.ok();
})
);
ntfy.send(notification).await.ok();
}));
}
}
}
@ -156,12 +170,17 @@ async fn app() -> anyhow::Result<()> {
.map(|(identifier, device_config)| async {
// Force the async block to move identifier
let identifier = identifier;
let device = device_config.create(&identifier, &config, client.clone()).await?;
let device = device_config
.create(&identifier, &config, client.clone())
.await?;
devices.add_device(device).await?;
// We don't need a seperate error type in main
anyhow::Ok(())
})
).await.into_iter().collect::<Result<_, _>>()?;
}),
)
.await
.into_iter()
.collect::<Result<_, _>>()?;
// Actually start listening for mqtt message,
// we wait until all the setup is done, as otherwise we might miss some messages
@ -175,7 +194,10 @@ async fn app() -> anyhow::Result<()> {
let gc = GoogleHome::new(&user.preferred_username);
let result = match devices.fullfillment(gc, payload).await {
Ok(result) => result,
Err(err) => return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()).into_response(),
Err(err) => {
return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into())
.into_response()
}
};
debug!(username = user.preferred_username, "{result:#?}");

View File

@ -1,12 +1,12 @@
use std::time::{UNIX_EPOCH, SystemTime};
use std::time::{SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use thiserror::Error;
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, warn};
use rumqttc::{Publish, Event, Incoming, EventLoop};
use rumqttc::{Event, EventLoop, Incoming, Publish};
use tokio::sync::broadcast;
#[async_trait]
@ -46,13 +46,13 @@ impl Mqtt {
match notification {
Ok(Event::Incoming(Incoming::Publish(p))) => {
self.tx.send(p).ok();
},
}
Ok(..) => continue,
Err(err) => {
// Something has gone wrong
// We stay in the loop as that will attempt to reconnect
warn!("{}", err);
},
}
}
}
});
@ -61,12 +61,14 @@ impl Mqtt {
#[derive(Debug, Serialize, Deserialize)]
pub struct OnOffMessage {
state: String
state: String,
}
impl OnOffMessage {
pub fn new(state: bool) -> Self {
Self { state: if state {"ON"} else {"OFF"}.into() }
Self {
state: if state { "ON" } else { "OFF" }.into(),
}
}
pub fn state(&self) -> bool {
@ -85,7 +87,7 @@ impl TryFrom<&Publish> for OnOffMessage {
#[derive(Debug, Deserialize)]
pub struct ActivateMessage {
activate: bool
activate: bool,
}
impl ActivateMessage {
@ -115,7 +117,7 @@ pub enum RemoteAction {
#[derive(Debug, Deserialize)]
pub struct RemoteMessage {
action: RemoteAction
action: RemoteAction,
}
impl RemoteMessage {
@ -141,7 +143,15 @@ pub struct PresenceMessage {
impl PresenceMessage {
pub fn new(state: bool) -> Self {
Self { state, updated: Some(SystemTime::now().duration_since(UNIX_EPOCH).expect("Time is after UNIX EPOCH").as_millis()) }
Self {
state,
updated: Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time is after UNIX EPOCH")
.as_millis(),
),
}
}
pub fn present(&self) -> bool {
@ -206,7 +216,15 @@ pub struct DarknessMessage {
impl DarknessMessage {
pub fn new(state: bool) -> Self {
Self { state, updated: Some(SystemTime::now().duration_since(UNIX_EPOCH).expect("Time is after UNIX EPOCH").as_millis()) }
Self {
state,
updated: Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time is after UNIX EPOCH")
.as_millis(),
),
}
}
pub fn present(&self) -> bool {
@ -222,4 +240,3 @@ impl TryFrom<&Publish> for DarknessMessage {
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}
}

View File

@ -1,12 +1,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tracing::{warn, error, debug};
use serde::Serialize;
use serde_repr::*;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
use crate::{presence::{self, OnPresence}, config::NtfyConfig};
use crate::{
config::NtfyConfig,
presence::{self, OnPresence},
};
pub type Sender = mpsc::Sender<Notification>;
pub type Receiver = mpsc::Receiver<Notification>;
@ -32,7 +35,7 @@ pub enum Priority {
pub enum ActionType {
Broadcast {
#[serde(skip_serializing_if = "HashMap::is_empty")]
extras: HashMap<String, String>
extras: HashMap<String, String>,
},
// View,
// Http
@ -69,7 +72,13 @@ pub struct Notification {
impl Notification {
pub fn new() -> Self {
Self { title: None, message: None, tags: Vec::new(), priority: None, actions: Vec::new() }
Self {
title: None,
message: None,
tags: Vec::new(),
priority: None,
actions: Vec::new(),
}
}
pub fn set_title(mut self, title: &str) -> Self {
@ -98,7 +107,10 @@ impl Notification {
}
fn finalize(self, topic: &str) -> NotificationFinal {
NotificationFinal { topic: topic.to_owned(), inner: self }
NotificationFinal {
topic: topic.to_owned(),
inner: self,
}
}
}
@ -110,7 +122,11 @@ impl Default for Notification {
impl Ntfy {
fn new(base_url: &str, topic: &str, tx: Sender) -> Self {
Self { base_url: base_url.to_owned(), topic: topic.to_owned(), tx }
Self {
base_url: base_url.to_owned(),
topic: topic.to_owned(),
tx,
}
}
async fn send(&self, notification: Notification) {
@ -170,7 +186,7 @@ impl OnPresence for Ntfy {
let action = Action {
action: ActionType::Broadcast { extras },
label: if presence { "Set away" } else { "Set home" }.to_owned(),
clear: Some(true)
clear: Some(true),
};
// Create the notification

View File

@ -1,11 +1,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use rumqttc::{has_wildcards, matches, AsyncClient};
use tokio::sync::watch;
use tracing::{debug, error};
use rumqttc::{AsyncClient, matches, has_wildcards};
use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig, error::{MissingWildcard, PresenceError}};
use crate::{
config::MqttDeviceConfig,
error::{MissingWildcard, PresenceError},
mqtt::{self, OnMqtt, PresenceMessage},
};
#[async_trait]
pub trait OnPresence {
@ -29,13 +33,24 @@ impl Presence {
}
let (tx, overall_presence) = watch::channel(false);
Ok(Self { devices: HashMap::new(), overall_presence, mqtt, tx })
Ok(Self {
devices: HashMap::new(),
overall_presence,
mqtt,
tx,
})
}
}
pub async fn start(mqtt: MqttDeviceConfig, mut mqtt_rx: mqtt::Receiver, client: AsyncClient) -> Result<Receiver, PresenceError> {
pub async fn start(
mqtt: MqttDeviceConfig,
mut mqtt_rx: mqtt::Receiver,
client: AsyncClient,
) -> Result<Receiver, PresenceError> {
// Subscribe to the relevant topics on mqtt
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
let mut presence = Presence::build(mqtt)?;
let overall_presence = presence.overall_presence.clone();
@ -59,7 +74,12 @@ impl OnMqtt for Presence {
return;
}
let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).expect("Presence::new fails if it does not contain wildcards");
let offset = self
.mqtt
.topic
.find('+')
.or(self.mqtt.topic.find('#'))
.expect("Presence::new fails if it does not contain wildcards");
let device_name = &message.topic[offset..];
if message.payload.is_empty() {