Devices now handles subscribing to mqtt topics

This commit is contained in:
Dreaded_X 2023-04-12 04:37:16 +02:00
parent 34e5274e0b
commit 92c8f3074f
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
12 changed files with 93 additions and 63 deletions

View File

@ -4,10 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[workspace] [workspace]
members = [ members = ["impl_cast", "google-home"]
"impl_cast",
"google-home"
]
[dependencies] [dependencies]
rumqttc = "0.18" rumqttc = "0.18"

View File

@ -281,9 +281,14 @@ impl Device {
info.name, info.name,
info.room info.room
); );
IkeaOutlet::build(identifier, info, mqtt, outlet_type, timeout, client) Box::new(IkeaOutlet::new(
.await identifier,
.map(Box::new)? info,
mqtt,
outlet_type,
timeout,
client,
))
} }
Device::WakeOnLAN { Device::WakeOnLAN {
info, info,
@ -297,9 +302,13 @@ impl Device {
info.name, info.name,
info.room info.room
); );
WakeOnLAN::build(identifier, info, mqtt, mac_address, broadcast_ip, client) Box::new(WakeOnLAN::new(
.await identifier,
.map(Box::new)? info,
mqtt,
mac_address,
broadcast_ip,
))
} }
Device::KasaOutlet { ip } => { Device::KasaOutlet { ip } => {
trace!(id = identifier, "KasaOutlet [{}]", identifier); trace!(id = identifier, "KasaOutlet [{}]", identifier);
@ -319,7 +328,7 @@ impl Device {
.create(&speakers_id, config, client.clone()) .create(&speakers_id, config, client.clone())
.await?; .await?;
AudioSetup::build(identifier, mqtt, mixer, speakers, client) AudioSetup::build(identifier, mqtt, mixer, speakers)
.await .await
.map(Box::new)? .map(Box::new)?
} }
@ -329,9 +338,7 @@ impl Device {
.map(|p| p.generate_topic("contact", identifier, config)) .map(|p| p.generate_topic("contact", identifier, config))
.transpose()?; .transpose()?;
ContactSensor::build(identifier, mqtt, presence, client) Box::new(ContactSensor::new(identifier, mqtt, presence, client))
.await
.map(Box::new)?
} }
}; };

View File

