DeviceManager no longer handles subscribing and filtering topics, each device has to do this themselves now

This commit is contained in:
2024-04-29 01:46:43 +02:00
parent 3e4ea8952a
commit fcd0b370d6
14 changed files with 158 additions and 86 deletions

View File

@@ -5,10 +5,9 @@ use std::sync::Arc;
use futures::future::join_all;
use google_home::traits::OnOff;
use mlua::{FromLua, LuaSerdeExt};
use rumqttc::{matches, AsyncClient, QoS};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, error, instrument, trace};
use tracing::{debug, instrument, trace};
use crate::devices::Device;
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
@@ -43,17 +42,15 @@ pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
#[derive(Debug, Clone)]
pub struct DeviceManager {
devices: Arc<RwLock<DeviceMap>>,
client: AsyncClient,
event_channel: EventChannel,
}
impl DeviceManager {
pub fn new(client: AsyncClient) -> Self {
pub fn new() -> Self {
let (event_channel, mut event_rx) = EventChannel::new();
let device_manager = Self {
devices: Arc::new(RwLock::new(HashMap::new())),
client,
event_channel,
};
@@ -127,21 +124,6 @@ impl DeviceManager {
debug!(id, "Adding device");
{
// If the device listens to mqtt, subscribe to the topics
let device = device.read().await;
if let Some(device) = device.as_ref().cast() as Option<&dyn OnMqtt> {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
}
}
}
}
self.devices.write().await.insert(id, device.0.clone());
}
@@ -173,16 +155,16 @@ impl DeviceManager {
let mut device = device.write().await;
let device: Option<&mut dyn OnMqtt> = device.as_mut().cast_mut();
if let Some(device) = device {
let subscribed = device
.topics()
.iter()
.any(|topic| matches(&message.topic, topic));
if subscribed {
trace!(id, "Handling");
device.on_mqtt(message).await;
trace!(id, "Done");
}
// let subscribed = device
// .topics()
// .iter()
// .any(|topic| matches(&message.topic, topic));
//
// if subscribed {
trace!(id, "Handling");
device.on_mqtt(message).await;
trace!(id, "Done");
// }
}
}
});
@@ -238,6 +220,12 @@ impl DeviceManager {
}
}
impl Default for DeviceManager {
fn default() -> Self {
Self::new()
}
}
impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {