From 11aa15b59dcbcda397223b8d5be72e76a16332b5 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Wed, 12 Apr 2023 04:49:45 +0200 Subject: [PATCH] Moved the mqtt topic check up one leve into Devices --- src/devices.rs | 13 ++++++++++--- src/devices/audio_setup.rs | 5 ----- src/devices/contact_sensor.rs | 6 +----- src/devices/ikea_outlet.rs | 6 +----- src/devices/wake_on_lan.rs | 6 +----- 5 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/devices.rs b/src/devices.rs index c3c4dfc..311c345 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use async_trait::async_trait; use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; use pollster::FutureExt; -use rumqttc::{AsyncClient, QoS}; +use rumqttc::{matches, AsyncClient, QoS}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, trace}; @@ -192,8 +192,15 @@ impl OnMqtt for Devices { self.get::() .iter_mut() .for_each(|(id, listener)| { - trace!(id, "Handling"); - listener.on_mqtt(message).block_on(); + let subscribed = listener + .topics() + .iter() + .any(|topic| matches(&message.topic, topic)); + + if subscribed { + trace!(id, "Handling"); + listener.on_mqtt(message).block_on(); + } }) } } diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 22a7272..3e1eccb 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use google_home::traits; -use rumqttc::matches; use tracing::{debug, error, warn}; use crate::config::MqttDeviceConfig; @@ -56,10 +55,6 @@ impl OnMqtt for AudioSetup { } async fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if !matches(&message.topic, &self.mqtt.topic) { - return; - } - let action = match RemoteMessage::try_from(message) { Ok(message) => message.action(), Err(err) => { diff --git a/src/devices/contact_sensor.rs b/src/devices/contact_sensor.rs index 2045259..7e51ace 100644 --- a/src/devices/contact_sensor.rs +++ b/src/devices/contact_sensor.rs @@ -1,7 +1,7 @@ use std::time::Duration; use async_trait::async_trait; -use rumqttc::{matches, AsyncClient}; +use rumqttc::AsyncClient; use tokio::task::JoinHandle; use tracing::{debug, error, warn}; @@ -64,10 +64,6 @@ impl OnMqtt for ContactSensor { } async fn on_mqtt(&mut self, message: &rumqttc::Publish) { - if !matches(&message.topic, &self.mqtt.topic) { - return; - } - let is_closed = match ContactMessage::try_from(message) { Ok(state) => state.is_closed(), Err(err) => { diff --git a/src/devices/ikea_outlet.rs b/src/devices/ikea_outlet.rs index 5ec0667..2c254e2 100644 --- a/src/devices/ikea_outlet.rs +++ b/src/devices/ikea_outlet.rs @@ -7,7 +7,7 @@ use google_home::{ GoogleHomeDevice, }; use pollster::FutureExt as _; -use rumqttc::{matches, AsyncClient, Publish}; +use rumqttc::{AsyncClient, Publish}; use std::time::Duration; use tokio::task::JoinHandle; use tracing::{debug, error, warn}; @@ -83,10 +83,6 @@ impl OnMqtt for IkeaOutlet { async fn on_mqtt(&mut self, message: &Publish) { // Update the internal state based on what the device has reported - if !matches(&message.topic, &self.mqtt.topic) { - return; - } - let state = match OnOffMessage::try_from(message) { Ok(state) => state.state(), Err(err) => { diff --git a/src/devices/wake_on_lan.rs b/src/devices/wake_on_lan.rs index d4d873d..a670638 100644 --- a/src/devices/wake_on_lan.rs +++ b/src/devices/wake_on_lan.rs @@ -9,7 +9,7 @@ use google_home::{ types::Type, GoogleHomeDevice, }; -use rumqttc::{matches, Publish}; +use rumqttc::Publish; use tracing::{debug, error}; use crate::{ @@ -59,10 +59,6 @@ impl OnMqtt for WakeOnLAN { } async fn on_mqtt(&mut self, message: &Publish) { - if !matches(&message.topic, &self.mqtt.topic) { - return; - } - let activate = match ActivateMessage::try_from(message) { Ok(message) => message.activate(), Err(err) => {