Renamed mqtt system to make it more clear that it has to do with mqtt

This commit is contained in:
Dreaded_X 2022-12-29 00:50:16 +01:00
parent 924b3cf862
commit 458c5e25a3
Signed by: Dreaded_X
GPG Key ID: 76BDEC4E165D8AD9
7 changed files with 46 additions and 42 deletions

View File

@ -8,7 +8,7 @@ use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN};
#[derive(Debug, Deserialize)]
pub struct Config {
pub mqtt: MQTTConfig,
pub mqtt: MqttConfig,
pub fullfillment: FullfillmentConfig,
pub presence: MqttDeviceConfig,
#[serde(default)]
@ -16,7 +16,7 @@ pub struct Config {
}
#[derive(Debug, Deserialize)]
pub struct MQTTConfig {
pub struct MqttConfig {
pub host: String,
pub port: u16,
pub username: String,

View File

@ -9,14 +9,14 @@ use std::collections::HashMap;
use google_home::{GoogleHomeDevice, traits::OnOff};
use log::trace;
use crate::{mqtt::Listener, presence::OnPresence};
use crate::{mqtt::OnMqtt, presence::OnPresence};
impl_cast::impl_cast!(Device, Listener);
impl_cast::impl_cast!(Device, OnMqtt);
impl_cast::impl_cast!(Device, OnPresence);
impl_cast::impl_cast!(Device, GoogleHomeDevice);
impl_cast::impl_cast!(Device, OnOff);
pub trait Device: AsGoogleHomeDevice + AsListener + AsOnPresence + AsOnOff {
pub trait Device: AsGoogleHomeDevice + AsOnMqtt + AsOnPresence + AsOnOff {
fn get_id(&self) -> String;
}
@ -54,7 +54,7 @@ impl Devices {
self.devices.insert(device.get_id(), device);
}
get_cast!(Listener);
get_cast!(OnMqtt);
get_cast!(OnPresence);
get_cast!(GoogleHomeDevice);
get_cast!(OnOff);
@ -67,10 +67,12 @@ impl Devices {
}
}
impl Listener for Devices {
fn notify(&mut self, message: &rumqttc::Publish) {
self.as_listeners().iter_mut().for_each(|(_, listener)| {
listener.notify(message);
impl OnMqtt for Devices {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
trace!("OnMqtt for devices");
self.as_on_mqtts().iter_mut().for_each(|(id, listener)| {
trace!("OnMqtt: {id}");
listener.on_mqtt(message);
})
}
}
@ -78,8 +80,8 @@ impl Listener for Devices {
impl OnPresence for Devices {
fn on_presence(&mut self, presence: bool) {
trace!("OnPresence for devices");
self.as_on_presences().iter_mut().for_each(|(name, device)| {
trace!("OnPresence: {name}");
self.as_on_presences().iter_mut().for_each(|(id, device)| {
trace!("OnPresence: {id}");
device.on_presence(presence);
})
}

View File

@ -9,7 +9,7 @@ use tokio::task::JoinHandle;
use crate::config::{KettleConfig, InfoConfig, MqttDeviceConfig};
use crate::devices::Device;
use crate::mqtt::Listener;
use crate::mqtt::OnMqtt;
use crate::presence::OnPresence;
pub struct IkeaOutlet {
@ -69,8 +69,8 @@ impl TryFrom<&Publish> for StateMessage {
}
}
impl Listener for IkeaOutlet {
fn notify(&mut self, message: &Publish) {
impl OnMqtt for IkeaOutlet {
fn on_mqtt(&mut self, message: &Publish) {
// Update the internal state based on what the device has reported
if message.topic != self.mqtt.topic {
return;

View File

@ -3,7 +3,7 @@ use log::{debug, warn};
use rumqttc::{AsyncClient, Publish};
use serde::Deserialize;
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::Listener};
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::OnMqtt};
use super::Device;
@ -46,8 +46,8 @@ impl TryFrom<&Publish> for StateMessage {
}
}
impl Listener for WakeOnLAN {
fn notify(&mut self, message: &Publish) {
impl OnMqtt for WakeOnLAN {
fn on_mqtt(&mut self, message: &Publish) {
if message.topic != self.mqtt.topic {
return;

View File

@ -9,7 +9,7 @@ use rumqttc::{MqttOptions, Transport, AsyncClient};
use env_logger::Builder;
use log::{error, info, debug, LevelFilter};
use automation::{devices::Devices, mqtt::Notifier};
use automation::{devices::Devices, mqtt::Mqtt};
use google_home::{GoogleHome, Request};
#[tokio::main]
@ -30,31 +30,33 @@ async fn main() {
info!("Starting automation_rs...");
// Create device holder
// @TODO Make this nices to work with, we devices.rs
let devices = Arc::new(RwLock::new(Devices::new()));
// Setup MQTT
// Configure MQTT
let mut mqttoptions = MqttOptions::new("rust-test", config.mqtt.host, config.mqtt.port);
mqttoptions.set_credentials(config.mqtt.username, config.mqtt.password.unwrap());
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_transport(Transport::tls_with_default_config());
// Create a notifier and start it in a seperate task
// Create a mqtt client and wrap the eventloop
let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
let mut notifier = Notifier::new(eventloop);
let mut mqtt = Mqtt::new(eventloop);
notifier.add_listener(Arc::downgrade(&devices));
// Create device holder and register it as listener for mqtt
let devices = Arc::new(RwLock::new(Devices::new()));
mqtt.add_listener(Arc::downgrade(&devices));
// Setup presence system
let mut presence = Presence::new(config.presence, client.clone());
// Register devices as presence listener
presence.add_listener(Arc::downgrade(&devices));
// Register presence as mqtt listener
let presence = Arc::new(RwLock::new(presence));
notifier.add_listener(Arc::downgrade(&presence));
mqtt.add_listener(Arc::downgrade(&presence));
notifier.start();
// Start mqtt, this spawns a seperate async task
mqtt.start();
// Create devices based on config
// @TODO Move out of main (config? or maybe devices?)
// Turn the config into actual devices and add them
config.devices
.into_iter()
.map(|(identifier, device_config)| {
@ -65,7 +67,7 @@ async fn main() {
devices.write().unwrap().add_device(device);
});
// Fullfillments
// Create google home fullfillment route
let fullfillment = Router::new()
.route("/google_home", post(async move |Json(payload): Json<Request>| {
// @TODO Verify that we are actually logged in

View File

@ -5,17 +5,17 @@ use rumqttc::{Publish, Event, Incoming, EventLoop};
use log::trace;
use tokio::task::JoinHandle;
pub trait Listener {
fn notify(&mut self, message: &Publish);
pub trait OnMqtt {
fn on_mqtt(&mut self, message: &Publish);
}
// @TODO Maybe rename this to make it clear it has to do with mqtt
pub struct Notifier {
listeners: Vec<Weak<RwLock<dyn Listener + Sync + Send>>>,
pub struct Mqtt {
listeners: Vec<Weak<RwLock<dyn OnMqtt + Sync + Send>>>,
eventloop: EventLoop,
}
impl Notifier {
impl Mqtt {
pub fn new(eventloop: EventLoop) -> Self {
return Self { listeners: Vec::new(), eventloop }
}
@ -25,7 +25,7 @@ impl Notifier {
self.listeners.retain(|listener| {
if let Some(listener) = listener.upgrade() {
listener.write().unwrap().notify(&message);
listener.write().unwrap().on_mqtt(&message);
return true;
} else {
trace!("Removing listener...");
@ -35,7 +35,7 @@ impl Notifier {
})
}
pub fn add_listener<T: Listener + Sync + Send + 'static>(&mut self, listener: Weak<RwLock<T>>) {
pub fn add_listener<T: OnMqtt + Sync + Send + 'static>(&mut self, listener: Weak<RwLock<T>>) {
self.listeners.push(listener);
}

View File

@ -4,7 +4,7 @@ use log::{debug, warn, trace};
use rumqttc::{AsyncClient, Publish};
use serde::{Serialize, Deserialize};
use crate::{mqtt::Listener, config::MqttDeviceConfig};
use crate::{mqtt::OnMqtt, config::MqttDeviceConfig};
pub trait OnPresence {
fn on_presence(&mut self, presence: bool);
@ -47,8 +47,8 @@ impl TryFrom<&Publish> for StateMessage {
}
}
impl Listener for Presence {
fn notify(&mut self, message: &rumqttc::Publish) {
impl OnMqtt for Presence {
fn on_mqtt(&mut self, message: &rumqttc::Publish) {
if message.topic.starts_with(&(self.mqtt.topic.clone() + "/")) {
let device_name = message.topic.rsplit_once("/").unwrap().1;