Moved the mqtt topic check up one leve into Devices
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Dreaded_X 2023-04-12 04:49:45 +02:00
parent 92c8f3074f
commit 11aa15b59d
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
5 changed files with 13 additions and 23 deletions

View File

@ -15,7 +15,7 @@ use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice};
use pollster::FutureExt; use pollster::FutureExt;
use rumqttc::{AsyncClient, QoS}; use rumqttc::{matches, AsyncClient, QoS};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, trace}; use tracing::{debug, error, trace};
@ -192,8 +192,15 @@ impl OnMqtt for Devices {
self.get::<dyn OnMqtt>() self.get::<dyn OnMqtt>()
.iter_mut() .iter_mut()
.for_each(|(id, listener)| { .for_each(|(id, listener)| {
trace!(id, "Handling"); let subscribed = listener
listener.on_mqtt(message).block_on(); .topics()
.iter()
.any(|topic| matches(&message.topic, topic));
if subscribed {
trace!(id, "Handling");
listener.on_mqtt(message).block_on();
}
}) })
} }
} }

View File

@ -1,6 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use google_home::traits; use google_home::traits;
use rumqttc::matches;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use crate::config::MqttDeviceConfig; use crate::config::MqttDeviceConfig;
@ -56,10 +55,6 @@ impl OnMqtt for AudioSetup {
} }
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {

View File

@ -1,7 +1,7 @@
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use rumqttc::{matches, AsyncClient}; use rumqttc::AsyncClient;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@ -64,10 +64,6 @@ impl OnMqtt for ContactSensor {
} }
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}
let is_closed = match ContactMessage::try_from(message) { let is_closed = match ContactMessage::try_from(message) {
Ok(state) => state.is_closed(), Ok(state) => state.is_closed(),
Err(err) => { Err(err) => {

View File

@ -7,7 +7,7 @@ use google_home::{
GoogleHomeDevice, GoogleHomeDevice,
}; };
use pollster::FutureExt as _; use pollster::FutureExt as _;
use rumqttc::{matches, AsyncClient, Publish}; use rumqttc::{AsyncClient, Publish};
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@ -83,10 +83,6 @@ impl OnMqtt for IkeaOutlet {
async fn on_mqtt(&mut self, message: &Publish) { async fn on_mqtt(&mut self, message: &Publish) {
// Update the internal state based on what the device has reported // Update the internal state based on what the device has reported
if !matches(&message.topic, &self.mqtt.topic) {
return;
}
let state = match OnOffMessage::try_from(message) { let state = match OnOffMessage::try_from(message) {
Ok(state) => state.state(), Ok(state) => state.state(),
Err(err) => { Err(err) => {

View File

@ -9,7 +9,7 @@ use google_home::{
types::Type, types::Type,
GoogleHomeDevice, GoogleHomeDevice,
}; };
use rumqttc::{matches, Publish}; use rumqttc::Publish;
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{ use crate::{
@ -59,10 +59,6 @@ impl OnMqtt for WakeOnLAN {
} }
async fn on_mqtt(&mut self, message: &Publish) { async fn on_mqtt(&mut self, message: &Publish) {
if !matches(&message.topic, &self.mqtt.topic) {
return;
}
let activate = match ActivateMessage::try_from(message) { let activate = match ActivateMessage::try_from(message) {
Ok(message) => message.activate(), Ok(message) => message.activate(),
Err(err) => { Err(err) => {