DeviceManager no longer handles subscribing and filtering topics, each device has to do this themselves now
All checks were successful
Build and deploy automation_rs / Build automation_rs (push) Successful in 6m15s
Build and deploy automation_rs / Build Docker image (push) Successful in 48s
Build and deploy automation_rs / Deploy Docker container (push) Has been skipped

This commit is contained in:
Dreaded_X 2024-04-29 01:46:43 +02:00
parent 9385f27125
commit e02edbcd28
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
14 changed files with 149 additions and 79 deletions

View File

@ -20,6 +20,7 @@ automation.device_manager:add(Ntfy.new({
automation.device_manager:add(Presence.new({ automation.device_manager:add(Presence.new({
topic = "automation_dev/presence/+/#", topic = "automation_dev/presence/+/#",
client = automation.mqtt_client,
event_channel = automation.event_channel, event_channel = automation.event_channel,
})) }))
@ -45,6 +46,7 @@ automation.device_manager:add(HueBridge.new({
automation.device_manager:add(LightSensor.new({ automation.device_manager:add(LightSensor.new({
identifier = "living_light_sensor", identifier = "living_light_sensor",
topic = mqtt_z2m("living/light"), topic = mqtt_z2m("living/light"),
client = automation.mqtt_client,
min = 22000, min = 22000,
max = 23500, max = 23500,
event_channel = automation.event_channel, event_channel = automation.event_channel,
@ -54,6 +56,7 @@ automation.device_manager:add(WakeOnLAN.new({
name = "Zeus", name = "Zeus",
room = "Living Room", room = "Living Room",
topic = mqtt_automation("appliance/living_room/zeus"), topic = mqtt_automation("appliance/living_room/zeus"),
client = automation.mqtt_client,
mac_address = "30:9c:23:60:9c:13", mac_address = "30:9c:23:60:9c:13",
broadcast_ip = "10.0.0.255", broadcast_ip = "10.0.0.255",
})) }))
@ -66,6 +69,7 @@ automation.device_manager:add(living_speakers)
automation.device_manager:add(AudioSetup.new({ automation.device_manager:add(AudioSetup.new({
identifier = "living_audio", identifier = "living_audio",
topic = mqtt_z2m("living/remote"), topic = mqtt_z2m("living/remote"),
client = automation.mqtt_client,
mixer = living_mixer, mixer = living_mixer,
speakers = living_speakers, speakers = living_speakers,
})) }))
@ -95,6 +99,7 @@ automation.device_manager:add(IkeaOutlet.new({
automation.device_manager:add(Washer.new({ automation.device_manager:add(Washer.new({
identifier = "bathroom_washer", identifier = "bathroom_washer",
topic = mqtt_z2m("batchroom/washer"), topic = mqtt_z2m("batchroom/washer"),
client = automation.mqtt_client,
threshold = 1, threshold = 1,
event_channel = automation.event_channel, event_channel = automation.event_channel,
})) }))
@ -125,6 +130,7 @@ local hallway_lights = automation.device_manager:add(HueGroup.new({
remotes = { remotes = {
{ topic = mqtt_z2m("hallway/remote") }, { topic = mqtt_z2m("hallway/remote") },
}, },
client = automation.mqtt_client,
})) }))
automation.device_manager:add(ContactSensor.new({ automation.device_manager:add(ContactSensor.new({

View File

@ -5,10 +5,9 @@ use std::sync::Arc;
use futures::future::join_all; use futures::future::join_all;
use google_home::traits::OnOff; use google_home::traits::OnOff;
use mlua::{FromLua, LuaSerdeExt}; use mlua::{FromLua, LuaSerdeExt};
use rumqttc::{matches, AsyncClient, QoS};
use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler}; use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, error, instrument, trace}; use tracing::{debug, instrument, trace};
use crate::devices::{As, Device}; use crate::devices::{As, Device};
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence}; use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
@ -43,17 +42,15 @@ pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DeviceManager { pub struct DeviceManager {
devices: Arc<RwLock<DeviceMap>>, devices: Arc<RwLock<DeviceMap>>,
client: AsyncClient,
event_channel: EventChannel, event_channel: EventChannel,
} }
impl DeviceManager { impl DeviceManager {
pub fn new(client: AsyncClient) -> Self { pub fn new() -> Self {
let (event_channel, mut event_rx) = EventChannel::new(); let (event_channel, mut event_rx) = EventChannel::new();
let device_manager = Self { let device_manager = Self {
devices: Arc::new(RwLock::new(HashMap::new())), devices: Arc::new(RwLock::new(HashMap::new())),
client,
event_channel, event_channel,
}; };
@ -127,18 +124,6 @@ impl DeviceManager {
debug!(id, "Adding device"); debug!(id, "Adding device");
// If the device listens to mqtt, subscribe to the topics
if let Some(device) = As::<dyn OnMqtt>::cast(device.read().await.as_ref()) {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
}
}
}
self.devices.write().await.insert(id, device.0.clone()); self.devices.write().await.insert(id, device.0.clone());
} }
@ -170,16 +155,16 @@ impl DeviceManager {
let mut device = device.write().await; let mut device = device.write().await;
let device = device.as_mut(); let device = device.as_mut();
if let Some(device) = As::<dyn OnMqtt>::cast_mut(device) { if let Some(device) = As::<dyn OnMqtt>::cast_mut(device) {
let subscribed = device // let subscribed = device
.topics() // .topics()
.iter() // .iter()
.any(|topic| matches(&message.topic, topic)); // .any(|topic| matches(&message.topic, topic));
//
if subscribed { // if subscribed {
trace!(id, "Handling"); trace!(id, "Handling");
device.on_mqtt(message).await; device.on_mqtt(message).await;
trace!(id, "Done"); trace!(id, "Done");
} // }
} }
} }
}); });
@ -235,6 +220,12 @@ impl DeviceManager {
} }
} }
impl Default for DeviceManager {
fn default() -> Self {
Self::new()
}
}
impl mlua::UserData for DeviceManager { impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move { methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {

View File

@ -56,6 +56,12 @@ impl AirFilter {
impl AirFilter { impl AirFilter {
async fn create(config: AirFilterConfig) -> Result<Self, DeviceConfigError> { async fn create(config: AirFilterConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.info.identifier(), "Setting up AirFilter"); trace!(id = config.info.identifier(), "Setting up AirFilter");
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { Ok(Self {
config, config,
last_known_state: AirFilterState { last_known_state: AirFilterState {
@ -74,11 +80,11 @@ impl Device for AirFilter {
#[async_trait] #[async_trait]
impl OnMqtt for AirFilter { impl OnMqtt for AirFilter {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let state = match AirFilterState::try_from(message) { let state = match AirFilterState::try_from(message) {
Ok(state) => state, Ok(state) => state,
Err(err) => { Err(err) => {

View File

@ -10,6 +10,7 @@ use crate::devices::As;
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::{OnMqtt, OnPresence}; use crate::event::{OnMqtt, OnPresence};
use crate::messages::{RemoteAction, RemoteMessage}; use crate::messages::{RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct AudioSetupConfig { pub struct AudioSetupConfig {
@ -20,6 +21,8 @@ pub struct AudioSetupConfig {
mixer: WrappedDevice, mixer: WrappedDevice,
#[device_config(from_lua)] #[device_config(from_lua)]
speakers: WrappedDevice, speakers: WrappedDevice,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
#[derive(Debug, LuaDevice)] #[derive(Debug, LuaDevice)]
@ -42,6 +45,11 @@ impl AudioSetup {
return Err(DeviceConfigError::MissingTrait(speakers_id, "OnOff".into())); return Err(DeviceConfigError::MissingTrait(speakers_id, "OnOff".into()));
} }
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(AudioSetup { config }) Ok(AudioSetup { config })
} }
} }
@ -54,11 +62,11 @@ impl Device for AudioSetup {
#[async_trait] #[async_trait]
impl OnMqtt for AudioSetup { impl OnMqtt for AudioSetup {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: rumqttc::Publish) { async fn on_mqtt(&mut self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {

View File

@ -91,6 +91,11 @@ impl ContactSensor {
} }
} }
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { Ok(Self {
config: config.clone(), config: config.clone(),
overall_presence: DEFAULT_PRESENCE, overall_presence: DEFAULT_PRESENCE,
@ -115,11 +120,11 @@ impl OnPresence for ContactSensor {
#[async_trait] #[async_trait]
impl OnMqtt for ContactSensor { impl OnMqtt for ContactSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: rumqttc::Publish) { async fn on_mqtt(&mut self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let is_closed = match ContactMessage::try_from(message) { let is_closed = match ContactMessage::try_from(message) {
Ok(state) => state.is_closed(), Ok(state) => state.is_closed(),
Err(err) => { Err(err) => {

View File

@ -6,7 +6,7 @@ use async_trait::async_trait;
use automation_macro::{LuaDevice, LuaDeviceConfig}; use automation_macro::{LuaDevice, LuaDeviceConfig};
use google_home::errors::ErrorCode; use google_home::errors::ErrorCode;
use google_home::traits::OnOff; use google_home::traits::OnOff;
use rumqttc::Publish; use rumqttc::{Publish, SubscribeFilter};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
use super::Device; use super::Device;
@ -14,6 +14,7 @@ use crate::config::MqttDeviceConfig;
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::OnMqtt; use crate::event::OnMqtt;
use crate::messages::{RemoteAction, RemoteMessage}; use crate::messages::{RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient;
use crate::traits::Timeout; use crate::traits::Timeout;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
@ -27,6 +28,8 @@ pub struct HueGroupConfig {
pub scene_id: String, pub scene_id: String,
#[device_config(default)] #[device_config(default)]
pub remotes: Vec<MqttDeviceConfig>, pub remotes: Vec<MqttDeviceConfig>,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
#[derive(Debug, LuaDevice)] #[derive(Debug, LuaDevice)]
@ -39,6 +42,17 @@ pub struct HueGroup {
impl HueGroup { impl HueGroup {
async fn create(config: HueGroupConfig) -> Result<Self, DeviceConfigError> { async fn create(config: HueGroupConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.identifier, "Setting up AudioSetup"); trace!(id = config.identifier, "Setting up AudioSetup");
if !config.remotes.is_empty() {
config
.client
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
path: remote.topic.clone(),
qos: rumqttc::QoS::AtLeastOnce,
}))
.await?;
}
Ok(Self { config }) Ok(Self { config })
} }
@ -67,15 +81,16 @@ impl Device for HueGroup {
#[async_trait] #[async_trait]
impl OnMqtt for HueGroup { impl OnMqtt for HueGroup {
fn topics(&self) -> Vec<&str> { async fn on_mqtt(&mut self, message: Publish) {
self.config if !self
.config
.remotes .remotes
.iter() .iter()
.map(|mqtt| mqtt.topic.as_str()) .any(|remote| rumqttc::matches(&message.topic, &remote.topic))
.collect() {
} return;
}
async fn on_mqtt(&mut self, message: Publish) {
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {

View File

@ -7,7 +7,7 @@ use google_home::errors::ErrorCode;
use google_home::traits::{self, OnOff}; use google_home::traits::{self, OnOff};
use google_home::types::Type; use google_home::types::Type;
use google_home::{device, GoogleHomeDevice}; use google_home::{device, GoogleHomeDevice};
use rumqttc::{matches, Publish}; use rumqttc::{matches, Publish, SubscribeFilter};
use serde::Deserialize; use serde::Deserialize;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -75,6 +75,16 @@ impl IkeaOutlet {
async fn create(config: IkeaOutletConfig) -> Result<Self, DeviceConfigError> { async fn create(config: IkeaOutletConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.info.identifier(), "Setting up IkeaOutlet"); trace!(id = config.info.identifier(), "Setting up IkeaOutlet");
if !config.remotes.is_empty() {
config
.client
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
path: remote.topic.clone(),
qos: rumqttc::QoS::AtLeastOnce,
}))
.await?;
}
Ok(Self { Ok(Self {
config, config,
last_known_state: false, last_known_state: false,
@ -91,19 +101,6 @@ impl Device for IkeaOutlet {
#[async_trait] #[async_trait]
impl OnMqtt for IkeaOutlet { impl OnMqtt for IkeaOutlet {
fn topics(&self) -> Vec<&str> {
let mut topics: Vec<_> = self
.config
.remotes
.iter()
.map(|mqtt| mqtt.topic.as_str())
.collect();
topics.push(&self.config.mqtt.topic);
topics
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
// Check if the message is from the deviec itself or from a remote // Check if the message is from the deviec itself or from a remote
if matches(&message.topic, &self.config.mqtt.topic) { if matches(&message.topic, &self.config.mqtt.topic) {
@ -131,7 +128,12 @@ impl OnMqtt for IkeaOutlet {
if state && let Some(timeout) = self.config.timeout { if state && let Some(timeout) = self.config.timeout {
self.start_timeout(timeout).await.unwrap(); self.start_timeout(timeout).await.unwrap();
} }
} else { } else if self
.config
.remotes
.iter()
.any(|remote| rumqttc::matches(&message.topic, &remote.topic))
{
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {

View File

@ -8,6 +8,7 @@ use crate::devices::Device;
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::BrightnessMessage; use crate::messages::BrightnessMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct LightSensorConfig { pub struct LightSensorConfig {
@ -18,6 +19,8 @@ pub struct LightSensorConfig {
pub max: isize, pub max: isize,
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))] #[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
pub tx: event::Sender, pub tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
pub const DEFAULT: bool = false; pub const DEFAULT: bool = false;
@ -33,6 +36,12 @@ pub struct LightSensor {
impl LightSensor { impl LightSensor {
async fn create(config: LightSensorConfig) -> Result<Self, DeviceConfigError> { async fn create(config: LightSensorConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.identifier, "Setting up LightSensor"); trace!(id = config.identifier, "Setting up LightSensor");
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { Ok(Self {
config, config,
is_dark: DEFAULT, is_dark: DEFAULT,
@ -48,11 +57,11 @@ impl Device for LightSensor {
#[async_trait] #[async_trait]
impl OnMqtt for LightSensor { impl OnMqtt for LightSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let illuminance = match BrightnessMessage::try_from(message) { let illuminance = match BrightnessMessage::try_from(message) {
Ok(state) => state.illuminance(), Ok(state) => state.illuminance(),
Err(err) => { Err(err) => {

View File

@ -10,6 +10,7 @@ use crate::devices::Device;
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::PresenceMessage; use crate::messages::PresenceMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, LuaDeviceConfig)] #[derive(Debug, LuaDeviceConfig)]
pub struct PresenceConfig { pub struct PresenceConfig {
@ -17,6 +18,8 @@ pub struct PresenceConfig {
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
#[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))] #[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))]
tx: event::Sender, tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
pub const DEFAULT_PRESENCE: bool = false; pub const DEFAULT_PRESENCE: bool = false;
@ -32,6 +35,12 @@ pub struct Presence {
impl Presence { impl Presence {
async fn create(config: PresenceConfig) -> Result<Self, DeviceConfigError> { async fn create(config: PresenceConfig) -> Result<Self, DeviceConfigError> {
trace!(id = "ntfy", "Setting up Presence"); trace!(id = "ntfy", "Setting up Presence");
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { Ok(Self {
config, config,
devices: HashMap::new(), devices: HashMap::new(),
@ -48,11 +57,11 @@ impl Device for Presence {
#[async_trait] #[async_trait]
impl OnMqtt for Presence { impl OnMqtt for Presence {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let offset = self let offset = self
.config .config
.mqtt .mqtt

View File

@ -15,6 +15,7 @@ use crate::config::{InfoConfig, MqttDeviceConfig};
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::OnMqtt; use crate::event::OnMqtt;
use crate::messages::ActivateMessage; use crate::messages::ActivateMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WakeOnLANConfig { pub struct WakeOnLANConfig {
@ -25,6 +26,8 @@ pub struct WakeOnLANConfig {
mac_address: MacAddress, mac_address: MacAddress,
#[device_config(default(Ipv4Addr::new(255, 255, 255, 255)))] #[device_config(default(Ipv4Addr::new(255, 255, 255, 255)))]
broadcast_ip: Ipv4Addr, broadcast_ip: Ipv4Addr,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
#[derive(Debug, LuaDevice)] #[derive(Debug, LuaDevice)]
@ -37,6 +40,11 @@ impl WakeOnLAN {
async fn create(config: WakeOnLANConfig) -> Result<Self, DeviceConfigError> { async fn create(config: WakeOnLANConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.info.identifier(), "Setting up WakeOnLAN"); trace!(id = config.info.identifier(), "Setting up WakeOnLAN");
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { config }) Ok(Self { config })
} }
} }
@ -49,11 +57,11 @@ impl Device for WakeOnLAN {
#[async_trait] #[async_trait]
impl OnMqtt for WakeOnLAN { impl OnMqtt for WakeOnLAN {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let activate = match ActivateMessage::try_from(message) { let activate = match ActivateMessage::try_from(message) {
Ok(message) => message.activate(), Ok(message) => message.activate(),
Err(err) => { Err(err) => {

View File

@ -9,6 +9,7 @@ use crate::config::MqttDeviceConfig;
use crate::error::DeviceConfigError; use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt}; use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::PowerMessage; use crate::messages::PowerMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WasherConfig { pub struct WasherConfig {
@ -19,6 +20,8 @@ pub struct WasherConfig {
threshold: f32, threshold: f32,
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))] #[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
pub tx: event::Sender, pub tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
} }
// TODO: Add google home integration // TODO: Add google home integration
@ -33,6 +36,12 @@ pub struct Washer {
impl Washer { impl Washer {
async fn create(config: WasherConfig) -> Result<Self, DeviceConfigError> { async fn create(config: WasherConfig) -> Result<Self, DeviceConfigError> {
trace!(id = config.identifier, "Setting up Washer"); trace!(id = config.identifier, "Setting up Washer");
config
.client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { config, running: 0 }) Ok(Self { config, running: 0 })
} }
} }
@ -50,11 +59,11 @@ const HYSTERESIS: isize = 10;
#[async_trait] #[async_trait]
impl OnMqtt for Washer { impl OnMqtt for Washer {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let power = match PowerMessage::try_from(message) { let power = match PowerMessage::try_from(message) {
Ok(state) => state.power(), Ok(state) => state.power(),
Err(err) => { Err(err) => {

View File

@ -98,6 +98,8 @@ pub enum DeviceConfigError {
MissingTrait(String, String), MissingTrait(String, String),
#[error(transparent)] #[error(transparent)]
MissingWildcard(#[from] MissingWildcard), MissingWildcard(#[from] MissingWildcard),
#[error(transparent)]
MqttClientError(#[from] rumqttc::ClientError),
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]

View File

@ -37,7 +37,7 @@ impl mlua::UserData for EventChannel {}
#[async_trait] #[async_trait]
#[device_trait] #[device_trait]
pub trait OnMqtt { pub trait OnMqtt {
fn topics(&self) -> Vec<&str>; // fn topics(&self) -> Vec<&str>;
async fn on_mqtt(&mut self, message: Publish); async fn on_mqtt(&mut self, message: Publish);
} }

View File

@ -60,7 +60,7 @@ async fn app() -> anyhow::Result<()> {
let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 100); let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 100);
// Setup the device handler // Setup the device handler
let device_manager = DeviceManager::new(client.clone()); let device_manager = DeviceManager::new();
let event_channel = device_manager.event_channel(); let event_channel = device_manager.event_channel();