Everything is now implemented as a Device using device_traits with all events going through a single place
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Dreaded_X 2023-04-14 05:46:04 +02:00
parent 88e9b8f409
commit b7329b58ee
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
15 changed files with 487 additions and 366 deletions

View File

@ -15,6 +15,7 @@ use crate::{
debug_bridge::DebugBridgeConfig,
devices::{self, AudioSetup, ContactSensor, IkeaOutlet, KasaOutlet, WakeOnLAN},
error::{ConfigParseError, CreateDeviceError, MissingEnv},
event::EventChannel,
hue_bridge::HueBridgeConfig,
light_sensor::LightSensorConfig,
presence::PresenceConfig,
@ -165,8 +166,10 @@ pub trait CreateDevice {
fn create(
identifier: &str,
config: Self::Config,
client: AsyncClient,
presence_topic: &str, // Not a big fan of passing in the global config
event_channel: &EventChannel,
client: &AsyncClient,
// TODO: Not a big fan of passing in the global config
presence_topic: &str,
) -> Result<Self, CreateDeviceError>
where
Self: Sized;
@ -176,16 +179,31 @@ impl Device {
pub fn create(
self,
id: &str,
client: AsyncClient,
event_channel: &EventChannel,
client: &AsyncClient,
presence: &str,
) -> Result<Box<dyn devices::Device>, CreateDeviceError> {
let device: Box<dyn devices::Device> = match self {
// TODO: It would be nice if this would be more automatic, not sure how to do that...
Device::IkeaOutlet(c) => Box::new(IkeaOutlet::create(id, c, client, presence)?),
Device::WakeOnLAN(c) => Box::new(WakeOnLAN::create(id, c, client, presence)?),
Device::KasaOutlet(c) => Box::new(KasaOutlet::create(id, c, client, presence)?),
Device::AudioSetup(c) => Box::new(AudioSetup::create(id, c, client, presence)?),
Device::ContactSensor(c) => Box::new(ContactSensor::create(id, c, client, presence)?),
Device::IkeaOutlet(c) => {
Box::new(IkeaOutlet::create(id, c, event_channel, client, presence)?)
}
Device::WakeOnLAN(c) => {
Box::new(WakeOnLAN::create(id, c, event_channel, client, presence)?)
}
Device::KasaOutlet(c) => {
Box::new(KasaOutlet::create(id, c, event_channel, client, presence)?)
}
Device::AudioSetup(c) => {
Box::new(AudioSetup::create(id, c, event_channel, client, presence)?)
}
Device::ContactSensor(c) => Box::new(ContactSensor::create(
id,
c,
event_channel,
client,
presence,
)?),
};
Ok(device)

View File

@ -1,64 +1,88 @@
use async_trait::async_trait;
use rumqttc::AsyncClient;
use serde::Deserialize;
use tracing::warn;
use crate::{
event::{Event, EventChannel},
config::MqttDeviceConfig,
devices::Device,
light_sensor::OnDarkness,
mqtt::{DarknessMessage, PresenceMessage},
presence::OnPresence,
};
#[derive(Debug, Deserialize)]
pub struct DebugBridgeConfig {
pub topic: String,
#[serde(flatten)]
pub mqtt: MqttDeviceConfig,
}
pub fn start(config: DebugBridgeConfig, event_channel: &EventChannel, client: AsyncClient) {
let mut rx = event_channel.get_rx();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(Event::Presence(presence)) => {
let message = PresenceMessage::new(presence);
let topic = format!("{}/presence", config.topic);
client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
config.topic
)
})
.ok();
}
Ok(Event::Darkness(dark)) => {
let message = DarknessMessage::new(dark);
let topic = format!("{}/darkness", config.topic);
client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
config.topic
)
})
.ok();
}
Ok(_) => {}
Err(_) => todo!("Handle errors with the event channel properly"),
}
}
});
#[derive(Debug)]
pub struct DebugBridge {
mqtt: MqttDeviceConfig,
client: AsyncClient,
}
impl DebugBridge {
pub fn new(
config: DebugBridgeConfig,
client: &AsyncClient,
) -> Result<Self, crate::error::CreateDeviceError> {
Ok(Self {
mqtt: config.mqtt,
client: client.clone(),
})
}
}
impl Device for DebugBridge {
fn get_id(&self) -> &str {
"debug_bridge"
}
}
#[async_trait]
impl OnPresence for DebugBridge {
async fn on_presence(&mut self, presence: bool) {
let message = PresenceMessage::new(presence);
let topic = format!("{}/presence", self.mqtt.topic);
self.client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
self.mqtt.topic
)
})
.ok();
}
}
#[async_trait]
impl OnDarkness for DebugBridge {
async fn on_darkness(&mut self, dark: bool) {
let message = DarknessMessage::new(dark);
let topic = format!("{}/darkness", self.mqtt.topic);
self.client
.publish(
topic,
rumqttc::QoS::AtLeastOnce,
true,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| {
warn!(
"Failed to update presence on {}/presence: {err}",
self.mqtt.topic
)
})
.ok();
}
}

