Improved how devices are created, ntfy and presence are now treated like any other device

This commit is contained in:
2024-04-27 02:55:53 +02:00
parent 5069d1b0e7
commit 3e4ea8952a
22 changed files with 423 additions and 528 deletions

View File

@@ -2,8 +2,6 @@ use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use futures::future::join_all;
use google_home::traits::OnOff;
use mlua::{FromLua, LuaSerdeExt};
@@ -13,22 +11,14 @@ use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, error, instrument, trace};
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
use crate::schedule::{Action, Schedule};
#[async_trait]
#[enum_dispatch]
pub trait DeviceConfig {
async fn create(&self, identifier: &str) -> Result<Box<dyn Device>, DeviceConfigError>;
}
impl mlua::UserData for Box<dyn DeviceConfig> {}
#[derive(Debug, FromLua, Clone)]
pub struct WrappedDevice(Arc<RwLock<Box<dyn Device>>>);
impl WrappedDevice {
fn new(device: Box<dyn Device>) -> Self {
pub fn new(device: Box<dyn Device>) -> Self {
Self(Arc::new(RwLock::new(device)))
}
}
@@ -132,29 +122,27 @@ impl DeviceManager {
sched.start().await.unwrap();
}
pub async fn add(&self, device: Box<dyn Device>) -> WrappedDevice {
let id = device.get_id().into();
pub async fn add(&self, device: &WrappedDevice) {
let id = device.read().await.get_id().to_owned();
debug!(id, "Adding device");
// If the device listens to mqtt, subscribe to the topics
if let Some(device) = device.as_ref().cast() as Option<&dyn OnMqtt> {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
{
// If the device listens to mqtt, subscribe to the topics
let device = device.read().await;
if let Some(device) = device.as_ref().cast() as Option<&dyn OnMqtt> {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
}
}
}
}
// Wrap the device
let device = WrappedDevice::new(device);
self.devices.write().await.insert(id, device.0.clone());
device
}
pub fn event_channel(&self) -> EventChannel {
@@ -193,6 +181,7 @@ impl DeviceManager {
if subscribed {
trace!(id, "Handling");
device.on_mqtt(message).await;
trace!(id, "Done");
}
}
}
@@ -208,6 +197,7 @@ impl DeviceManager {
if let Some(device) = device {
trace!(id, "Handling");
device.on_darkness(dark).await;
trace!(id, "Done");
}
});
@@ -221,6 +211,7 @@ impl DeviceManager {
if let Some(device) = device {
trace!(id, "Handling");
device.on_presence(presence).await;
trace!(id, "Done");
}
});
@@ -236,6 +227,7 @@ impl DeviceManager {
if let Some(device) = device {
trace!(id, "Handling");
device.on_notification(notification).await;
trace!(id, "Done");
}
}
});
@@ -248,20 +240,11 @@ impl DeviceManager {
impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method(
"create",
|_lua, this, (identifier, config): (String, mlua::Value)| async move {
// TODO: Handle the error here properly
let config: Box<dyn DeviceConfig> = config.as_userdata().unwrap().take()?;
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {
this.add(&device).await;
let device = config
.create(&identifier)
.await
.map_err(mlua::ExternalError::into_lua_err)?;
Ok(this.add(device).await)
},
);
Ok(())
});
methods.add_async_method("add_schedule", |lua, this, schedule| async {
let schedule = lua.from_value(schedule)?;