@ -15,9 +15,10 @@ use std::collections::HashMap;
use async_trait::async_trait; use async_trait::async_trait;
use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice}; use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice};
use pollster::FutureExt; use pollster::FutureExt;
use rumqttc::{AsyncClient, QoS};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tracing::{debug, trace}; use tracing::{debug, error, trace};
use crate::{ use crate::{
light_sensor::{self, OnDarkness}, light_sensor::{self, OnDarkness},
@ -35,6 +36,7 @@ pub trait Device: std::fmt::Debug + Sync + Send {
#[derive(Debug)] #[derive(Debug)]
struct Devices { struct Devices {
devices: HashMap<String, Box<dyn Device>>, devices: HashMap<String, Box<dyn Device>>,
client: AsyncClient,
} }
#[derive(Debug)] #[derive(Debug)]
@ -94,9 +96,11 @@ pub fn start(
mut mqtt_rx: mqtt::Receiver, mut mqtt_rx: mqtt::Receiver,
mut presence_rx: presence::Receiver, mut presence_rx: presence::Receiver,
mut light_sensor_rx: light_sensor::Receiver, mut light_sensor_rx: light_sensor::Receiver,
client: AsyncClient,
) -> DevicesHandle { ) -> DevicesHandle {
let mut devices = Devices { let mut devices = Devices {
devices: HashMap::new(), devices: HashMap::new(),
client,
}; };
let (tx, mut rx) = mpsc::channel(100); let (tx, mut rx) = mpsc::channel(100);
@ -118,7 +122,7 @@ pub fn start(
} }
// TODO: Handle receiving None better, otherwise it might constantly run doing // TODO: Handle receiving None better, otherwise it might constantly run doing
// nothing // nothing
Some(cmd) = rx.recv() => devices.handle_cmd(cmd) Some(cmd) = rx.recv() => devices.handle_cmd(cmd).await
} }
} }
}); });
@ -127,7 +131,7 @@ pub fn start(
} }
impl Devices { impl Devices {
fn handle_cmd(&mut self, cmd: Command) { async fn handle_cmd(&mut self, cmd: Command) {
match cmd { match cmd {
Command::Fullfillment { Command::Fullfillment {
google_home, google_home,
@ -139,15 +143,29 @@ impl Devices {
tx.send(result).ok(); tx.send(result).ok();
} }
Command::AddDevice { device, tx } => { Command::AddDevice { device, tx } => {
self.add_device(device); self.add_device(device).await;
tx.send(()).ok(); tx.send(()).ok();
} }
} }
} }
fn add_device(&mut self, device: Box<dyn Device>) { async fn add_device(&mut self, device: Box<dyn Device>) {
debug!(id = device.get_id(), "Adding device"); let id = device.get_id();
debug!(id, "Adding device");
// If the device listens to mqtt, subscribe to the topics
if let Some(device) = As::<dyn OnMqtt>::cast(device.as_ref()) {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
}
}
}
self.devices.insert(device.get_id().to_owned(), device); self.devices.insert(device.get_id().to_owned(), device);
} }
@ -165,6 +183,10 @@ impl Devices {
#[async_trait] #[async_trait]
impl OnMqtt for Devices { impl OnMqtt for Devices {
fn topics(&self) -> Vec<&str> {
Vec::new()
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
self.get::<dyn OnMqtt>() self.get::<dyn OnMqtt>()

View File

@ -1,6 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use google_home::traits; use google_home::traits;
use rumqttc::{matches, AsyncClient}; use rumqttc::matches;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
use crate::config::MqttDeviceConfig; use crate::config::MqttDeviceConfig;
@ -26,19 +26,13 @@ impl AudioSetup {
mqtt: MqttDeviceConfig, mqtt: MqttDeviceConfig,
mixer: Box<dyn Device>, mixer: Box<dyn Device>,
speakers: Box<dyn Device>, speakers: Box<dyn Device>,
client: AsyncClient,
) -> Result<Self, DeviceError> { ) -> Result<Self, DeviceError> {
// We expect the children devices to implement the OnOff trait // We expect the children devices to implement the OnOff trait
let mixer_id = mixer.get_id().to_owned(); let mixer_id = mixer.get_id().to_owned();
let mixer = As::consume(mixer).ok_or_else(|| DeviceError::OnOffExpected(mixer_id))?; let mixer = As::consume(mixer).ok_or(DeviceError::OnOffExpected(mixer_id))?;
let speakers_id = speakers.get_id().to_owned(); let speakers_id = speakers.get_id().to_owned();
let speakers = let speakers = As::consume(speakers).ok_or(DeviceError::OnOffExpected(speakers_id))?;
As::consume(speakers).ok_or_else(|| DeviceError::OnOffExpected(speakers_id))?;
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { Ok(Self {
identifier: identifier.to_owned(), identifier: identifier.to_owned(),
@ -57,6 +51,10 @@ impl Device for AudioSetup {
#[async_trait] #[async_trait]
impl OnMqtt for AudioSetup { impl OnMqtt for AudioSetup {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {
return; return;

View File

@ -7,7 +7,6 @@ use tracing::{debug, error, warn};
use crate::{ use crate::{
config::{MqttDeviceConfig, PresenceDeviceConfig}, config::{MqttDeviceConfig, PresenceDeviceConfig},
error::DeviceError,
mqtt::{ContactMessage, OnMqtt, PresenceMessage}, mqtt::{ContactMessage, OnMqtt, PresenceMessage},
presence::OnPresence, presence::OnPresence,
}; };
@ -27,17 +26,13 @@ pub struct ContactSensor {
} }
impl ContactSensor { impl ContactSensor {
pub async fn build( pub fn new(
identifier: &str, identifier: &str,
mqtt: MqttDeviceConfig, mqtt: MqttDeviceConfig,
presence: Option<PresenceDeviceConfig>, presence: Option<PresenceDeviceConfig>,
client: AsyncClient, client: AsyncClient,
) -> Result<Self, DeviceError> { ) -> Self {
client Self {
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
identifier: identifier.to_owned(), identifier: identifier.to_owned(),
mqtt, mqtt,
presence, presence,
@ -45,7 +40,7 @@ impl ContactSensor {
overall_presence: false, overall_presence: false,
is_closed: true, is_closed: true,
handle: None, handle: None,
}) }
} }
} }
@ -64,6 +59,10 @@ impl OnPresence for ContactSensor {
#[async_trait] #[async_trait]
impl OnMqtt for ContactSensor { impl OnMqtt for ContactSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {
return; return;

View File

@ -14,7 +14,6 @@ use tracing::{debug, error, warn};
use crate::config::{InfoConfig, MqttDeviceConfig, OutletType}; use crate::config::{InfoConfig, MqttDeviceConfig, OutletType};
use crate::devices::Device; use crate::devices::Device;
use crate::error::DeviceError;
use crate::mqtt::{OnMqtt, OnOffMessage}; use crate::mqtt::{OnMqtt, OnOffMessage};
use crate::presence::OnPresence; use crate::presence::OnPresence;
@ -32,20 +31,15 @@ pub struct IkeaOutlet {
} }
impl IkeaOutlet { impl IkeaOutlet {
pub async fn build( pub fn new(
identifier: &str, identifier: &str,
info: InfoConfig, info: InfoConfig,
mqtt: MqttDeviceConfig, mqtt: MqttDeviceConfig,
outlet_type: OutletType, outlet_type: OutletType,
timeout: Option<u64>, timeout: Option<u64>,
client: AsyncClient, client: AsyncClient,
) -> Result<Self, DeviceError> { ) -> Self {
// TODO: Handle potential errors here Self {
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
identifier: identifier.to_owned(), identifier: identifier.to_owned(),
info, info,
mqtt, mqtt,
@ -54,7 +48,7 @@ impl IkeaOutlet {
client, client,
last_known_state: false, last_known_state: false,
handle: None, handle: None,
}) }
} }
} }
@ -83,6 +77,10 @@ impl Device for IkeaOutlet {
#[async_trait] #[async_trait]
impl OnMqtt for IkeaOutlet { impl OnMqtt for IkeaOutlet {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &Publish) { async fn on_mqtt(&mut self, message: &Publish) {
// Update the internal state based on what the device has reported // Update the internal state based on what the device has reported
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {

View File

@ -9,12 +9,11 @@ use google_home::{
types::Type, types::Type,
GoogleHomeDevice, GoogleHomeDevice,
}; };
use rumqttc::{matches, AsyncClient, Publish}; use rumqttc::{matches, Publish};
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{ use crate::{
config::{InfoConfig, MqttDeviceConfig}, config::{InfoConfig, MqttDeviceConfig},
error::DeviceError,
mqtt::{ActivateMessage, OnMqtt}, mqtt::{ActivateMessage, OnMqtt},
}; };
@ -30,26 +29,20 @@ pub struct WakeOnLAN {
} }
impl WakeOnLAN { impl WakeOnLAN {
pub async fn build( pub fn new(
identifier: &str, identifier: &str,
info: InfoConfig, info: InfoConfig,
mqtt: MqttDeviceConfig, mqtt: MqttDeviceConfig,
mac_address: MacAddress, mac_address: MacAddress,
broadcast_ip: Ipv4Addr, broadcast_ip: Ipv4Addr,
client: AsyncClient, ) -> Self {
) -> Result<Self, DeviceError> { Self {
// TODO: Handle potential errors here
client
.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
identifier: identifier.to_owned(), identifier: identifier.to_owned(),
info, info,
mqtt, mqtt,
mac_address, mac_address,
broadcast_ip, broadcast_ip,
}) }
} }
} }
@ -61,6 +54,10 @@ impl Device for WakeOnLAN {
#[async_trait] #[async_trait]
impl OnMqtt for WakeOnLAN { impl OnMqtt for WakeOnLAN {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &Publish) { async fn on_mqtt(&mut self, message: &Publish) {
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {
return; return;

View File

@ -84,8 +84,6 @@ impl MissingWildcard {
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum DeviceError { pub enum DeviceError {
#[error(transparent)]
SubscribeError(#[from] ClientError),
#[error("Expected device '{0}' to implement OnOff trait")] #[error("Expected device '{0}' to implement OnOff trait")]
OnOffExpected(String), OnOffExpected(String),
} }

View File

@ -65,6 +65,10 @@ pub async fn start(
#[async_trait] #[async_trait]
impl OnMqtt for LightSensor { impl OnMqtt for LightSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {
return; return;

View File

@ -97,7 +97,12 @@ async fn app() -> anyhow::Result<()> {
); );
} }
let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone()); let devices = devices::start(
mqtt.subscribe(),
presence.clone(),
light_sensor.clone(),
client.clone(),
);
join_all( join_all(
config config
.devices .devices

View File

@ -12,6 +12,7 @@ use tokio::sync::broadcast;
#[async_trait] #[async_trait]
#[impl_cast::device_trait] #[impl_cast::device_trait]
pub trait OnMqtt { pub trait OnMqtt {
fn topics(&self) -> Vec<&str>;
async fn on_mqtt(&mut self, message: &Publish); async fn on_mqtt(&mut self, message: &Publish);
} }

View File

@ -70,6 +70,10 @@ pub async fn start(
#[async_trait] #[async_trait]
impl OnMqtt for Presence { impl OnMqtt for Presence {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) { async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if !matches(&message.topic, &self.mqtt.topic) { if !matches(&message.topic, &self.mqtt.topic) {
return; return;