View File

@ -12,21 +12,22 @@ pub use self::wake_on_lan::WakeOnLAN;
use std::collections::HashMap;
use futures::future::join_all;
use google_home::{traits::OnOff, FullfillmentError, GoogleHome, GoogleHomeDevice};
use pollster::FutureExt;
use rumqttc::{matches, AsyncClient, QoS};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, trace};
use tracing::{debug, error, instrument, trace};
use crate::{
event::{Event, EventChannel},
light_sensor::OnDarkness,
mqtt::OnMqtt,
ntfy::OnNotification,
presence::OnPresence,
};
#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + GoogleHomeDevice + OnOff)]
#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + GoogleHomeDevice + OnOff)]
pub trait Device: std::fmt::Debug + Sync + Send {
fn get_id(&self) -> &str;
}
@ -85,6 +86,20 @@ impl DevicesHandle {
Ok(rx.await??)
}
// TODO: Finish implementing this
// pub fn create_device<T>(&self, identifier: &str, config: T::Config, presence_topic: &str) -> Result<T, CreateDeviceError>
// where
// T: CreateDevice,
// {
// T::create(
// identifier,
// config,
// self.event_channel,
// self.client,
// presence_topic: presence_topic.to_owned(),
// )
// }
pub async fn add_device(&self, device: Box<dyn Device>) -> Result<(), DevicesError> {
let (tx, rx) = oneshot::channel();
self.tx.send(Command::AddDevice { device, tx }).await?;
@ -92,21 +107,21 @@ impl DevicesHandle {
}
}
pub fn start(event_channel: &EventChannel, client: AsyncClient) -> DevicesHandle {
pub fn start(client: AsyncClient) -> (DevicesHandle, EventChannel) {
let mut devices = Devices {
devices: HashMap::new(),
client,
};
let (event_channel, mut event_rx) = EventChannel::new();
let (tx, mut rx) = mpsc::channel(100);
let mut event_rx = event_channel.get_rx();
tokio::spawn(async move {
// TODO: Handle error better
loop {
tokio::select! {
event = event_rx.recv() => {
if event.is_err() {
if event.is_none() {
todo!("Handle errors with the event channel properly")
}
devices.handle_event(event.unwrap()).await;
@ -123,7 +138,7 @@ pub fn start(event_channel: &EventChannel, client: AsyncClient) -> DevicesHandle
}
});
DevicesHandle { tx }
(DevicesHandle { tx }, event_channel)
}
impl Devices {
@ -165,12 +180,13 @@ impl Devices {
self.devices.insert(device.get_id().to_owned(), device);
}
#[instrument(skip(self))]
async fn handle_event(&mut self, event: Event) {
match event {
Event::MqttMessage(message) => {
self.get::<dyn OnMqtt>()
.iter_mut()
.for_each(|(id, listener)| {
let iter = self.get::<dyn OnMqtt>().into_iter().map(|(id, listener)| {
let message = message.clone();
async move {
let subscribed = listener
.topics()
.iter()
@ -178,27 +194,49 @@ impl Devices {
if subscribed {
trace!(id, "Handling");
listener.on_mqtt(&message).block_on();
listener.on_mqtt(message).await;
}
})
}
});
join_all(iter).await;
}
Event::Darkness(dark) => {
self.get::<dyn OnDarkness>()
.iter_mut()
.for_each(|(id, device)| {
trace!(id, "Handling");
device.on_darkness(dark).block_on();
})
let iter =
self.get::<dyn OnDarkness>()
.into_iter()
.map(|(id, device)| async move {
trace!(id, "Handling");
device.on_darkness(dark).await;
});
join_all(iter).await;
}
Event::Presence(presence) => {
self.get::<dyn OnPresence>()
.iter_mut()
.for_each(|(id, device)| {
trace!(id, "Handling");
device.on_presence(presence).block_on();
})
let iter =
self.get::<dyn OnPresence>()
.into_iter()
.map(|(id, device)| async move {
trace!(id, "Handling");
device.on_presence(presence).await;
});
join_all(iter).await;
}
Event::Ntfy(notification) => {
let iter = self
.get::<dyn OnNotification>()
.into_iter()
.map(|(id, device)| {
let notification = notification.clone();
async move {
trace!(id, "Handling");
device.on_notification(notification).await;
}
});
join_all(iter).await;
}
Event::Ntfy(_) => {}
}
}

View File

@ -6,6 +6,7 @@ use tracing::{debug, error, trace, warn};
use crate::config::{self, CreateDevice, MqttDeviceConfig};
use crate::error::CreateDeviceError;
use crate::event::EventChannel;
use crate::mqtt::{OnMqtt, RemoteAction, RemoteMessage};
use crate::presence::OnPresence;
@ -34,18 +35,20 @@ impl CreateDevice for AudioSetup {
fn create(
identifier: &str,
config: Self::Config,
client: AsyncClient,
event_channel: &EventChannel,
client: &AsyncClient,
presence_topic: &str,
) -> Result<Self, CreateDeviceError> {
trace!(id = identifier, "Setting up AudioSetup");
// Create the child devices
let mixer_id = format!("{}.mixer", identifier);
let mixer = (*config.mixer).create(&mixer_id, client.clone(), presence_topic)?;
let mixer = (*config.mixer).create(&mixer_id, event_channel, client, presence_topic)?;
let mixer = As::consume(mixer).ok_or(CreateDeviceError::OnOffExpected(mixer_id))?;
let speakers_id = format!("{}.speakers", identifier);
let speakers = (*config.speakers).create(&speakers_id, client, presence_topic)?;
let speakers =
(*config.speakers).create(&speakers_id, event_channel, client, presence_topic)?;
let speakers =
As::consume(speakers).ok_or(CreateDeviceError::OnOffExpected(speakers_id))?;
@ -70,7 +73,7 @@ impl OnMqtt for AudioSetup {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(),
Err(err) => {

View File

@ -9,8 +9,9 @@ use tracing::{debug, error, trace, warn};
use crate::{
config::{CreateDevice, MqttDeviceConfig},
error::{CreateDeviceError, MissingWildcard},
event::EventChannel,
mqtt::{ContactMessage, OnMqtt, PresenceMessage},
presence::OnPresence,
presence::{self, OnPresence},
};
use super::Device;
@ -75,7 +76,8 @@ impl CreateDevice for ContactSensor {
fn create(
identifier: &str,
config: Self::Config,
client: AsyncClient,
_event_channel: &EventChannel,
client: &AsyncClient,
presence_topic: &str,
) -> Result<Self, CreateDeviceError> {
trace!(id = identifier, "Setting up ContactSensor");
@ -89,8 +91,8 @@ impl CreateDevice for ContactSensor {
identifier: identifier.to_owned(),
mqtt: config.mqtt,
presence,
client,
overall_presence: false,
client: client.clone(),
overall_presence: presence::DEFAULT,
is_closed: true,
handle: None,
})
@ -116,7 +118,7 @@ impl OnMqtt for ContactSensor {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &rumqttc::Publish) {
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
let is_closed = match ContactMessage::try_from(message) {
Ok(state) => state.is_closed(),
Err(err) => {

View File

@ -16,6 +16,7 @@ use tracing::{debug, error, trace, warn};
use crate::config::{CreateDevice, InfoConfig, MqttDeviceConfig};
use crate::devices::Device;
use crate::error::CreateDeviceError;
use crate::event::EventChannel;
use crate::mqtt::{OnMqtt, OnOffMessage};
use crate::presence::OnPresence;
@ -60,8 +61,9 @@ impl CreateDevice for IkeaOutlet {
fn create(
identifier: &str,
config: Self::Config,
client: AsyncClient,
_presence_topic: &str, // Not a big fan of passing in the global config
_event_channel: &EventChannel,
client: &AsyncClient,
_presence_topic: &str,
) -> Result<Self, CreateDeviceError> {
trace!(
id = identifier,
@ -76,7 +78,7 @@ impl CreateDevice for IkeaOutlet {
mqtt: config.mqtt,
outlet_type: config.outlet_type,
timeout: config.timeout,
client,
client: client.clone(),
last_known_state: false,
handle: None,
})
@ -112,7 +114,7 @@ impl OnMqtt for IkeaOutlet {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &Publish) {
async fn on_mqtt(&mut self, message: Publish) {
// Update the internal state based on what the device has reported
let state = match OnOffMessage::try_from(message) {
Ok(state) => state.state(),

View File

@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::trace;
use crate::{config::CreateDevice, error::CreateDeviceError};
use crate::{config::CreateDevice, error::CreateDeviceError, event::EventChannel};
use super::Device;
@ -35,7 +35,8 @@ impl CreateDevice for KasaOutlet {
fn create(
identifier: &str,
config: Self::Config,
_client: AsyncClient,
_event_channel: &EventChannel,
_client: &AsyncClient,
_presence_topic: &str,
) -> Result<Self, CreateDeviceError> {
trace!(id = identifier, "Setting up KasaOutlet");

View File

@ -16,6 +16,7 @@ use tracing::{debug, error, trace};
use crate::{
config::{CreateDevice, InfoConfig, MqttDeviceConfig},
error::CreateDeviceError,
event::EventChannel,
mqtt::{ActivateMessage, OnMqtt},
};
@ -51,7 +52,8 @@ impl CreateDevice for WakeOnLAN {
fn create(
identifier: &str,
config: Self::Config,
_client: AsyncClient,
_event_channel: &EventChannel,
_client: &AsyncClient,
_presence_topic: &str,
) -> Result<Self, CreateDeviceError> {
trace!(
@ -83,7 +85,7 @@ impl OnMqtt for WakeOnLAN {
vec![&self.mqtt.topic]
}
async fn on_mqtt(&mut self, message: &Publish) {
async fn on_mqtt(&mut self, message: Publish) {
let activate = match ActivateMessage::try_from(message) {
Ok(message) => message.activate(),
Err(err) => {

View File

@ -1,9 +1,9 @@
use rumqttc::Publish;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use crate::ntfy;
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum Event {
MqttMessage(Publish),
Darkness(bool),
@ -11,29 +11,19 @@ pub enum Event {
Ntfy(ntfy::Notification),
}
pub type Sender = broadcast::Sender<Event>;
pub type Receiver = broadcast::Receiver<Event>;
pub type Sender = mpsc::Sender<Event>;
pub type Receiver = mpsc::Receiver<Event>;
pub struct EventChannel(Sender);
impl EventChannel {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(100);
pub fn new() -> (Self, Receiver) {
let (tx, rx) = mpsc::channel(100);
Self(tx)
}
pub fn get_rx(&self) -> Receiver {
self.0.subscribe()
(Self(tx), rx)
}
pub fn get_tx(&self) -> Sender {
self.0.clone()
}
}
impl Default for EventChannel {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,10 +1,12 @@
use std::net::{Ipv4Addr, SocketAddr};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::{error, trace, warn};
use crate::event::{Event, EventChannel};
use crate::{devices::Device, light_sensor::OnDarkness, presence::OnPresence};
#[derive(Debug)]
pub enum Flag {
Presence,
Darkness,
@ -23,7 +25,8 @@ pub struct HueBridgeConfig {
pub flags: FlagIDs,
}
struct HueBridge {
#[derive(Debug)]
pub struct HueBridge {
addr: SocketAddr,
login: String,
flag_ids: FlagIDs,
@ -35,14 +38,6 @@ struct FlagMessage {
}
impl HueBridge {
pub fn new(config: HueBridgeConfig) -> Self {
Self {
addr: (config.ip, 80).into(),
login: config.login,
flag_ids: config.flags,
}
}
pub async fn set_flag(&self, flag: Flag, value: bool) {
let flag_id = match flag {
Flag::Presence => self.flag_ids.presence,
@ -53,6 +48,8 @@ impl HueBridge {
"http://{}/api/{}/sensors/{flag_id}/state",
self.addr, self.login
);
trace!(?flag, flag_id, value, "Sending request to change flag");
let res = reqwest::Client::new()
.put(url)
.json(&FlagMessage { flag: value })
@ -73,25 +70,34 @@ impl HueBridge {
}
}
pub fn start(config: HueBridgeConfig, event_channel: &EventChannel) {
let hue_bridge = HueBridge::new(config);
let mut rx = event_channel.get_rx();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(Event::Presence(presence)) => {
trace!("Bridging presence to hue");
hue_bridge.set_flag(Flag::Presence, presence).await;
}
Ok(Event::Darkness(dark)) => {
trace!("Bridging darkness to hue");
hue_bridge.set_flag(Flag::Darkness, dark).await;
}
Ok(_) => {}
Err(_) => todo!("Handle errors with the event channel properly"),
}
impl HueBridge {
pub fn new(config: HueBridgeConfig) -> Self {
Self {
addr: (config.ip, 80).into(),
login: config.login,
flag_ids: config.flags,
}
});
}
}
impl Device for HueBridge {
fn get_id(&self) -> &str {
"hue_bridge"
}
}
#[async_trait]
impl OnPresence for HueBridge {
async fn on_presence(&mut self, presence: bool) {
trace!("Bridging presence to hue");
self.set_flag(Flag::Presence, presence).await;
}
}
#[async_trait]
impl OnDarkness for HueBridge {
async fn on_darkness(&mut self, dark: bool) {
trace!("Bridging darkness to hue");
self.set_flag(Flag::Darkness, dark).await;
}
}

View File

@ -1,13 +1,13 @@
use async_trait::async_trait;
use rumqttc::{matches, AsyncClient};
use rumqttc::Publish;
use serde::Deserialize;
use tracing::{debug, error, trace, warn};
use tracing::{debug, trace, warn};
use crate::{
config::MqttDeviceConfig,
error::LightSensorError,
event::{Event, EventChannel},
mqtt::BrightnessMessage,
devices::Device,
event::{self, Event, EventChannel},
mqtt::{BrightnessMessage, OnMqtt},
};
#[async_trait]
@ -23,72 +23,74 @@ pub struct LightSensorConfig {
pub max: isize,
}
const DEFAULT: bool = false;
pub const DEFAULT: bool = false;
pub async fn start(
config: LightSensorConfig,
event_channel: &EventChannel,
client: AsyncClient,
) -> Result<(), LightSensorError> {
// Subscrive to the mqtt topic
client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
#[derive(Debug)]
pub struct LightSensor {
tx: event::Sender,
mqtt: MqttDeviceConfig,
min: isize,
max: isize,
is_dark: bool,
}
// Create the channels
let mut rx = event_channel.get_rx();
let tx = event_channel.get_tx();
impl LightSensor {
pub fn new(config: LightSensorConfig, event_channel: &EventChannel) -> Self {
Self {
tx: event_channel.get_tx(),
mqtt: config.mqtt,
min: config.min,
max: config.max,
is_dark: DEFAULT,
}
}
}
// Setup default value, this is needed for hysteresis
let mut current_is_dark = DEFAULT;
impl Device for LightSensor {
fn get_id(&self) -> &str {
"light_sensor"
}
}
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(Event::MqttMessage(message)) => {
if !matches(&message.topic, &config.mqtt.topic) {
continue;
}
#[async_trait]
impl OnMqtt for LightSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
let illuminance = match BrightnessMessage::try_from(message) {
Ok(state) => state.illuminance(),
Err(err) => {
error!("Failed to parse message: {err}");
continue;
}
};
async fn on_mqtt(&mut self, message: Publish) {
let illuminance = match BrightnessMessage::try_from(message) {
Ok(state) => state.illuminance(),
Err(err) => {
warn!("Failed to parse message: {err}");
return;
}
};
debug!("Illuminance: {illuminance}");
let is_dark = if illuminance <= config.min {
trace!("It is dark");
true
} else if illuminance >= config.max {
trace!("It is light");
false
} else {
trace!(
"In between min ({}) and max ({}) value, keeping current state: {}",
config.min,
config.max,
current_is_dark
);
current_is_dark
};
debug!("Illuminance: {illuminance}");
let is_dark = if illuminance <= self.min {
trace!("It is dark");
true
} else if illuminance >= self.max {
trace!("It is light");
false
} else {
trace!(
"In between min ({}) and max ({}) value, keeping current state: {}",
self.min,
self.max,
self.is_dark
);
self.is_dark
};
if is_dark != current_is_dark {
debug!("Dark state has changed: {is_dark}");
current_is_dark = is_dark;
if is_dark != self.is_dark {
debug!("Dark state has changed: {is_dark}");
self.is_dark = is_dark;
if tx.send(Event::Darkness(is_dark)).is_err() {
warn!("There are no receivers on the event channel");
}
}
}
Ok(_) => {}
Err(_) => todo!("Handle errors with the event channel properly"),
if self.tx.send(Event::Darkness(is_dark)).await.is_err() {
warn!("There are no receivers on the event channel");
}
}
});
Ok(())
}
}

View File

@ -8,10 +8,14 @@ use axum::{
use automation::{
auth::{OpenIDConfig, User},
config::Config,
debug_bridge, devices,
debug_bridge::DebugBridge,
devices,
error::ApiError,
event::EventChannel,
hue_bridge, light_sensor, mqtt, ntfy, presence,
hue_bridge::HueBridge,
light_sensor::LightSensor,
mqtt,
ntfy::Ntfy,
presence::Presence,
};
use dotenvy::dotenv;
use futures::future::join_all;
@ -55,41 +59,55 @@ async fn app() -> anyhow::Result<()> {
std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned());
let config = Config::parse_file(&config_filename)?;
let event_channel = EventChannel::new();
// Create a mqtt client
let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 10);
let presence_topic = config.presence.mqtt.topic.to_owned();
presence::start(config.presence, &event_channel, client.clone()).await?;
light_sensor::start(config.light_sensor, &event_channel, client.clone()).await?;
// Setup the device handler
let (device_handler, event_channel) = devices::start(client.clone());
// Start the ntfy service if it is configured
if let Some(config) = config.ntfy {
ntfy::start(config, &event_channel);
// Create all the devices specified in the config
let mut devices = config
.devices
.into_iter()
.map(|(identifier, device_config)| {
device_config.create(
&identifier,
&event_channel,
&client,
&config.presence.mqtt.topic,
)
})
.collect::<Result<Vec<_>, _>>()?;
// Create and add the light sensor
{
let light_sensor = LightSensor::new(config.light_sensor, &event_channel);
devices.push(Box::new(light_sensor));
}
// Start the hue bridge if it is configured
// Create and add the presence system
{
let presence = Presence::new(config.presence, &event_channel);
devices.push(Box::new(presence));
}
// If configured, create and add the hue bridge
if let Some(config) = config.hue_bridge {
hue_bridge::start(config, &event_channel);
let hue_bridge = HueBridge::new(config);
devices.push(Box::new(hue_bridge));
}
// Start the debug bridge if it is configured
if let Some(config) = config.debug_bridge {
debug_bridge::start(config, &event_channel, client.clone());
let debug_bridge = DebugBridge::new(config, &client)?;
devices.push(Box::new(debug_bridge));
}
// Setup the device handler
let device_handler = devices::start(&event_channel, client.clone());
// Create all the devices specified in the config
let devices = config
.devices
.into_iter()
.map(|(identifier, device_config)| {
device_config.create(&identifier, client.clone(), &presence_topic)
})
.collect::<Result<Vec<_>, _>>()?;
// Start the ntfy service if it is configured
if let Some(config) = config.ntfy {
let ntfy = Ntfy::new(config, &event_channel);
devices.push(Box::new(ntfy));
}
// Can even add some more devices here
// devices.push(device)

View File

@ -14,7 +14,7 @@ use crate::event::{self, EventChannel};
#[impl_cast::device_trait]
pub trait OnMqtt {
fn topics(&self) -> Vec<&str>;
async fn on_mqtt(&mut self, message: &Publish);
async fn on_mqtt(&mut self, message: Publish);
}
#[derive(Debug, Error)]
@ -32,7 +32,7 @@ pub fn start(mut eventloop: EventLoop, event_channel: &EventChannel) {
let notification = eventloop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(p))) => {
tx.send(event::Event::MqttMessage(p)).ok();
tx.send(event::Event::MqttMessage(p)).await.ok();
}
Ok(..) => continue,
Err(err) => {
@ -62,10 +62,10 @@ impl OnOffMessage {
}
}
impl TryFrom<&Publish> for OnOffMessage {
impl TryFrom<Publish> for OnOffMessage {
type Error = ParseError;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
fn try_from(message: Publish) -> Result<Self, Self::Error> {
serde_json::from_slice(&message.payload)
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}
@ -82,10 +82,10 @@ impl ActivateMessage {
}
}
impl TryFrom<&Publish> for ActivateMessage {
impl TryFrom<Publish> for ActivateMessage {
type Error = ParseError;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
fn try_from(message: Publish) -> Result<Self, Self::Error> {
serde_json::from_slice(&message.payload)
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}
@ -112,10 +112,10 @@ impl RemoteMessage {
}
}
impl TryFrom<&Publish> for RemoteMessage {
impl TryFrom<Publish> for RemoteMessage {
type Error = ParseError;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
fn try_from(message: Publish) -> Result<Self, Self::Error> {
serde_json::from_slice(&message.payload)
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}
@ -185,10 +185,10 @@ impl ContactMessage {
}
}
impl TryFrom<&Publish> for ContactMessage {
impl TryFrom<Publish> for ContactMessage {
type Error = ParseError;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
fn try_from(message: Publish) -> Result<Self, Self::Error> {
serde_json::from_slice(&message.payload)
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}
@ -218,10 +218,10 @@ impl DarknessMessage {
}
}
impl TryFrom<&Publish> for DarknessMessage {
impl TryFrom<Publish> for DarknessMessage {
type Error = ParseError;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
fn try_from(message: Publish) -> Result<Self, Self::Error> {
serde_json::from_slice(&message.payload)
.or(Err(ParseError::InvalidPayload(message.payload.clone())))
}

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use async_trait::async_trait;
use impl_cast::device_trait;
use serde::Serialize;
use serde_repr::*;
use tokio::sync::mpsc;
@ -7,18 +9,28 @@ use tracing::{debug, error, warn};
use crate::{
config::NtfyConfig,
event::{Event, EventChannel},
devices::Device,
event::{self, Event, EventChannel},
presence::OnPresence,
};
pub type Sender = mpsc::Sender<Notification>;
pub type Receiver = mpsc::Receiver<Notification>;
struct Ntfy {
base_url: String,
topic: String,
#[async_trait]
#[device_trait]
pub trait OnNotification {
async fn on_notification(&mut self, notification: Notification);
}
#[derive(Serialize_repr, Clone, Copy)]
#[derive(Debug)]
pub struct Ntfy {
base_url: String,
topic: String,
tx: event::Sender,
}
#[derive(Debug, Serialize_repr, Clone, Copy)]
#[repr(u8)]
pub enum Priority {
Min = 1,
@ -28,7 +40,7 @@ pub enum Priority {
Max,
}
#[derive(Serialize, Clone)]
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "snake_case", tag = "action")]
pub enum ActionType {
Broadcast {
@ -39,7 +51,7 @@ pub enum ActionType {
// Http
}
#[derive(Serialize, Clone)]
#[derive(Debug, Serialize, Clone)]
pub struct Action {
#[serde(flatten)]
action: ActionType,
@ -54,7 +66,7 @@ struct NotificationFinal {
inner: Notification,
}
#[derive(Serialize, Clone)]
#[derive(Debug, Serialize, Clone)]
pub struct Notification {
#[serde(skip_serializing_if = "Option::is_none")]
title: Option<String>,
@ -119,10 +131,11 @@ impl Default for Notification {
}
impl Ntfy {
fn new(base_url: &str, topic: &str) -> Self {
pub fn new(config: NtfyConfig, event_channel: &EventChannel) -> Self {
Self {
base_url: base_url.to_owned(),
topic: topic.to_owned(),
base_url: config.url,
topic: config.topic,
tx: event_channel.get_tx(),
}
}
@ -148,45 +161,45 @@ impl Ntfy {
}
}
pub fn start(config: NtfyConfig, event_channel: &EventChannel) {
let mut rx = event_channel.get_rx();
let tx = event_channel.get_tx();
let ntfy = Ntfy::new(&config.url, &config.topic);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(Event::Presence(presence)) => {
// Setup extras for the broadcast
let extras = HashMap::from([
("cmd".into(), "presence".into()),
("state".into(), if presence { "0" } else { "1" }.into()),
]);
// Create broadcast action
let action = Action {
action: ActionType::Broadcast { extras },
label: if presence { "Set away" } else { "Set home" }.to_owned(),
clear: Some(true),
};
// Create the notification
let notification = Notification::new()
.set_title("Presence")
.set_message(if presence { "Home" } else { "Away" })
.add_tag("house")
.add_action(action)
.set_priority(Priority::Low);
if tx.send(Event::Ntfy(notification)).is_err() {
warn!("There are no receivers on the event channel");
}
}
Ok(Event::Ntfy(notification)) => ntfy.send(notification).await,
Ok(_) => {}
Err(_) => todo!("Handle errors with the event channel properly"),
}
}
});
impl Device for Ntfy {
fn get_id(&self) -> &str {
"ntfy"
}
}
#[async_trait]
impl OnPresence for Ntfy {
async fn on_presence(&mut self, presence: bool) {
// Setup extras for the broadcast
let extras = HashMap::from([
("cmd".into(), "presence".into()),
("state".into(), if presence { "0" } else { "1" }.into()),
]);
// Create broadcast action
let action = Action {
action: ActionType::Broadcast { extras },
label: if presence { "Set away" } else { "Set home" }.to_owned(),
clear: Some(true),
};
// Create the notification
let notification = Notification::new()
.set_title("Presence")
.set_message(if presence { "Home" } else { "Away" })
.add_tag("house")
.add_action(action)
.set_priority(Priority::Low);
if self.tx.send(Event::Ntfy(notification)).await.is_err() {
warn!("There are no receivers on the event channel");
}
}
}
#[async_trait]
impl OnNotification for Ntfy {
async fn on_notification(&mut self, notification: Notification) {
self.send(notification).await;
}
}

View File

@ -1,18 +1,15 @@
use std::collections::HashMap;
use async_trait::async_trait;
use rumqttc::{has_wildcards, matches, AsyncClient};
use rumqttc::Publish;
use serde::Deserialize;
use tracing::{debug, warn};
use crate::{
config::MqttDeviceConfig,
error::{MissingWildcard, PresenceError},
event::{
Event::{self, MqttMessage},
EventChannel,
},
mqtt::PresenceMessage,
devices::Device,
event::{self, Event, EventChannel},
mqtt::{OnMqtt, PresenceMessage},
};
#[async_trait]
@ -26,73 +23,78 @@ pub struct PresenceConfig {
pub mqtt: MqttDeviceConfig,
}
const DEFAULT: bool = false;
pub const DEFAULT: bool = false;
pub async fn start(
config: PresenceConfig,
event_channel: &EventChannel,
client: AsyncClient,
) -> Result<(), PresenceError> {
if !has_wildcards(&config.mqtt.topic) {
return Err(MissingWildcard::new(&config.mqtt.topic).into());
#[derive(Debug)]
pub struct Presence {
tx: event::Sender,
mqtt: MqttDeviceConfig,
devices: HashMap<String, bool>,
current_overall_presence: bool,
}
impl Presence {
pub fn new(config: PresenceConfig, event_channel: &EventChannel) -> Self {
Self {
tx: event_channel.get_tx(),
mqtt: config.mqtt,
devices: HashMap::new(),
current_overall_presence: DEFAULT,
}
}
}
impl Device for Presence {
fn get_id(&self) -> &str {
"presence"
}
}
#[async_trait]
impl OnMqtt for Presence {
fn topics(&self) -> Vec<&str> {
vec![&self.mqtt.topic]
}
// Subscribe to the relevant topics on mqtt
client
.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce)
.await?;
async fn on_mqtt(&mut self, message: Publish) {
let offset = self
.mqtt
.topic
.find('+')
.or(self.mqtt.topic.find('#'))
.expect("Presence::create fails if it does not contain wildcards");
let device_name = message.topic[offset..].to_owned();
let mut rx = event_channel.get_rx();
let tx = event_channel.get_tx();
let mut devices = HashMap::<String, bool>::new();
let mut current_overall_presence = DEFAULT;
tokio::spawn(async move {
loop {
// TODO: Handle errors, warn if lagging
if let Ok(MqttMessage(message)) = rx.recv().await {
if !matches(&message.topic, &config.mqtt.topic) {
continue;
if message.payload.is_empty() {
// Remove the device from the map
debug!("State of device [{device_name}] has been removed");
self.devices.remove(&device_name);
} else {
let present = match PresenceMessage::try_from(message) {
Ok(state) => state.present(),
Err(err) => {
warn!("Failed to parse message: {err}");
return;
}
};
let offset = config
.mqtt
.topic
.find('+')
.or(config.mqtt.topic.find('#'))
.expect("Presence::new fails if it does not contain wildcards");
let device_name = message.topic[offset..].to_owned();
debug!("State of device [{device_name}] has changed: {}", present);
self.devices.insert(device_name, present);
}
if message.payload.is_empty() {
// Remove the device from the map
debug!("State of device [{device_name}] has been removed");
devices.remove(&device_name);
} else {
let present = match PresenceMessage::try_from(message) {
Ok(state) => state.present(),
Err(err) => {
warn!("Failed to parse message: {err}");
continue;
}
};
let overall_presence = self.devices.iter().any(|(_, v)| *v);
if overall_presence != self.current_overall_presence {
debug!("Overall presence updated: {overall_presence}");
self.current_overall_presence = overall_presence;
debug!("State of device [{device_name}] has changed: {}", present);
devices.insert(device_name, present);
}
let overall_presence = devices.iter().any(|(_, v)| *v);
if overall_presence != current_overall_presence {
debug!("Overall presence updated: {overall_presence}");
current_overall_presence = overall_presence;
if tx.send(Event::Presence(overall_presence)).is_err() {
warn!("There are no receivers on the event channel");
}
}
if self
.tx
.send(Event::Presence(overall_presence))
.await
.is_err()
{
warn!("There are no receivers on the event channel");
}
}
});
Ok(())
}
}