DeviceManager no longer handles subscribing and filtering topics, each device has to do this themselves now
This commit is contained in:
@@ -42,7 +42,7 @@ impl AirFilter {
|
||||
self.config
|
||||
.client
|
||||
.publish(
|
||||
topic.clone(),
|
||||
&topic,
|
||||
rumqttc::QoS::AtLeastOnce,
|
||||
false,
|
||||
serde_json::to_string(&message).unwrap(),
|
||||
@@ -56,6 +56,12 @@ impl AirFilter {
|
||||
impl AirFilter {
|
||||
async fn create(config: AirFilterConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.info.identifier(), "Setting up AirFilter");
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
last_known_state: AirFilterState {
|
||||
@@ -74,11 +80,11 @@ impl Device for AirFilter {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for AirFilter {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let state = match AirFilterState::try_from(message) {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::devices::As;
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::{OnMqtt, OnPresence};
|
||||
use crate::messages::{RemoteAction, RemoteMessage};
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
|
||||
#[derive(Debug, Clone, LuaDeviceConfig)]
|
||||
pub struct AudioSetupConfig {
|
||||
@@ -20,6 +21,8 @@ pub struct AudioSetupConfig {
|
||||
mixer: WrappedDevice,
|
||||
#[device_config(from_lua)]
|
||||
speakers: WrappedDevice,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
#[derive(Debug, LuaDevice)]
|
||||
@@ -42,6 +45,11 @@ impl AudioSetup {
|
||||
return Err(DeviceConfigError::MissingTrait(speakers_id, "OnOff".into()));
|
||||
}
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(AudioSetup { config })
|
||||
}
|
||||
}
|
||||
@@ -54,11 +62,11 @@ impl Device for AudioSetup {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for AudioSetup {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let action = match RemoteMessage::try_from(message) {
|
||||
Ok(message) => message.action(),
|
||||
Err(err) => {
|
||||
|
||||
@@ -91,6 +91,11 @@ impl ContactSensor {
|
||||
}
|
||||
}
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config: config.clone(),
|
||||
overall_presence: DEFAULT_PRESENCE,
|
||||
@@ -115,11 +120,11 @@ impl OnPresence for ContactSensor {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for ContactSensor {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let is_closed = match ContactMessage::try_from(message) {
|
||||
Ok(state) => state.is_closed(),
|
||||
Err(err) => {
|
||||
@@ -187,7 +192,7 @@ impl OnMqtt for ContactSensor {
|
||||
self.config
|
||||
.client
|
||||
.publish(
|
||||
presence.mqtt.topic.clone(),
|
||||
&presence.mqtt.topic,
|
||||
rumqttc::QoS::AtLeastOnce,
|
||||
false,
|
||||
serde_json::to_string(&PresenceMessage::new(true)).unwrap(),
|
||||
@@ -212,7 +217,7 @@ impl OnMqtt for ContactSensor {
|
||||
tokio::time::sleep(timeout).await;
|
||||
debug!(id, "Removing door device!");
|
||||
client
|
||||
.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "")
|
||||
.publish(&topic, rumqttc::QoS::AtLeastOnce, false, "")
|
||||
.await
|
||||
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}"))
|
||||
.ok();
|
||||
|
||||
@@ -6,7 +6,7 @@ use async_trait::async_trait;
|
||||
use automation_macro::{LuaDevice, LuaDeviceConfig};
|
||||
use google_home::errors::ErrorCode;
|
||||
use google_home::traits::OnOff;
|
||||
use rumqttc::Publish;
|
||||
use rumqttc::{Publish, SubscribeFilter};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
|
||||
use super::Device;
|
||||
@@ -14,6 +14,7 @@ use crate::config::MqttDeviceConfig;
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::OnMqtt;
|
||||
use crate::messages::{RemoteAction, RemoteMessage};
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
use crate::traits::Timeout;
|
||||
|
||||
#[derive(Debug, Clone, LuaDeviceConfig)]
|
||||
@@ -27,6 +28,8 @@ pub struct HueGroupConfig {
|
||||
pub scene_id: String,
|
||||
#[device_config(default)]
|
||||
pub remotes: Vec<MqttDeviceConfig>,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
#[derive(Debug, LuaDevice)]
|
||||
@@ -39,6 +42,17 @@ pub struct HueGroup {
|
||||
impl HueGroup {
|
||||
async fn create(config: HueGroupConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.identifier, "Setting up AudioSetup");
|
||||
|
||||
if !config.remotes.is_empty() {
|
||||
config
|
||||
.client
|
||||
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
|
||||
path: remote.topic.clone(),
|
||||
qos: rumqttc::QoS::AtLeastOnce,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Self { config })
|
||||
}
|
||||
|
||||
@@ -67,15 +81,16 @@ impl Device for HueGroup {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for HueGroup {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
self.config
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !self
|
||||
.config
|
||||
.remotes
|
||||
.iter()
|
||||
.map(|mqtt| mqtt.topic.as_str())
|
||||
.collect()
|
||||
}
|
||||
.any(|remote| rumqttc::matches(&message.topic, &remote.topic))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
let action = match RemoteMessage::try_from(message) {
|
||||
Ok(message) => message.action(),
|
||||
Err(err) => {
|
||||
|
||||
@@ -7,7 +7,7 @@ use google_home::errors::ErrorCode;
|
||||
use google_home::traits::{self, OnOff};
|
||||
use google_home::types::Type;
|
||||
use google_home::{device, GoogleHomeDevice};
|
||||
use rumqttc::{matches, Publish};
|
||||
use rumqttc::{matches, Publish, SubscribeFilter};
|
||||
use serde::Deserialize;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, trace, warn};
|
||||
@@ -61,7 +61,7 @@ async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) {
|
||||
// TODO: Handle potential errors here
|
||||
client
|
||||
.publish(
|
||||
topic.clone(),
|
||||
&topic,
|
||||
rumqttc::QoS::AtLeastOnce,
|
||||
false,
|
||||
serde_json::to_string(&message).unwrap(),
|
||||
@@ -75,6 +75,21 @@ impl IkeaOutlet {
|
||||
async fn create(config: IkeaOutletConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.info.identifier(), "Setting up IkeaOutlet");
|
||||
|
||||
if !config.remotes.is_empty() {
|
||||
config
|
||||
.client
|
||||
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
|
||||
path: remote.topic.clone(),
|
||||
qos: rumqttc::QoS::AtLeastOnce,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
last_known_state: false,
|
||||
@@ -91,19 +106,6 @@ impl Device for IkeaOutlet {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for IkeaOutlet {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
let mut topics: Vec<_> = self
|
||||
.config
|
||||
.remotes
|
||||
.iter()
|
||||
.map(|mqtt| mqtt.topic.as_str())
|
||||
.collect();
|
||||
|
||||
topics.push(&self.config.mqtt.topic);
|
||||
|
||||
topics
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
// Check if the message is from the deviec itself or from a remote
|
||||
if matches(&message.topic, &self.config.mqtt.topic) {
|
||||
@@ -131,7 +133,12 @@ impl OnMqtt for IkeaOutlet {
|
||||
if state && let Some(timeout) = self.config.timeout {
|
||||
self.start_timeout(timeout).await.unwrap();
|
||||
}
|
||||
} else {
|
||||
} else if self
|
||||
.config
|
||||
.remotes
|
||||
.iter()
|
||||
.any(|remote| rumqttc::matches(&message.topic, &remote.topic))
|
||||
{
|
||||
let action = match RemoteMessage::try_from(message) {
|
||||
Ok(message) => message.action(),
|
||||
Err(err) => {
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::devices::Device;
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::{self, Event, EventChannel, OnMqtt};
|
||||
use crate::messages::BrightnessMessage;
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
|
||||
#[derive(Debug, Clone, LuaDeviceConfig)]
|
||||
pub struct LightSensorConfig {
|
||||
@@ -18,6 +19,8 @@ pub struct LightSensorConfig {
|
||||
pub max: isize,
|
||||
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
|
||||
pub tx: event::Sender,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
pub const DEFAULT: bool = false;
|
||||
@@ -33,6 +36,12 @@ pub struct LightSensor {
|
||||
impl LightSensor {
|
||||
async fn create(config: LightSensorConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.identifier, "Setting up LightSensor");
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
is_dark: DEFAULT,
|
||||
@@ -48,11 +57,11 @@ impl Device for LightSensor {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for LightSensor {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let illuminance = match BrightnessMessage::try_from(message) {
|
||||
Ok(state) => state.illuminance(),
|
||||
Err(err) => {
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::devices::Device;
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::{self, Event, EventChannel, OnMqtt};
|
||||
use crate::messages::PresenceMessage;
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
|
||||
#[derive(Debug, LuaDeviceConfig)]
|
||||
pub struct PresenceConfig {
|
||||
@@ -17,6 +18,8 @@ pub struct PresenceConfig {
|
||||
pub mqtt: MqttDeviceConfig,
|
||||
#[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))]
|
||||
tx: event::Sender,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
pub const DEFAULT_PRESENCE: bool = false;
|
||||
@@ -32,6 +35,12 @@ pub struct Presence {
|
||||
impl Presence {
|
||||
async fn create(config: PresenceConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = "ntfy", "Setting up Presence");
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
devices: HashMap::new(),
|
||||
@@ -48,11 +57,11 @@ impl Device for Presence {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for Presence {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let offset = self
|
||||
.config
|
||||
.mqtt
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::config::{InfoConfig, MqttDeviceConfig};
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::OnMqtt;
|
||||
use crate::messages::ActivateMessage;
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
|
||||
#[derive(Debug, Clone, LuaDeviceConfig)]
|
||||
pub struct WakeOnLANConfig {
|
||||
@@ -25,6 +26,8 @@ pub struct WakeOnLANConfig {
|
||||
mac_address: MacAddress,
|
||||
#[device_config(default(Ipv4Addr::new(255, 255, 255, 255)))]
|
||||
broadcast_ip: Ipv4Addr,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
#[derive(Debug, LuaDevice)]
|
||||
@@ -37,6 +40,11 @@ impl WakeOnLAN {
|
||||
async fn create(config: WakeOnLANConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.info.identifier(), "Setting up WakeOnLAN");
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self { config })
|
||||
}
|
||||
}
|
||||
@@ -49,11 +57,11 @@ impl Device for WakeOnLAN {
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for WakeOnLAN {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let activate = match ActivateMessage::try_from(message) {
|
||||
Ok(message) => message.activate(),
|
||||
Err(err) => {
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::config::MqttDeviceConfig;
|
||||
use crate::error::DeviceConfigError;
|
||||
use crate::event::{self, Event, EventChannel, OnMqtt};
|
||||
use crate::messages::PowerMessage;
|
||||
use crate::mqtt::WrappedAsyncClient;
|
||||
|
||||
#[derive(Debug, Clone, LuaDeviceConfig)]
|
||||
pub struct WasherConfig {
|
||||
@@ -19,6 +20,8 @@ pub struct WasherConfig {
|
||||
threshold: f32,
|
||||
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
|
||||
pub tx: event::Sender,
|
||||
#[device_config(from_lua)]
|
||||
client: WrappedAsyncClient,
|
||||
}
|
||||
|
||||
// TODO: Add google home integration
|
||||
@@ -33,6 +36,12 @@ pub struct Washer {
|
||||
impl Washer {
|
||||
async fn create(config: WasherConfig) -> Result<Self, DeviceConfigError> {
|
||||
trace!(id = config.identifier, "Setting up Washer");
|
||||
|
||||
config
|
||||
.client
|
||||
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
|
||||
.await?;
|
||||
|
||||
Ok(Self { config, running: 0 })
|
||||
}
|
||||
}
|
||||
@@ -50,11 +59,11 @@ const HYSTERESIS: isize = 10;
|
||||
|
||||
#[async_trait]
|
||||
impl OnMqtt for Washer {
|
||||
fn topics(&self) -> Vec<&str> {
|
||||
vec![&self.config.mqtt.topic]
|
||||
}
|
||||
|
||||
async fn on_mqtt(&mut self, message: Publish) {
|
||||
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
|
||||
return;
|
||||
}
|
||||
|
||||
let power = match PowerMessage::try_from(message) {
|
||||
Ok(state) => state.power(),
|
||||
Err(err) => {
|
||||
|
||||
Reference in New Issue
Block a user