Store devices wrapped in Arc RwLock
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
time::Duration,
|
||||
@@ -32,8 +31,8 @@ pub struct Config {
|
||||
pub light_sensor: LightSensorConfig,
|
||||
pub hue_bridge: Option<HueBridgeConfig>,
|
||||
pub debug_bridge: Option<DebugBridgeConfig>,
|
||||
#[serde(default)]
|
||||
pub devices: HashMap<String, DeviceConfig>,
|
||||
#[serde(default, with = "tuple_vec_map")]
|
||||
pub devices: Vec<(String, DeviceConfig)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
148
src/devices.rs
148
src/devices.rs
@@ -21,12 +21,14 @@ pub use self::presence::{Presence, PresenceConfig, DEFAULT_PRESENCE};
|
||||
pub use self::wake_on_lan::WakeOnLAN;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::future::join_all;
|
||||
use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice};
|
||||
use google_home::device::AsGoogleHomeDevice;
|
||||
use google_home::{traits::OnOff, FullfillmentError};
|
||||
use rumqttc::{matches, AsyncClient, QoS};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
|
||||
use crate::{
|
||||
@@ -37,25 +39,25 @@ use crate::{
|
||||
event::{Event, EventChannel},
|
||||
};
|
||||
|
||||
#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + GoogleHomeDevice + OnOff)]
|
||||
pub trait Device: std::fmt::Debug + Sync + Send {
|
||||
#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + OnOff)]
|
||||
pub trait Device: AsGoogleHomeDevice + std::fmt::Debug + Sync + Send {
|
||||
fn get_id(&self) -> &str;
|
||||
}
|
||||
|
||||
pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
|
||||
|
||||
// TODO: Add an inner type that we can wrap with Arc<RwLock<>> to make this type a little bit nicer
|
||||
// to work with
|
||||
#[derive(Debug)]
|
||||
struct Devices {
|
||||
devices: HashMap<String, Box<dyn Device>>,
|
||||
devices: DeviceMap,
|
||||
client: AsyncClient,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Command {
|
||||
Fullfillment {
|
||||
google_home: GoogleHome,
|
||||
payload: google_home::Request,
|
||||
tx: oneshot::Sender<Result<google_home::Response, FullfillmentError>>,
|
||||
tx: oneshot::Sender<DeviceMap>,
|
||||
},
|
||||
AddDevice {
|
||||
device: Box<dyn Device>,
|
||||
@@ -80,20 +82,10 @@ pub enum DevicesError {
|
||||
|
||||
impl DevicesHandle {
|
||||
// TODO: Improve error type
|
||||
pub async fn fullfillment(
|
||||
&self,
|
||||
google_home: GoogleHome,
|
||||
payload: google_home::Request,
|
||||
) -> Result<google_home::Response, DevicesError> {
|
||||
pub async fn fullfillment(&self) -> Result<DeviceMap, DevicesError> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx
|
||||
.send(Command::Fullfillment {
|
||||
google_home,
|
||||
payload,
|
||||
tx,
|
||||
})
|
||||
.await?;
|
||||
Ok(rx.await??)
|
||||
self.tx.send(Command::Fullfillment { tx }).await?;
|
||||
Ok(rx.await?)
|
||||
}
|
||||
|
||||
pub async fn add_device(&self, device: Box<dyn Device>) -> Result<(), DevicesError> {
|
||||
@@ -140,14 +132,8 @@ pub fn start(client: AsyncClient) -> (DevicesHandle, EventChannel) {
|
||||
impl Devices {
|
||||
async fn handle_cmd(&mut self, cmd: Command) {
|
||||
match cmd {
|
||||
Command::Fullfillment {
|
||||
google_home,
|
||||
payload,
|
||||
tx,
|
||||
} => {
|
||||
let result =
|
||||
google_home.handle_request(payload, &mut self.get::<dyn GoogleHomeDevice>());
|
||||
tx.send(result).ok();
|
||||
Command::Fullfillment { tx } => {
|
||||
tx.send(self.devices.clone()).ok();
|
||||
}
|
||||
Command::AddDevice { device, tx } => {
|
||||
self.add_device(device).await;
|
||||
@@ -158,39 +144,49 @@ impl Devices {
|
||||
}
|
||||
|
||||
async fn add_device(&mut self, device: Box<dyn Device>) {
|
||||
let id = device.get_id();
|
||||
debug!(id, "Adding device");
|
||||
let id = device.get_id().to_owned();
|
||||
|
||||
// If the device listens to mqtt, subscribe to the topics
|
||||
if let Some(device) = As::<dyn OnMqtt>::cast(device.as_ref()) {
|
||||
for topic in device.topics() {
|
||||
trace!(id, topic, "Subscribing to topic");
|
||||
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
|
||||
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
|
||||
// running
|
||||
error!(id, topic, "Failed to subscribe to topic: {err}");
|
||||
let device = Arc::new(RwLock::new(device));
|
||||
{
|
||||
let device = device.read().await;
|
||||
|
||||
debug!(id, "Adding device");
|
||||
|
||||
// If the device listens to mqtt, subscribe to the topics
|
||||
if let Some(device) = As::<dyn OnMqtt>::cast(device.as_ref()) {
|
||||
for topic in device.topics() {
|
||||
trace!(id, topic, "Subscribing to topic");
|
||||
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
|
||||
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
|
||||
// running
|
||||
error!(id, topic, "Failed to subscribe to topic: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.devices.insert(device.get_id().to_owned(), device);
|
||||
self.devices.insert(id, device);
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn handle_event(&mut self, event: Event) {
|
||||
match event {
|
||||
Event::MqttMessage(message) => {
|
||||
let iter = self.get::<dyn OnMqtt>().into_iter().map(|(id, listener)| {
|
||||
let iter = self.devices.iter().map(|(id, device)| {
|
||||
let message = message.clone();
|
||||
async move {
|
||||
let subscribed = listener
|
||||
.topics()
|
||||
.iter()
|
||||
.any(|topic| matches(&message.topic, topic));
|
||||
let mut device = device.write().await;
|
||||
let device = device.as_mut();
|
||||
if let Some(device) = As::<dyn OnMqtt>::cast_mut(device) {
|
||||
let subscribed = device
|
||||
.topics()
|
||||
.iter()
|
||||
.any(|topic| matches(&message.topic, topic));
|
||||
|
||||
if subscribed {
|
||||
trace!(id, "Handling");
|
||||
listener.on_mqtt(message).await;
|
||||
if subscribed {
|
||||
trace!(id, "Handling");
|
||||
device.on_mqtt(message).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -198,52 +194,44 @@ impl Devices {
|
||||
join_all(iter).await;
|
||||
}
|
||||
Event::Darkness(dark) => {
|
||||
let iter =
|
||||
self.get::<dyn OnDarkness>()
|
||||
.into_iter()
|
||||
.map(|(id, device)| async move {
|
||||
trace!(id, "Handling");
|
||||
device.on_darkness(dark).await;
|
||||
});
|
||||
let iter = self.devices.iter().map(|(id, device)| async move {
|
||||
let mut device = device.write().await;
|
||||
let device = device.as_mut();
|
||||
if let Some(device) = As::<dyn OnDarkness>::cast_mut(device) {
|
||||
trace!(id, "Handling");
|
||||
device.on_darkness(dark).await;
|
||||
}
|
||||
});
|
||||
|
||||
join_all(iter).await;
|
||||
}
|
||||
Event::Presence(presence) => {
|
||||
let iter =
|
||||
self.get::<dyn OnPresence>()
|
||||
.into_iter()
|
||||
.map(|(id, device)| async move {
|
||||
trace!(id, "Handling");
|
||||
device.on_presence(presence).await;
|
||||
});
|
||||
let iter = self.devices.iter().map(|(id, device)| async move {
|
||||
let mut device = device.write().await;
|
||||
let device = device.as_mut();
|
||||
if let Some(device) = As::<dyn OnPresence>::cast_mut(device) {
|
||||
trace!(id, "Handling");
|
||||
device.on_presence(presence).await;
|
||||
}
|
||||
});
|
||||
|
||||
join_all(iter).await;
|
||||
}
|
||||
Event::Ntfy(notification) => {
|
||||
let iter = self
|
||||
.get::<dyn OnNotification>()
|
||||
.into_iter()
|
||||
.map(|(id, device)| {
|
||||
let notification = notification.clone();
|
||||
async move {
|
||||
let iter = self.devices.iter().map(|(id, device)| {
|
||||
let notification = notification.clone();
|
||||
async move {
|
||||
let mut device = device.write().await;
|
||||
let device = device.as_mut();
|
||||
if let Some(device) = As::<dyn OnNotification>::cast_mut(device) {
|
||||
trace!(id, "Handling");
|
||||
device.on_notification(notification).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
join_all(iter).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get<T>(&mut self) -> HashMap<&str, &mut T>
|
||||
where
|
||||
T: ?Sized + 'static,
|
||||
(dyn Device): As<T>,
|
||||
{
|
||||
self.devices
|
||||
.iter_mut()
|
||||
.filter_map(|(id, device)| As::<T>::cast_mut(device.as_mut()).map(|t| (id.as_str(), t)))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
10
src/main.rs
10
src/main.rs
@@ -128,8 +128,14 @@ async fn app() -> anyhow::Result<()> {
|
||||
post(async move |user: User, Json(payload): Json<Request>| {
|
||||
debug!(username = user.preferred_username, "{payload:#?}");
|
||||
let gc = GoogleHome::new(&user.preferred_username);
|
||||
let result = match device_handler.fullfillment(gc, payload).await {
|
||||
Ok(result) => result,
|
||||
let result = match device_handler.fullfillment().await {
|
||||
Ok(devices) => match gc.handle_request(payload, &devices).await {
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into())
|
||||
.into_response()
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into())
|
||||
.into_response()
|
||||
|
||||
Reference in New Issue
Block a user