From 330523166f034a47baf2ea33153209e4995d85af Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Fri, 11 Aug 2023 02:24:58 +0200 Subject: [PATCH] Store devices wrapped in Arc RwLock --- Cargo.lock | 13 + Cargo.toml | 1 + google-home/Cargo.toml | 5 +- google-home/src/device.rs | 64 ++- google-home/src/fullfillment.rs | 734 +++++++++++++++------------- google-home/src/lib.rs | 1 + google-home/src/request/execute.rs | 2 +- google-home/src/response.rs | 2 +- google-home/src/response/execute.rs | 8 +- impl_cast/src/lib.rs | 2 +- src/config.rs | 5 +- src/devices.rs | 148 +++--- src/main.rs | 10 +- 13 files changed, 538 insertions(+), 457 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6588dc2..80824ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,7 @@ dependencies = [ "reqwest", "rumqttc", "serde", + "serde-tuple-vec-map", "serde_json", "serde_repr", "thiserror", @@ -461,10 +462,13 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" name = "google-home" version = "0.1.0" dependencies = [ + "async-trait", + "futures", "impl_cast", "serde", "serde_json", "thiserror", + "tokio", ] [[package]] @@ -1212,6 +1216,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-tuple-vec-map" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a04d0ebe0de77d7d445bb729a895dcb0a288854b267ca85f030ce51cdc578c82" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.183" diff --git a/Cargo.toml b/Cargo.toml index d6a7c98..4bd9b82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ anyhow = "1.0.68" wakey = "0.3.0" console-subscriber = "0.1.8" tracing-subscriber = "0.3.16" +serde-tuple-vec-map = "1.0.1" [profile.release] lto = true diff --git a/google-home/Cargo.toml b/google-home/Cargo.toml index db9aaa0..c47b3fe 100644 --- a/google-home/Cargo.toml +++ b/google-home/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" [dependencies] impl_cast = { path = "../impl_cast" } -serde = { version ="1.0.149", features = ["derive"] } +serde = { version = "1.0.149", features = ["derive"] } serde_json = "1.0.89" thiserror = "1.0.37" +tokio = "1" +async-trait = "0.1.61" +futures = "0.3.25" diff --git a/google-home/src/device.rs b/google-home/src/device.rs index 2d919d3..a3c5277 100644 --- a/google-home/src/device.rs +++ b/google-home/src/device.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use serde::Serialize; use crate::{ @@ -8,8 +9,43 @@ use crate::{ types::Type, }; +// TODO: Find a more elegant way to do this +pub trait AsGoogleHomeDevice { + fn cast(&self) -> Option<&dyn GoogleHomeDevice>; + fn cast_mut(&mut self) -> Option<&mut dyn GoogleHomeDevice>; +} + +// Default impl +impl AsGoogleHomeDevice for T +where + T: 'static, +{ + default fn cast(&self) -> Option<&(dyn GoogleHomeDevice + 'static)> { + None + } + + default fn cast_mut(&mut self) -> Option<&mut (dyn GoogleHomeDevice + 'static)> { + None + } +} + +// Specialization +impl AsGoogleHomeDevice for T +where + T: GoogleHomeDevice + 'static, +{ + fn cast(&self) -> Option<&(dyn GoogleHomeDevice + 'static)> { + Some(self) + } + + fn cast_mut(&mut self) -> Option<&mut (dyn GoogleHomeDevice + 'static)> { + Some(self) + } +} + +#[async_trait] #[impl_cast::device(As: OnOff + Scene)] -pub trait GoogleHomeDevice: Sync + Send + 'static { +pub trait GoogleHomeDevice: AsGoogleHomeDevice + Sync + Send + 'static { fn get_device_type(&self) -> Type; fn get_device_name(&self) -> Name; fn get_id(&self) -> &str; @@ -26,7 +62,7 @@ pub trait GoogleHomeDevice: Sync + Send + 'static { None } - fn sync(&self) -> response::sync::Device { + async fn sync(&self) -> response::sync::Device { let name = self.get_device_name(); let mut device = response::sync::Device::new(self.get_id(), &name.name, self.get_device_type()); @@ -40,9 +76,11 @@ pub trait GoogleHomeDevice: Sync + Send + 'static { device.device_info = self.get_device_info(); let mut traits = Vec::new(); + // OnOff if let Some(on_off) = As::::cast(self) { traits.push(Trait::OnOff); + let on_off = on_off; device.attributes.command_only_on_off = on_off.is_command_only(); device.attributes.query_only_on_off = on_off.is_query_only(); } @@ -58,7 +96,7 @@ pub trait GoogleHomeDevice: Sync + Send + 'static { device } - fn query(&self) -> response::query::Device { + async fn query(&self) -> response::query::Device { let mut device = response::query::Device::new(); if !self.is_online() { device.set_offline(); @@ -72,19 +110,21 @@ pub trait GoogleHomeDevice: Sync + Send + 'static { device } - fn execute(&mut self, command: &CommandType) -> Result<(), ErrorCode> { + async fn execute(&mut self, command: &CommandType) -> Result<(), ErrorCode> { match command { CommandType::OnOff { on } => { - let on_off = As::::cast_mut(self) - .ok_or::(DeviceError::ActionNotAvailable.into())?; - - on_off.set_on(*on)?; + if let Some(on_off) = As::::cast_mut(self) { + on_off.set_on(*on)?; + } else { + return Err(DeviceError::ActionNotAvailable.into()); + } } CommandType::ActivateScene { deactivate } => { - let scene = As::::cast_mut(self) - .ok_or::(DeviceError::ActionNotAvailable.into())?; - - scene.set_active(!deactivate)?; + if let Some(scene) = As::::cast(self) { + scene.set_active(!deactivate)?; + } else { + return Err(DeviceError::ActionNotAvailable.into()); + } } } diff --git a/google-home/src/fullfillment.rs b/google-home/src/fullfillment.rs index dcf6ae9..7a65210 100644 --- a/google-home/src/fullfillment.rs +++ b/google-home/src/fullfillment.rs @@ -1,9 +1,11 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; +use futures::future::{join_all, OptionFuture}; use thiserror::Error; +use tokio::sync::{Mutex, RwLock}; use crate::{ - device::GoogleHomeDevice, + device::AsGoogleHomeDevice, errors::{DeviceError, ErrorCode}, request::{self, Intent, Request}, response::{self, execute, query, sync, Response, ResponsePayload, State}, @@ -28,387 +30,415 @@ impl GoogleHome { } } - pub fn handle_request( + pub async fn handle_request( &self, request: Request, - devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>, + devices: &HashMap>>>, ) -> Result { // TODO: What do we do if we actually get more then one thing in the input array, right now // we only respond to the first thing - let payload = request - .inputs - .into_iter() - .map(|input| match input { - Intent::Sync => ResponsePayload::Sync(self.sync(devices)), - Intent::Query(payload) => ResponsePayload::Query(self.query(payload, devices)), - Intent::Execute(payload) => { - ResponsePayload::Execute(self.execute(payload, devices)) + let intent = request.inputs.into_iter().next(); + + let payload: OptionFuture<_> = intent + .map(|intent| async move { + match intent { + Intent::Sync => ResponsePayload::Sync(self.sync(devices).await), + Intent::Query(payload) => { + ResponsePayload::Query(self.query(payload, devices).await) + } + Intent::Execute(payload) => { + ResponsePayload::Execute(self.execute(payload, devices).await) + } } }) - .next(); + .into(); payload + .await .ok_or(FullfillmentError::ExpectedOnePayload) .map(|payload| Response::new(&request.request_id, payload)) } - fn sync(&self, devices: &HashMap<&str, &mut dyn GoogleHomeDevice>) -> sync::Payload { + async fn sync( + &self, + devices: &HashMap>>>, + ) -> sync::Payload { let mut resp_payload = sync::Payload::new(&self.user_id); - resp_payload.devices = devices - .iter() - .map(|(_, device)| device.sync()) - .collect::>(); - - resp_payload - } - - fn query( - &self, - payload: request::query::Payload, - devices: &HashMap<&str, &mut dyn GoogleHomeDevice>, - ) -> query::Payload { - let mut resp_payload = query::Payload::new(); - resp_payload.devices = payload - .devices - .into_iter() - .map(|device| device.id) - .map(|id| { - let device = devices.get(id.as_str()).map_or_else( - || { - let mut device = query::Device::new(); - device.set_offline(); - device.set_error(DeviceError::DeviceNotFound.into()); - - device - }, - |device| device.query(), - ); - - (id, device) - }) - .collect(); - - resp_payload - } - - fn execute( - &self, - payload: request::execute::Payload, - devices: &mut HashMap<&str, &mut dyn GoogleHomeDevice>, - ) -> execute::Payload { - let mut resp_payload = response::execute::Payload::new(); - - payload.commands.into_iter().for_each(|command| { - let mut success = response::execute::Command::new(execute::Status::Success); - success.states = Some(execute::States { - online: true, - state: State::default(), - }); - let mut offline = response::execute::Command::new(execute::Status::Offline); - offline.states = Some(execute::States { - online: false, - state: State::default(), - }); - let mut errors: HashMap = HashMap::new(); - - command - .devices - .into_iter() - .map(|device| device.id) - .map(|id| { - devices.get_mut(id.as_str()).map_or( - (id.clone(), Err(DeviceError::DeviceNotFound.into())), - |device| { - if !device.is_online() { - return (id, Ok(false)); - } - - let results = command - .execution - .iter() - .map(|cmd| { - // TODO: We should also return the state after update in the state - // struct, however that will make things WAY more complicated - device.execute(cmd) - }) - .collect::, ErrorCode>>(); - - // TODO: We only get one error not all errors - if let Err(err) = results { - (id, Err(err)) - } else { - (id, Ok(true)) - } - }, - ) - }) - .for_each(|(id, state)| { - match state { - Ok(true) => success.add_id(&id), - Ok(false) => offline.add_id(&id), - Err(err) => errors - .entry(err) - .or_insert_with(|| match &err { - ErrorCode::DeviceError(_) => { - response::execute::Command::new(execute::Status::Error) - } - ErrorCode::DeviceException(_) => { - response::execute::Command::new(execute::Status::Exceptions) - } - }) - .add_id(&id), - }; - }); - - resp_payload.add_command(success); - resp_payload.add_command(offline); - for (error, mut cmd) in errors { - cmd.error_code = Some(error); - resp_payload.add_command(cmd); + let f = devices.iter().map(|(_, device)| async move { + if let Some(device) = device.read().await.as_ref().cast() { + Some(device.sync().await) + } else { + None } }); + resp_payload.devices = join_all(f).await.into_iter().flatten().collect(); resp_payload } -} -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - device::{self, GoogleHomeDevice}, - errors::ErrorCode, - request::Request, - traits, types, - }; + async fn query( + &self, + payload: request::query::Payload, + devices: &HashMap>>>, + ) -> query::Payload { + let mut resp_payload = query::Payload::new(); + let f = payload + .devices + .into_iter() + .map(|device| device.id) + .map(|id| async move { + // NOTE: Requires let_chains feature + let device = if let Some(device) = devices.get(id.as_str()) && let Some(device) = device.read().await.as_ref().cast() { + device.query().await + } else { + let mut device = query::Device::new(); + device.set_offline(); + device.set_error(DeviceError::DeviceNotFound.into()); - #[derive(Debug)] - struct TestOutlet { - name: String, - on: bool, + device + }; + + (id, device) + }); + + // Await all the futures and then convert the resulting vector into a hashmap + resp_payload.devices = join_all(f).await.into_iter().collect(); + resp_payload } - impl TestOutlet { - fn new(name: &str) -> Self { - Self { - name: name.into(), - on: false, - } - } - } + async fn execute( + &self, + payload: request::execute::Payload, + devices: &HashMap>>>, + ) -> execute::Payload { + let resp_payload = Arc::new(Mutex::new(response::execute::Payload::new())); - impl GoogleHomeDevice for TestOutlet { - fn get_device_type(&self) -> types::Type { - types::Type::Outlet - } + let f = payload.commands.into_iter().map(|command| { + let resp_payload = resp_payload.clone(); + async move { + let mut success = response::execute::Command::new(execute::Status::Success); + success.states = Some(execute::States { + online: true, + state: State::default(), + }); + let mut offline = response::execute::Command::new(execute::Status::Offline); + offline.states = Some(execute::States { + online: false, + state: State::default(), + }); + let mut errors: HashMap = HashMap::new(); - fn get_device_name(&self) -> device::Name { - let mut name = device::Name::new("Nightstand"); - name.add_default_name("Outlet"); - name.add_nickname("Nightlight"); + let f = command + .devices + .into_iter() + .map(|device| device.id) + .map(|id| { + let execution = command.execution.clone(); + async move { + if let Some(device) = devices.get(id.as_str()) && let Some(device) = device.write().await.as_mut().cast_mut() { + if !device.is_online() { + return (id, Ok(false)); + } - name - } + // NOTE: We can not use .map here because async =( + let mut results = Vec::new(); + for cmd in &execution { + results.push(device.execute(cmd).await); + } - fn get_id(&self) -> &str { - &self.name - } + // Convert vec of results to a result with a vec and the first + // encountered error + let results = results + .into_iter() + .collect::, ErrorCode>>(); - fn is_online(&self) -> bool { - true - } + // TODO: We only get one error not all errors + if let Err(err) = results { + (id, Err(err)) + } else { + (id, Ok(true)) + } + } else { + (id.clone(), Err(DeviceError::DeviceNotFound.into())) + } + } + }); - fn get_room_hint(&self) -> Option<&str> { - Some("Bedroom") - } + let a = join_all(f).await; + a.into_iter().for_each(|(id, state)| { + match state { + Ok(true) => success.add_id(&id), + Ok(false) => offline.add_id(&id), + Err(err) => errors + .entry(err) + .or_insert_with(|| match &err { + ErrorCode::DeviceError(_) => { + response::execute::Command::new(execute::Status::Error) + } + ErrorCode::DeviceException(_) => { + response::execute::Command::new(execute::Status::Exceptions) + } + }) + .add_id(&id), + }; + }); - fn get_device_info(&self) -> Option { - Some(device::Info { - manufacturer: Some("Company".into()), - model: Some("Outlet II".into()), - hw_version: None, - sw_version: None, - }) - } - } + let mut resp_payload = resp_payload.lock().await; + resp_payload.add_command(success); + resp_payload.add_command(offline); + for (error, mut cmd) in errors { + cmd.error_code = Some(error); + resp_payload.add_command(cmd); + } + } + }); - impl traits::OnOff for TestOutlet { - fn is_on(&self) -> Result { - Ok(self.on) - } + join_all(f).await; - fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { - self.on = on; - Ok(()) - } - } - - #[derive(Debug)] - struct TestScene; - - impl TestScene { - fn new() -> Self { - Self {} - } - } - - impl GoogleHomeDevice for TestScene { - fn get_device_type(&self) -> types::Type { - types::Type::Scene - } - - fn get_device_name(&self) -> device::Name { - device::Name::new("Party") - } - - fn get_id(&self) -> &str { - "living/party_mode" - } - - fn is_online(&self) -> bool { - true - } - - fn get_room_hint(&self) -> Option<&str> { - Some("Living room") - } - } - - impl traits::Scene for TestScene { - fn set_active(&self, _activate: bool) -> Result<(), ErrorCode> { - println!("Activating the party scene"); - Ok(()) - } - } - - #[test] - fn handle_sync() { - let json = r#"{ - "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", - "inputs": [ - { - "intent": "action.devices.SYNC" - } - ] -}"#; - let req: Request = serde_json::from_str(json).unwrap(); - - let gh = GoogleHome { - user_id: "Dreaded_X".into(), - }; - - let mut nightstand = TestOutlet::new("bedroom/nightstand"); - let mut lamp = TestOutlet::new("living/lamp"); - let mut scene = TestScene::new(); - let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); - let id = nightstand.get_id().to_owned(); - devices.insert(&id, &mut nightstand); - let id = lamp.get_id().to_owned(); - devices.insert(&id, &mut lamp); - let id = scene.get_id().to_owned(); - devices.insert(&id, &mut scene); - - let resp = gh.handle_request(req, &mut devices).unwrap(); - - let json = serde_json::to_string(&resp).unwrap(); - println!("{}", json) - } - - #[test] - fn handle_query() { - let json = r#"{ - "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", - "inputs": [ - { - "intent": "action.devices.QUERY", - "payload": { - "devices": [ - { - "id": "bedroom/nightstand" - }, - { - "id": "living/party_mode" - } - ] - } - } - ] -}"#; - let req: Request = serde_json::from_str(json).unwrap(); - - let gh = GoogleHome { - user_id: "Dreaded_X".into(), - }; - - let mut nightstand = TestOutlet::new("bedroom/nightstand"); - let mut lamp = TestOutlet::new("living/lamp"); - let mut scene = TestScene::new(); - let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); - let id = nightstand.get_id().to_owned(); - devices.insert(&id, &mut nightstand); - let id = lamp.get_id().to_owned(); - devices.insert(&id, &mut lamp); - let id = scene.get_id().to_owned(); - devices.insert(&id, &mut scene); - - let resp = gh.handle_request(req, &mut devices).unwrap(); - - let json = serde_json::to_string(&resp).unwrap(); - println!("{}", json) - } - - #[test] - fn handle_execute() { - let json = r#"{ - "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", - "inputs": [ - { - "intent": "action.devices.EXECUTE", - "payload": { - "commands": [ - { - "devices": [ - { - "id": "bedroom/nightstand" - }, - { - "id": "living/lamp" - } - ], - "execution": [ - { - "command": "action.devices.commands.OnOff", - "params": { - "on": true - } - } - ] - } - ] - } - } - ] -}"#; - let req: Request = serde_json::from_str(json).unwrap(); - - let gh = GoogleHome { - user_id: "Dreaded_X".into(), - }; - - let mut nightstand = TestOutlet::new("bedroom/nightstand"); - let mut lamp = TestOutlet::new("living/lamp"); - let mut scene = TestScene::new(); - let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); - let id = nightstand.get_id().to_owned(); - devices.insert(&id, &mut nightstand); - let id = lamp.get_id().to_owned(); - devices.insert(&id, &mut lamp); - let id = scene.get_id().to_owned(); - devices.insert(&id, &mut scene); - - let resp = gh.handle_request(req, &mut devices).unwrap(); - - let json = serde_json::to_string(&resp).unwrap(); - println!("{}", json) + // We await all the futures that use resp_payload so try_unwrap should never fail + std::sync::Arc::>::try_unwrap(resp_payload) + .unwrap() + .into_inner() } } + +// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::{ +// device::{self, GoogleHomeDevice}, +// errors::ErrorCode, +// request::Request, +// traits, types, +// }; +// +// #[derive(Debug)] +// struct TestOutlet { +// name: String, +// on: bool, +// } +// +// impl TestOutlet { +// fn new(name: &str) -> Self { +// Self { +// name: name.into(), +// on: false, +// } +// } +// } +// +// impl GoogleHomeDevice for TestOutlet { +// fn get_device_type(&self) -> types::Type { +// types::Type::Outlet +// } +// +// fn get_device_name(&self) -> device::Name { +// let mut name = device::Name::new("Nightstand"); +// name.add_default_name("Outlet"); +// name.add_nickname("Nightlight"); +// +// name +// } +// +// fn get_id(&self) -> &str { +// &self.name +// } +// +// fn is_online(&self) -> bool { +// true +// } +// +// fn get_room_hint(&self) -> Option<&str> { +// Some("Bedroom") +// } +// +// fn get_device_info(&self) -> Option { +// Some(device::Info { +// manufacturer: Some("Company".into()), +// model: Some("Outlet II".into()), +// hw_version: None, +// sw_version: None, +// }) +// } +// } +// +// impl traits::OnOff for TestOutlet { +// fn is_on(&self) -> Result { +// Ok(self.on) +// } +// +// fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { +// self.on = on; +// Ok(()) +// } +// } +// +// #[derive(Debug)] +// struct TestScene; +// +// impl TestScene { +// fn new() -> Self { +// Self {} +// } +// } +// +// impl GoogleHomeDevice for TestScene { +// fn get_device_type(&self) -> types::Type { +// types::Type::Scene +// } +// +// fn get_device_name(&self) -> device::Name { +// device::Name::new("Party") +// } +// +// fn get_id(&self) -> &str { +// "living/party_mode" +// } +// +// fn is_online(&self) -> bool { +// true +// } +// +// fn get_room_hint(&self) -> Option<&str> { +// Some("Living room") +// } +// } +// +// impl traits::Scene for TestScene { +// fn set_active(&self, _activate: bool) -> Result<(), ErrorCode> { +// println!("Activating the party scene"); +// Ok(()) +// } +// } +// +// #[test] +// fn handle_sync() { +// let json = r#"{ +// "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", +// "inputs": [ +// { +// "intent": "action.devices.SYNC" +// } +// ] +// }"#; +// let req: Request = serde_json::from_str(json).unwrap(); +// +// let gh = GoogleHome { +// user_id: "Dreaded_X".into(), +// }; +// +// let mut nightstand = TestOutlet::new("bedroom/nightstand"); +// let mut lamp = TestOutlet::new("living/lamp"); +// let mut scene = TestScene::new(); +// let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); +// let id = nightstand.get_id().to_owned(); +// devices.insert(&id, &mut nightstand); +// let id = lamp.get_id().to_owned(); +// devices.insert(&id, &mut lamp); +// let id = scene.get_id().to_owned(); +// devices.insert(&id, &mut scene); +// +// let resp = gh.handle_request(req, &mut devices).unwrap(); +// +// let json = serde_json::to_string(&resp).unwrap(); +// println!("{}", json) +// } +// +// #[test] +// fn handle_query() { +// let json = r#"{ +// "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", +// "inputs": [ +// { +// "intent": "action.devices.QUERY", +// "payload": { +// "devices": [ +// { +// "id": "bedroom/nightstand" +// }, +// { +// "id": "living/party_mode" +// } +// ] +// } +// } +// ] +// }"#; +// let req: Request = serde_json::from_str(json).unwrap(); +// +// let gh = GoogleHome { +// user_id: "Dreaded_X".into(), +// }; +// +// let mut nightstand = TestOutlet::new("bedroom/nightstand"); +// let mut lamp = TestOutlet::new("living/lamp"); +// let mut scene = TestScene::new(); +// let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); +// let id = nightstand.get_id().to_owned(); +// devices.insert(&id, &mut nightstand); +// let id = lamp.get_id().to_owned(); +// devices.insert(&id, &mut lamp); +// let id = scene.get_id().to_owned(); +// devices.insert(&id, &mut scene); +// +// let resp = gh.handle_request(req, &mut devices).unwrap(); +// +// let json = serde_json::to_string(&resp).unwrap(); +// println!("{}", json) +// } +// +// #[test] +// fn handle_execute() { +// let json = r#"{ +// "requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf", +// "inputs": [ +// { +// "intent": "action.devices.EXECUTE", +// "payload": { +// "commands": [ +// { +// "devices": [ +// { +// "id": "bedroom/nightstand" +// }, +// { +// "id": "living/lamp" +// } +// ], +// "execution": [ +// { +// "command": "action.devices.commands.OnOff", +// "params": { +// "on": true +// } +// } +// ] +// } +// ] +// } +// } +// ] +// }"#; +// let req: Request = serde_json::from_str(json).unwrap(); +// +// let gh = GoogleHome { +// user_id: "Dreaded_X".into(), +// }; +// +// let mut nightstand = TestOutlet::new("bedroom/nightstand"); +// let mut lamp = TestOutlet::new("living/lamp"); +// let mut scene = TestScene::new(); +// let mut devices: HashMap<&str, &mut dyn GoogleHomeDevice> = HashMap::new(); +// let id = nightstand.get_id().to_owned(); +// devices.insert(&id, &mut nightstand); +// let id = lamp.get_id().to_owned(); +// devices.insert(&id, &mut lamp); +// let id = scene.get_id().to_owned(); +// devices.insert(&id, &mut scene); +// +// let resp = gh.handle_request(req, &mut devices).unwrap(); +// +// let json = serde_json::to_string(&resp).unwrap(); +// println!("{}", json) +// } +// } diff --git a/google-home/src/lib.rs b/google-home/src/lib.rs index 2a675e4..9d91e04 100644 --- a/google-home/src/lib.rs +++ b/google-home/src/lib.rs @@ -1,5 +1,6 @@ #![allow(incomplete_features)] #![feature(specialization)] +#![feature(let_chains)] pub mod device; mod fullfillment; diff --git a/google-home/src/request/execute.rs b/google-home/src/request/execute.rs index 6efb99c..ce95ef6 100644 --- a/google-home/src/request/execute.rs +++ b/google-home/src/request/execute.rs @@ -20,7 +20,7 @@ pub struct Device { // customData } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone, Copy)] #[serde(tag = "command", content = "params")] pub enum CommandType { #[serde(rename = "action.devices.commands.OnOff")] diff --git a/google-home/src/response.rs b/google-home/src/response.rs index 28bd3f5..627b91c 100644 --- a/google-home/src/response.rs +++ b/google-home/src/response.rs @@ -28,7 +28,7 @@ pub enum ResponsePayload { Execute(execute::Payload), } -#[derive(Debug, Default, Serialize)] +#[derive(Debug, Default, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct State { #[serde(skip_serializing_if = "Option::is_none")] diff --git a/google-home/src/response/execute.rs b/google-home/src/response/execute.rs index 4ef3a66..7473736 100644 --- a/google-home/src/response/execute.rs +++ b/google-home/src/response/execute.rs @@ -2,7 +2,7 @@ use serde::Serialize; use crate::{errors::ErrorCode, response::State}; -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Payload { #[serde(skip_serializing_if = "Option::is_none")] @@ -34,7 +34,7 @@ impl Default for Payload { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Command { #[serde(skip_serializing_if = "Option::is_none")] @@ -65,7 +65,7 @@ impl Command { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct States { pub online: bool, @@ -74,7 +74,7 @@ pub struct States { pub state: State, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Clone)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum Status { Success, diff --git a/impl_cast/src/lib.rs b/impl_cast/src/lib.rs index ccf3269..e92740e 100644 --- a/impl_cast/src/lib.rs +++ b/impl_cast/src/lib.rs @@ -122,7 +122,7 @@ pub fn device(attr: TokenStream, item: TokenStream) -> TokenStream { Some(self) } - fn cast_mut(&mut self) -> Option<&mut (dyn #device_trait + 'static)> { + fn cast_mut(&mut self) -> Option<&mut (dyn #device_trait + 'static)> { Some(self) } } diff --git a/src/config.rs b/src/config.rs index 61dc87d..c4d4047 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, fs, net::{Ipv4Addr, SocketAddr}, time::Duration, @@ -32,8 +31,8 @@ pub struct Config { pub light_sensor: LightSensorConfig, pub hue_bridge: Option, pub debug_bridge: Option, - #[serde(default)] - pub devices: HashMap, + #[serde(default, with = "tuple_vec_map")] + pub devices: Vec<(String, DeviceConfig)>, } #[derive(Debug, Clone, Deserialize)] diff --git a/src/devices.rs b/src/devices.rs index 3350bb8..961c6ee 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -21,12 +21,14 @@ pub use self::presence::{Presence, PresenceConfig, DEFAULT_PRESENCE}; pub use self::wake_on_lan::WakeOnLAN; use std::collections::HashMap; +use std::sync::Arc; use futures::future::join_all; -use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; +use google_home::device::AsGoogleHomeDevice; +use google_home::{traits::OnOff, FullfillmentError}; use rumqttc::{matches, AsyncClient, QoS}; use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, RwLock}; use tracing::{debug, error, instrument, trace}; use crate::{ @@ -37,25 +39,25 @@ use crate::{ event::{Event, EventChannel}, }; -#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + GoogleHomeDevice + OnOff)] -pub trait Device: std::fmt::Debug + Sync + Send { +#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + OnOff)] +pub trait Device: AsGoogleHomeDevice + std::fmt::Debug + Sync + Send { fn get_id(&self) -> &str; } +pub type DeviceMap = HashMap>>>; + // TODO: Add an inner type that we can wrap with Arc> to make this type a little bit nicer // to work with #[derive(Debug)] struct Devices { - devices: HashMap>, + devices: DeviceMap, client: AsyncClient, } #[derive(Debug)] pub enum Command { Fullfillment { - google_home: GoogleHome, - payload: google_home::Request, - tx: oneshot::Sender>, + tx: oneshot::Sender, }, AddDevice { device: Box, @@ -80,20 +82,10 @@ pub enum DevicesError { impl DevicesHandle { // TODO: Improve error type - pub async fn fullfillment( - &self, - google_home: GoogleHome, - payload: google_home::Request, - ) -> Result { + pub async fn fullfillment(&self) -> Result { let (tx, rx) = oneshot::channel(); - self.tx - .send(Command::Fullfillment { - google_home, - payload, - tx, - }) - .await?; - Ok(rx.await??) + self.tx.send(Command::Fullfillment { tx }).await?; + Ok(rx.await?) } pub async fn add_device(&self, device: Box) -> Result<(), DevicesError> { @@ -140,14 +132,8 @@ pub fn start(client: AsyncClient) -> (DevicesHandle, EventChannel) { impl Devices { async fn handle_cmd(&mut self, cmd: Command) { match cmd { - Command::Fullfillment { - google_home, - payload, - tx, - } => { - let result = - google_home.handle_request(payload, &mut self.get::()); - tx.send(result).ok(); + Command::Fullfillment { tx } => { + tx.send(self.devices.clone()).ok(); } Command::AddDevice { device, tx } => { self.add_device(device).await; @@ -158,39 +144,49 @@ impl Devices { } async fn add_device(&mut self, device: Box) { - let id = device.get_id(); - debug!(id, "Adding device"); + let id = device.get_id().to_owned(); - // If the device listens to mqtt, subscribe to the topics - if let Some(device) = As::::cast(device.as_ref()) { - for topic in device.topics() { - trace!(id, topic, "Subscribing to topic"); - if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await { - // NOTE: Pretty sure that this can only happen if the mqtt client if no longer - // running - error!(id, topic, "Failed to subscribe to topic: {err}"); + let device = Arc::new(RwLock::new(device)); + { + let device = device.read().await; + + debug!(id, "Adding device"); + + // If the device listens to mqtt, subscribe to the topics + if let Some(device) = As::::cast(device.as_ref()) { + for topic in device.topics() { + trace!(id, topic, "Subscribing to topic"); + if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await { + // NOTE: Pretty sure that this can only happen if the mqtt client if no longer + // running + error!(id, topic, "Failed to subscribe to topic: {err}"); + } } } } - self.devices.insert(device.get_id().to_owned(), device); + self.devices.insert(id, device); } #[instrument(skip(self))] async fn handle_event(&mut self, event: Event) { match event { Event::MqttMessage(message) => { - let iter = self.get::().into_iter().map(|(id, listener)| { + let iter = self.devices.iter().map(|(id, device)| { let message = message.clone(); async move { - let subscribed = listener - .topics() - .iter() - .any(|topic| matches(&message.topic, topic)); + let mut device = device.write().await; + let device = device.as_mut(); + if let Some(device) = As::::cast_mut(device) { + let subscribed = device + .topics() + .iter() + .any(|topic| matches(&message.topic, topic)); - if subscribed { - trace!(id, "Handling"); - listener.on_mqtt(message).await; + if subscribed { + trace!(id, "Handling"); + device.on_mqtt(message).await; + } } } }); @@ -198,52 +194,44 @@ impl Devices { join_all(iter).await; } Event::Darkness(dark) => { - let iter = - self.get::() - .into_iter() - .map(|(id, device)| async move { - trace!(id, "Handling"); - device.on_darkness(dark).await; - }); + let iter = self.devices.iter().map(|(id, device)| async move { + let mut device = device.write().await; + let device = device.as_mut(); + if let Some(device) = As::::cast_mut(device) { + trace!(id, "Handling"); + device.on_darkness(dark).await; + } + }); join_all(iter).await; } Event::Presence(presence) => { - let iter = - self.get::() - .into_iter() - .map(|(id, device)| async move { - trace!(id, "Handling"); - device.on_presence(presence).await; - }); + let iter = self.devices.iter().map(|(id, device)| async move { + let mut device = device.write().await; + let device = device.as_mut(); + if let Some(device) = As::::cast_mut(device) { + trace!(id, "Handling"); + device.on_presence(presence).await; + } + }); join_all(iter).await; } Event::Ntfy(notification) => { - let iter = self - .get::() - .into_iter() - .map(|(id, device)| { - let notification = notification.clone(); - async move { + let iter = self.devices.iter().map(|(id, device)| { + let notification = notification.clone(); + async move { + let mut device = device.write().await; + let device = device.as_mut(); + if let Some(device) = As::::cast_mut(device) { trace!(id, "Handling"); device.on_notification(notification).await; } - }); + } + }); join_all(iter).await; } } } - - fn get(&mut self) -> HashMap<&str, &mut T> - where - T: ?Sized + 'static, - (dyn Device): As, - { - self.devices - .iter_mut() - .filter_map(|(id, device)| As::::cast_mut(device.as_mut()).map(|t| (id.as_str(), t))) - .collect() - } } diff --git a/src/main.rs b/src/main.rs index 67e4b70..f6f9266 100644 --- a/src/main.rs +++ b/src/main.rs @@ -128,8 +128,14 @@ async fn app() -> anyhow::Result<()> { post(async move |user: User, Json(payload): Json| { debug!(username = user.preferred_username, "{payload:#?}"); let gc = GoogleHome::new(&user.preferred_username); - let result = match device_handler.fullfillment(gc, payload).await { - Ok(result) => result, + let result = match device_handler.fullfillment().await { + Ok(devices) => match gc.handle_request(payload, &devices).await { + Ok(result) => result, + Err(err) => { + return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()) + .into_response() + } + }, Err(err) => { return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()) .into_response()