Devices are now clonable

This commit is contained in:
Dreaded_X 2024-07-25 23:15:23 +02:00
parent 98ab265fed
commit 2cf4e40ad5
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
17 changed files with 338 additions and 259 deletions

View File

@ -12,35 +12,35 @@ traits! {
command_only_on_off: Option<bool>, command_only_on_off: Option<bool>,
query_only_on_off: Option<bool>, query_only_on_off: Option<bool>,
async fn on(&self) -> Result<bool, ErrorCode>, async fn on(&self) -> Result<bool, ErrorCode>,
"action.devices.commands.OnOff" => async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode>, "action.devices.commands.OnOff" => async fn set_on(&self, on: bool) -> Result<(), ErrorCode>,
}, },
"action.devices.traits.Scene" => trait Scene { "action.devices.traits.Scene" => trait Scene {
scene_reversible: Option<bool>, scene_reversible: Option<bool>,
"action.devices.commands.ActivateScene" => async fn set_active(&mut self, deactivate: bool) -> Result<(), ErrorCode>, "action.devices.commands.ActivateScene" => async fn set_active(&self, deactivate: bool) -> Result<(), ErrorCode>,
}, },
"action.devices.traits.FanSpeed" => trait FanSpeed { "action.devices.traits.FanSpeed" => trait FanSpeed {
reversible: Option<bool>, reversible: Option<bool>,
command_only_fan_speed: Option<bool>, command_only_fan_speed: Option<bool>,
available_fan_speeds: AvailableSpeeds, available_fan_speeds: AvailableSpeeds,
fn current_fan_speed_setting(&self) -> Result<String, ErrorCode>, async fn current_fan_speed_setting(&self) -> Result<String, ErrorCode>,
// TODO: Figure out some syntax for optional command? // TODO: Figure out some syntax for optional command?
// Probably better to just force the user to always implement commands? // Probably better to just force the user to always implement commands?
"action.devices.commands.SetFanSpeed" => async fn set_fan_speed(&mut self, fan_speed: String) -> Result<(), ErrorCode>, "action.devices.commands.SetFanSpeed" => async fn set_fan_speed(&self, fan_speed: String) -> Result<(), ErrorCode>,
}, },
"action.devices.traits.HumiditySetting" => trait HumiditySetting { "action.devices.traits.HumiditySetting" => trait HumiditySetting {
query_only_humidity_setting: Option<bool>, query_only_humidity_setting: Option<bool>,
fn humidity_ambient_percent(&self) -> Result<isize, ErrorCode>, async fn humidity_ambient_percent(&self) -> Result<isize, ErrorCode>,
}, },
"action.devices.traits.TemperatureControl" => trait TemperatureSetting { "action.devices.traits.TemperatureControl" => trait TemperatureSetting {
query_only_temperature_control: Option<bool>, query_only_temperature_control: Option<bool>,
// TODO: Add rename // TODO: Add rename
temperatureUnitForUX: TemperatureUnit, temperatureUnitForUX: TemperatureUnit,
fn temperature_ambient_celsius(&self) -> f32, async fn temperature_ambient_celsius(&self) -> f32,
} }
} }

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use automation_macro::LuaDeviceConfig; use automation_macro::LuaDeviceConfig;
use google_home::device::Name; use google_home::device::Name;
@ -8,6 +10,7 @@ use google_home::traits::{
}; };
use google_home::types::Type; use google_home::types::Type;
use rumqttc::Publish; use rumqttc::Publish;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
use super::LuaDeviceCreate; use super::LuaDeviceCreate;
@ -18,7 +21,7 @@ use crate::messages::{AirFilterFanState, AirFilterState, SetAirFilterFanState};
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct AirFilterConfig { pub struct Config {
#[device_config(flatten)] #[device_config(flatten)]
pub info: InfoConfig, pub info: InfoConfig,
#[device_config(flatten)] #[device_config(flatten)]
@ -27,11 +30,10 @@ pub struct AirFilterConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct AirFilter { pub struct AirFilter {
config: AirFilterConfig, config: Config,
state: Arc<RwLock<AirFilterState>>,
last_known_state: AirFilterState,
} }
impl AirFilter { impl AirFilter {
@ -52,11 +54,19 @@ impl AirFilter {
.map_err(|err| warn!("Failed to update state on {topic}: {err}")) .map_err(|err| warn!("Failed to update state on {topic}: {err}"))
.ok(); .ok();
} }
async fn state(&self) -> RwLockReadGuard<AirFilterState> {
self.state.read().await
}
async fn state_mut(&self) -> RwLockWriteGuard<AirFilterState> {
self.state.write().await
}
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for AirFilter { impl LuaDeviceCreate for AirFilter {
type Config = AirFilterConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -67,14 +77,14 @@ impl LuaDeviceCreate for AirFilter {
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { let state = AirFilterState {
config,
last_known_state: AirFilterState {
state: AirFilterFanState::Off, state: AirFilterFanState::Off,
humidity: 0.0, humidity: 0.0,
temperature: 0.0, temperature: 0.0,
}, };
}) let state = Arc::new(RwLock::new(state));
Ok(Self { config, state })
} }
} }
@ -86,7 +96,7 @@ impl Device for AirFilter {
#[async_trait] #[async_trait]
impl OnMqtt for AirFilter { impl OnMqtt for AirFilter {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -99,13 +109,13 @@ impl OnMqtt for AirFilter {
} }
}; };
if state == self.last_known_state { if state == *self.state().await {
return; return;
} }
debug!(id = Device::get_id(self), "Updating state to {state:?}"); debug!(id = Device::get_id(self), "Updating state to {state:?}");
self.last_known_state = state; *self.state_mut().await = state;
} }
} }
@ -138,10 +148,10 @@ impl google_home::Device for AirFilter {
#[async_trait] #[async_trait]
impl OnOff for AirFilter { impl OnOff for AirFilter {
async fn on(&self) -> Result<bool, ErrorCode> { async fn on(&self) -> Result<bool, ErrorCode> {
Ok(self.last_known_state.state != AirFilterFanState::Off) Ok(self.state().await.state != AirFilterFanState::Off)
} }
async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { async fn set_on(&self, on: bool) -> Result<(), ErrorCode> {
debug!("Turning on air filter: {on}"); debug!("Turning on air filter: {on}");
if on { if on {
@ -192,8 +202,8 @@ impl FanSpeed for AirFilter {
} }
} }
fn current_fan_speed_setting(&self) -> Result<String, ErrorCode> { async fn current_fan_speed_setting(&self) -> Result<String, ErrorCode> {
let speed = match self.last_known_state.state { let speed = match self.state().await.state {
AirFilterFanState::Off => "off", AirFilterFanState::Off => "off",
AirFilterFanState::Low => "low", AirFilterFanState::Low => "low",
AirFilterFanState::Medium => "medium", AirFilterFanState::Medium => "medium",
@ -203,7 +213,7 @@ impl FanSpeed for AirFilter {
Ok(speed.into()) Ok(speed.into())
} }
async fn set_fan_speed(&mut self, fan_speed: String) -> Result<(), ErrorCode> { async fn set_fan_speed(&self, fan_speed: String) -> Result<(), ErrorCode> {
let fan_speed = fan_speed.as_str(); let fan_speed = fan_speed.as_str();
let state = if fan_speed == "off" { let state = if fan_speed == "off" {
AirFilterFanState::Off AirFilterFanState::Off
@ -229,11 +239,12 @@ impl HumiditySetting for AirFilter {
Some(true) Some(true)
} }
fn humidity_ambient_percent(&self) -> Result<isize, ErrorCode> { async fn humidity_ambient_percent(&self) -> Result<isize, ErrorCode> {
Ok(self.last_known_state.humidity.round() as isize) Ok(self.state().await.humidity.round() as isize)
} }
} }
#[async_trait]
impl TemperatureSetting for AirFilter { impl TemperatureSetting for AirFilter {
fn query_only_temperature_control(&self) -> Option<bool> { fn query_only_temperature_control(&self) -> Option<bool> {
Some(true) Some(true)
@ -244,8 +255,8 @@ impl TemperatureSetting for AirFilter {
TemperatureUnit::Celsius TemperatureUnit::Celsius
} }
fn temperature_ambient_celsius(&self) -> f32 { async fn temperature_ambient_celsius(&self) -> f32 {
// HACK: Round to one decimal place // HACK: Round to one decimal place
(10.0 * self.last_known_state.temperature).round() / 10.0 (10.0 * self.state().await.temperature).round() / 10.0
} }
} }

View File

@ -12,7 +12,7 @@ use crate::messages::{RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct AudioSetupConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
@ -24,14 +24,14 @@ pub struct AudioSetupConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct AudioSetup { pub struct AudioSetup {
config: AudioSetupConfig, config: Config,
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for AudioSetup { impl LuaDeviceCreate for AudioSetup {
type Config = AudioSetupConfig; type Config = Config;
type Error = DeviceConfigError; type Error = DeviceConfigError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -68,7 +68,7 @@ impl Device for AudioSetup {
#[async_trait] #[async_trait]
impl OnMqtt for AudioSetup { impl OnMqtt for AudioSetup {
async fn on_mqtt(&mut self, message: rumqttc::Publish) { async fn on_mqtt(&self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -76,10 +76,7 @@ impl OnMqtt for AudioSetup {
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {
error!( error!(id = self.get_id(), "Failed to parse message: {err}");
id = self.config.identifier,
"Failed to parse message: {err}"
);
return; return;
} }
}; };
@ -118,7 +115,7 @@ impl OnMqtt for AudioSetup {
#[async_trait] #[async_trait]
impl OnPresence for AudioSetup { impl OnPresence for AudioSetup {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
let mut mixer = self.config.mixer.write().await; let mut mixer = self.config.mixer.write().await;
let mut speakers = self.config.speakers.write().await; let mut speakers = self.config.speakers.write().await;
@ -128,7 +125,7 @@ impl OnPresence for AudioSetup {
) { ) {
// Turn off the audio setup when we leave the house // Turn off the audio setup when we leave the house
if !presence { if !presence {
debug!(id = self.config.identifier, "Turning devices off"); debug!(id = self.get_id(), "Turning devices off");
speakers.set_on(false).await.unwrap(); speakers.set_on(false).await.unwrap();
mixer.set_on(false).await.unwrap(); mixer.set_on(false).await.unwrap();
} }

View File

@ -1,9 +1,10 @@
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use automation_macro::LuaDeviceConfig; use automation_macro::LuaDeviceConfig;
use google_home::traits::OnOff; use google_home::traits::OnOff;
use mlua::FromLua; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -26,31 +27,16 @@ pub struct PresenceDeviceConfig {
pub timeout: Duration, pub timeout: Duration,
} }
#[derive(Debug, Clone)]
struct TriggerDevicesHelper(Vec<WrappedDevice>);
impl<'lua> FromLua<'lua> for TriggerDevicesHelper {
fn from_lua(value: mlua::Value<'lua>, lua: &'lua mlua::Lua) -> mlua::Result<Self> {
Ok(TriggerDevicesHelper(mlua::FromLua::from_lua(value, lua)?))
}
}
impl From<TriggerDevicesHelper> for Vec<(WrappedDevice, bool)> {
fn from(value: TriggerDevicesHelper) -> Self {
value.0.into_iter().map(|device| (device, false)).collect()
}
}
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct TriggerConfig { pub struct TriggerConfig {
#[device_config(from_lua, from(TriggerDevicesHelper))] #[device_config(from_lua)]
pub devices: Vec<(WrappedDevice, bool)>, pub devices: Vec<WrappedDevice>,
#[device_config(default, with(|t: Option<_>| t.map(Duration::from_secs)))] #[device_config(default, with(|t: Option<_>| t.map(Duration::from_secs)))]
pub timeout: Option<Duration>, pub timeout: Option<Duration>,
} }
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct ContactSensorConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
@ -63,25 +49,41 @@ pub struct ContactSensorConfig {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct ContactSensor { struct State {
config: ContactSensorConfig,
overall_presence: bool, overall_presence: bool,
is_closed: bool, is_closed: bool,
previous: Vec<bool>,
handle: Option<JoinHandle<()>>, handle: Option<JoinHandle<()>>,
} }
#[derive(Debug, Clone)]
pub struct ContactSensor {
config: Config,
state: Arc<RwLock<State>>,
}
impl ContactSensor {
async fn state(&self) -> RwLockReadGuard<State> {
self.state.read().await
}
async fn state_mut(&self) -> RwLockWriteGuard<State> {
self.state.write().await
}
}
#[async_trait] #[async_trait]
impl LuaDeviceCreate for ContactSensor { impl LuaDeviceCreate for ContactSensor {
type Config = ContactSensorConfig; type Config = Config;
type Error = DeviceConfigError; type Error = DeviceConfigError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up ContactSensor"); trace!(id = config.identifier, "Setting up ContactSensor");
let mut previous = Vec::new();
// Make sure the devices implement the required traits // Make sure the devices implement the required traits
if let Some(trigger) = &config.trigger { if let Some(trigger) = &config.trigger {
for (device, _) in &trigger.devices { for device in &trigger.devices {
{ {
let device = device.read().await; let device = device.read().await;
let id = device.get_id().to_owned(); let id = device.get_id().to_owned();
@ -96,6 +98,7 @@ impl LuaDeviceCreate for ContactSensor {
} }
} }
} }
previous.resize(trigger.devices.len(), false);
} }
config config
@ -103,12 +106,15 @@ impl LuaDeviceCreate for ContactSensor {
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { let state = State {
config: config.clone(),
overall_presence: DEFAULT_PRESENCE, overall_presence: DEFAULT_PRESENCE,
is_closed: true, is_closed: true,
previous,
handle: None, handle: None,
}) };
let state = Arc::new(RwLock::new(state));
Ok(Self { config, state })
} }
} }
@ -120,14 +126,14 @@ impl Device for ContactSensor {
#[async_trait] #[async_trait]
impl OnPresence for ContactSensor { impl OnPresence for ContactSensor {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
self.overall_presence = presence; self.state_mut().await.overall_presence = presence;
} }
} }
#[async_trait] #[async_trait]
impl OnMqtt for ContactSensor { impl OnMqtt for ContactSensor {
async fn on_mqtt(&mut self, message: rumqttc::Publish) { async fn on_mqtt(&self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -135,24 +141,25 @@ impl OnMqtt for ContactSensor {
let is_closed = match ContactMessage::try_from(message) { let is_closed = match ContactMessage::try_from(message) {
Ok(state) => state.is_closed(), Ok(state) => state.is_closed(),
Err(err) => { Err(err) => {
error!( error!(id = self.get_id(), "Failed to parse message: {err}");
id = self.config.identifier,
"Failed to parse message: {err}"
);
return; return;
} }
}; };
if is_closed == self.is_closed { if is_closed == self.state().await.is_closed {
return; return;
} }
debug!(id = self.config.identifier, "Updating state to {is_closed}"); debug!(id = self.get_id(), "Updating state to {is_closed}");
self.is_closed = is_closed; self.state_mut().await.is_closed = is_closed;
if let Some(trigger) = &mut self.config.trigger { if let Some(trigger) = &self.config.trigger {
if !self.is_closed { if !is_closed {
for (light, previous) in &mut trigger.devices { for (light, previous) in trigger
.devices
.iter()
.zip(self.state_mut().await.previous.iter_mut())
{
let mut light = light.write().await; let mut light = light.write().await;
if let Some(light) = light.as_mut().cast_mut() as Option<&mut dyn OnOff> { if let Some(light) = light.as_mut().cast_mut() as Option<&mut dyn OnOff> {
*previous = light.on().await.unwrap(); *previous = light.on().await.unwrap();
@ -160,7 +167,11 @@ impl OnMqtt for ContactSensor {
} }
} }
} else { } else {
for (light, previous) in &trigger.devices { for (light, previous) in trigger
.devices
.iter()
.zip(self.state_mut().await.previous.iter())
{
let mut light = light.write().await; let mut light = light.write().await;
if !previous { if !previous {
// If the timeout is zero just turn the light off directly // If the timeout is zero just turn the light off directly
@ -183,20 +194,20 @@ impl OnMqtt for ContactSensor {
// Check if this contact sensor works as a presence device // Check if this contact sensor works as a presence device
// If not we are done here // If not we are done here
let presence = match &self.config.presence { let presence = match &self.config.presence {
Some(presence) => presence, Some(presence) => presence.clone(),
None => return, None => return,
}; };
if !is_closed { if !is_closed {
// Activate presence and stop any timeout once we open the door // Activate presence and stop any timeout once we open the door
if let Some(handle) = self.handle.take() { if let Some(handle) = self.state_mut().await.handle.take() {
handle.abort(); handle.abort();
} }
// Only use the door as an presence sensor if there the current presence is set false // Only use the door as an presence sensor if there the current presence is set false
// This is to prevent the house from being marked as present for however long the // This is to prevent the house from being marked as present for however long the
// timeout is set when leaving the house // timeout is set when leaving the house
if !self.overall_presence { if !self.state().await.overall_presence {
self.config self.config
.client .client
.publish( .publish(
@ -216,18 +227,25 @@ impl OnMqtt for ContactSensor {
} }
} else { } else {
// Once the door is closed again we start a timeout for removing the presence // Once the door is closed again we start a timeout for removing the presence
let client = self.config.client.clone(); let device = self.clone();
let id = self.config.identifier.clone(); self.state_mut().await.handle = Some(tokio::spawn(async move {
let timeout = presence.timeout; debug!(
let topic = presence.mqtt.topic.clone(); id = device.get_id(),
self.handle = Some(tokio::spawn(async move { "Starting timeout ({:?}) for contact sensor...", presence.timeout
debug!(id, "Starting timeout ({timeout:?}) for contact sensor..."); );
tokio::time::sleep(timeout).await; tokio::time::sleep(presence.timeout).await;
debug!(id, "Removing door device!"); debug!(id = device.get_id(), "Removing door device!");
client device
.publish(&topic, rumqttc::QoS::AtLeastOnce, false, "") .config
.client
.publish(&presence.mqtt.topic, rumqttc::QoS::AtLeastOnce, false, "")
.await .await
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}")) .map_err(|err| {
warn!(
"Failed to publish presence on {}: {err}",
presence.mqtt.topic
)
})
.ok(); .ok();
})); }));
} }

View File

@ -12,7 +12,7 @@ use crate::messages::{DarknessMessage, PresenceMessage};
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, LuaDeviceConfig, Clone)] #[derive(Debug, LuaDeviceConfig, Clone)]
pub struct DebugBridgeConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
@ -20,14 +20,14 @@ pub struct DebugBridgeConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct DebugBridge { pub struct DebugBridge {
config: DebugBridgeConfig, config: Config,
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for DebugBridge { impl LuaDeviceCreate for DebugBridge {
type Config = DebugBridgeConfig; type Config = Config;
type Error = Infallible; type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -44,7 +44,7 @@ impl Device for DebugBridge {
#[async_trait] #[async_trait]
impl OnPresence for DebugBridge { impl OnPresence for DebugBridge {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
let message = PresenceMessage::new(presence); let message = PresenceMessage::new(presence);
let topic = format!("{}/presence", self.config.mqtt.topic); let topic = format!("{}/presence", self.config.mqtt.topic);
self.config self.config
@ -68,7 +68,7 @@ impl OnPresence for DebugBridge {
#[async_trait] #[async_trait]
impl OnDarkness for DebugBridge { impl OnDarkness for DebugBridge {
async fn on_darkness(&mut self, dark: bool) { async fn on_darkness(&self, dark: bool) {
let message = DarknessMessage::new(dark); let message = DarknessMessage::new(dark);
let topic = format!("{}/darkness", self.config.mqtt.topic); let topic = format!("{}/darkness", self.config.mqtt.topic);
self.config self.config

View File

@ -23,7 +23,7 @@ pub struct FlagIDs {
} }
#[derive(Debug, LuaDeviceConfig, Clone)] #[derive(Debug, LuaDeviceConfig, Clone)]
pub struct HueBridgeConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))] #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))]
pub addr: SocketAddr, pub addr: SocketAddr,
@ -31,9 +31,9 @@ pub struct HueBridgeConfig {
pub flags: FlagIDs, pub flags: FlagIDs,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct HueBridge { pub struct HueBridge {
config: HueBridgeConfig, config: Config,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -43,7 +43,7 @@ struct FlagMessage {
#[async_trait] #[async_trait]
impl LuaDeviceCreate for HueBridge { impl LuaDeviceCreate for HueBridge {
type Config = HueBridgeConfig; type Config = Config;
type Error = Infallible; type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Infallible> { async fn create(config: Self::Config) -> Result<Self, Infallible> {
@ -93,7 +93,7 @@ impl Device for HueBridge {
#[async_trait] #[async_trait]
impl OnPresence for HueBridge { impl OnPresence for HueBridge {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
trace!("Bridging presence to hue"); trace!("Bridging presence to hue");
self.set_flag(Flag::Presence, presence).await; self.set_flag(Flag::Presence, presence).await;
} }
@ -101,7 +101,7 @@ impl OnPresence for HueBridge {
#[async_trait] #[async_trait]
impl OnDarkness for HueBridge { impl OnDarkness for HueBridge {
async fn on_darkness(&mut self, dark: bool) { async fn on_darkness(&self, dark: bool) {
trace!("Bridging darkness to hue"); trace!("Bridging darkness to hue");
self.set_flag(Flag::Darkness, dark).await; self.set_flag(Flag::Darkness, dark).await;
} }

View File

@ -17,7 +17,7 @@ use crate::mqtt::WrappedAsyncClient;
use crate::traits::Timeout; use crate::traits::Timeout;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct HueGroupConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))] #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 80)))]
pub addr: SocketAddr, pub addr: SocketAddr,
@ -31,15 +31,15 @@ pub struct HueGroupConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct HueGroup { pub struct HueGroup {
config: HueGroupConfig, config: Config,
} }
// Couple of helper function to get the correct urls // Couple of helper function to get the correct urls
#[async_trait] #[async_trait]
impl LuaDeviceCreate for HueGroup { impl LuaDeviceCreate for HueGroup {
type Config = HueGroupConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -85,7 +85,7 @@ impl Device for HueGroup {
#[async_trait] #[async_trait]
impl OnMqtt for HueGroup { impl OnMqtt for HueGroup {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !self if !self
.config .config
.remotes .remotes
@ -98,10 +98,7 @@ impl OnMqtt for HueGroup {
let action = match RemoteMessage::try_from(message) { let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(), Ok(message) => message.action(),
Err(err) => { Err(err) => {
error!( error!(id = self.get_id(), "Failed to parse message: {err}");
id = self.config.identifier,
"Failed to parse message: {err}"
);
return; return;
} }
}; };
@ -120,7 +117,7 @@ impl OnMqtt for HueGroup {
#[async_trait] #[async_trait]
impl OnOff for HueGroup { impl OnOff for HueGroup {
async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { async fn set_on(&self, on: bool) -> Result<(), ErrorCode> {
// Abort any timer that is currently running // Abort any timer that is currently running
self.stop_timeout().await.unwrap(); self.stop_timeout().await.unwrap();
@ -140,13 +137,10 @@ impl OnOff for HueGroup {
Ok(res) => { Ok(res) => {
let status = res.status(); let status = res.status();
if !status.is_success() { if !status.is_success() {
warn!( warn!(id = self.get_id(), "Status code is not success: {status}");
id = self.config.identifier,
"Status code is not success: {status}"
);
} }
} }
Err(err) => error!(id = self.config.identifier, "Error: {err}"), Err(err) => error!(id = self.get_id(), "Error: {err}"),
} }
Ok(()) Ok(())
@ -162,19 +156,13 @@ impl OnOff for HueGroup {
Ok(res) => { Ok(res) => {
let status = res.status(); let status = res.status();
if !status.is_success() { if !status.is_success() {
warn!( warn!(id = self.get_id(), "Status code is not success: {status}");
id = self.config.identifier,
"Status code is not success: {status}"
);
} }
let on = match res.json::<message::Info>().await { let on = match res.json::<message::Info>().await {
Ok(info) => info.any_on(), Ok(info) => info.any_on(),
Err(err) => { Err(err) => {
error!( error!(id = self.get_id(), "Failed to parse message: {err}");
id = self.config.identifier,
"Failed to parse message: {err}"
);
// TODO: Error code // TODO: Error code
return Ok(false); return Ok(false);
} }
@ -182,7 +170,7 @@ impl OnOff for HueGroup {
return Ok(on); return Ok(on);
} }
Err(err) => error!(id = self.config.identifier, "Error: {err}"), Err(err) => error!(id = self.get_id(), "Error: {err}"),
} }
Ok(false) Ok(false)
@ -191,7 +179,7 @@ impl OnOff for HueGroup {
#[async_trait] #[async_trait]
impl Timeout for HueGroup { impl Timeout for HueGroup {
async fn start_timeout(&mut self, timeout: Duration) -> Result<()> { async fn start_timeout(&self, timeout: Duration) -> Result<()> {
// Abort any timer that is currently running // Abort any timer that is currently running
self.stop_timeout().await?; self.stop_timeout().await?;
@ -214,7 +202,7 @@ impl Timeout for HueGroup {
Ok(()) Ok(())
} }
async fn stop_timeout(&mut self) -> Result<()> { async fn stop_timeout(&self) -> Result<()> {
let message = message::Timeout::new(None); let message = message::Timeout::new(None);
let res = reqwest::Client::new() let res = reqwest::Client::new()
.put(self.url_set_schedule()) .put(self.url_set_schedule())

View File

@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::Result; use anyhow::Result;
@ -9,6 +10,7 @@ use google_home::traits::{self, OnOff};
use google_home::types::Type; use google_home::types::Type;
use rumqttc::{matches, Publish, SubscribeFilter}; use rumqttc::{matches, Publish, SubscribeFilter};
use serde::Deserialize; use serde::Deserialize;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -29,7 +31,7 @@ pub enum OutletType {
} }
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct IkeaOutletConfig { pub struct Config {
#[device_config(flatten)] #[device_config(flatten)]
pub info: InfoConfig, pub info: InfoConfig,
#[device_config(flatten)] #[device_config(flatten)]
@ -46,33 +48,31 @@ pub struct IkeaOutletConfig {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct IkeaOutlet { pub struct State {
config: IkeaOutletConfig,
last_known_state: bool, last_known_state: bool,
handle: Option<JoinHandle<()>>, handle: Option<JoinHandle<()>>,
} }
async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) { #[derive(Debug, Clone)]
let message = OnOffMessage::new(on); pub struct IkeaOutlet {
config: Config,
let topic = format!("{}/set", topic); state: Arc<RwLock<State>>,
// TODO: Handle potential errors here }
client
.publish( impl IkeaOutlet {
&topic, async fn state(&self) -> RwLockReadGuard<State> {
rumqttc::QoS::AtLeastOnce, self.state.read().await
false, }
serde_json::to_string(&message).unwrap(),
) async fn state_mut(&self) -> RwLockWriteGuard<State> {
.await self.state.write().await
.map_err(|err| warn!("Failed to update state on {topic}: {err}")) }
.ok();
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for IkeaOutlet { impl LuaDeviceCreate for IkeaOutlet {
type Config = IkeaOutletConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -93,11 +93,13 @@ impl LuaDeviceCreate for IkeaOutlet {
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { let state = State {
config,
last_known_state: false, last_known_state: false,
handle: None, handle: None,
}) };
let state = Arc::new(RwLock::new(state));
Ok(Self { config, state })
} }
} }
@ -109,7 +111,7 @@ impl Device for IkeaOutlet {
#[async_trait] #[async_trait]
impl OnMqtt for IkeaOutlet { impl OnMqtt for IkeaOutlet {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
// Check if the message is from the deviec itself or from a remote // Check if the message is from the deviec itself or from a remote
if matches(&message.topic, &self.config.mqtt.topic) { if matches(&message.topic, &self.config.mqtt.topic) {
// Update the internal state based on what the device has reported // Update the internal state based on what the device has reported
@ -122,7 +124,7 @@ impl OnMqtt for IkeaOutlet {
}; };
// No need to do anything if the state has not changed // No need to do anything if the state has not changed
if state == self.last_known_state { if state == self.state().await.last_known_state {
return; return;
} }
@ -130,7 +132,7 @@ impl OnMqtt for IkeaOutlet {
self.stop_timeout().await.unwrap(); self.stop_timeout().await.unwrap();
debug!(id = Device::get_id(self), "Updating state to {state}"); debug!(id = Device::get_id(self), "Updating state to {state}");
self.last_known_state = state; self.state_mut().await.last_known_state = state;
// If this is a kettle start a timeout for turning it of again // If this is a kettle start a timeout for turning it of again
if state && let Some(timeout) = self.config.timeout { if state && let Some(timeout) = self.config.timeout {
@ -162,7 +164,7 @@ impl OnMqtt for IkeaOutlet {
#[async_trait] #[async_trait]
impl OnPresence for IkeaOutlet { impl OnPresence for IkeaOutlet {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
// Turn off the outlet when we leave the house (Not if it is a battery charger) // Turn off the outlet when we leave the house (Not if it is a battery charger)
if !presence && self.config.outlet_type != OutletType::Charger { if !presence && self.config.outlet_type != OutletType::Charger {
debug!(id = Device::get_id(self), "Turning device off"); debug!(id = Device::get_id(self), "Turning device off");
@ -206,11 +208,25 @@ impl google_home::Device for IkeaOutlet {
#[async_trait] #[async_trait]
impl traits::OnOff for IkeaOutlet { impl traits::OnOff for IkeaOutlet {
async fn on(&self) -> Result<bool, ErrorCode> { async fn on(&self) -> Result<bool, ErrorCode> {
Ok(self.last_known_state) Ok(self.state().await.last_known_state)
} }
async fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { async fn set_on(&self, on: bool) -> Result<(), ErrorCode> {
set_on(self.config.client.clone(), &self.config.mqtt.topic, on).await; let message = OnOffMessage::new(on);
let topic = format!("{}/set", self.config.mqtt.topic);
// TODO: Handle potential errors here
self.config
.client
.publish(
&topic,
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&message).unwrap(),
)
.await
.map_err(|err| warn!("Failed to update state on {topic}: {err}"))
.ok();
Ok(()) Ok(())
} }
@ -218,31 +234,23 @@ impl traits::OnOff for IkeaOutlet {
#[async_trait] #[async_trait]
impl crate::traits::Timeout for IkeaOutlet { impl crate::traits::Timeout for IkeaOutlet {
async fn start_timeout(&mut self, timeout: Duration) -> Result<()> { async fn start_timeout(&self, timeout: Duration) -> Result<()> {
// Abort any timer that is currently running // Abort any timer that is currently running
self.stop_timeout().await?; self.stop_timeout().await?;
// Turn the kettle of after the specified timeout let device = self.clone();
// TODO: Impl Drop for IkeaOutlet that will abort the handle if the IkeaOutlet self.state_mut().await.handle = Some(tokio::spawn(async move {
// get dropped debug!(id = device.get_id(), "Starting timeout ({timeout:?})...");
let client = self.config.client.clone();
let topic = self.config.mqtt.topic.clone();
let id = Device::get_id(self).clone();
self.handle = Some(tokio::spawn(async move {
debug!(id, "Starting timeout ({timeout:?})...");
tokio::time::sleep(timeout).await; tokio::time::sleep(timeout).await;
debug!(id, "Turning outlet off!"); debug!(id = device.get_id(), "Turning outlet off!");
// TODO: Idealy we would call self.set_on(false), however since we want to do device.set_on(false).await.unwrap();
// it after a timeout we have to put it in a separate task.
// I don't think we can really get around calling outside function
set_on(client, &topic, false).await;
})); }));
Ok(()) Ok(())
} }
async fn stop_timeout(&mut self) -> Result<()> { async fn stop_timeout(&self) -> Result<()> {
if let Some(handle) = self.handle.take() { if let Some(handle) = self.state_mut().await.handle.take() {
handle.abort(); handle.abort();
} }

View File

@ -16,20 +16,20 @@ use tracing::trace;
use super::{Device, LuaDeviceCreate}; use super::{Device, LuaDeviceCreate};
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct KasaOutletConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 9999)))] #[device_config(rename("ip"), with(|ip| SocketAddr::new(ip, 9999)))]
pub addr: SocketAddr, pub addr: SocketAddr,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct KasaOutlet { pub struct KasaOutlet {
config: KasaOutletConfig, config: Config,
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for KasaOutlet { impl LuaDeviceCreate for KasaOutlet {
type Config = KasaOutletConfig; type Config = Config;
type Error = Infallible; type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -241,7 +241,7 @@ impl traits::OnOff for KasaOutlet {
.or(Err(DeviceError::TransientError.into())) .or(Err(DeviceError::TransientError.into()))
} }
async fn set_on(&mut self, on: bool) -> Result<(), errors::ErrorCode> { async fn set_on(&self, on: bool) -> Result<(), errors::ErrorCode> {
let mut stream = TcpStream::connect(self.config.addr) let mut stream = TcpStream::connect(self.config.addr)
.await .await
.or::<DeviceError>(Err(DeviceError::DeviceOffline))?; .or::<DeviceError>(Err(DeviceError::DeviceOffline))?;

View File

@ -1,6 +1,9 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use automation_macro::LuaDeviceConfig; use automation_macro::LuaDeviceConfig;
use rumqttc::Publish; use rumqttc::Publish;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
use super::LuaDeviceCreate; use super::LuaDeviceCreate;
@ -11,7 +14,7 @@ use crate::messages::BrightnessMessage;
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct LightSensorConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
@ -26,15 +29,29 @@ pub struct LightSensorConfig {
const DEFAULT: bool = false; const DEFAULT: bool = false;
#[derive(Debug)] #[derive(Debug)]
pub struct LightSensor { pub struct State {
config: LightSensorConfig,
is_dark: bool, is_dark: bool,
} }
#[derive(Debug, Clone)]
pub struct LightSensor {
config: Config,
state: Arc<RwLock<State>>,
}
impl LightSensor {
async fn state(&self) -> RwLockReadGuard<State> {
self.state.read().await
}
async fn state_mut(&self) -> RwLockWriteGuard<State> {
self.state.write().await
}
}
#[async_trait] #[async_trait]
impl LuaDeviceCreate for LightSensor { impl LuaDeviceCreate for LightSensor {
type Config = LightSensorConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -45,10 +62,10 @@ impl LuaDeviceCreate for LightSensor {
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { let state = State { is_dark: DEFAULT };
config, let state = Arc::new(RwLock::new(state));
is_dark: DEFAULT,
}) Ok(Self { config, state })
} }
} }
@ -60,7 +77,7 @@ impl Device for LightSensor {
#[async_trait] #[async_trait]
impl OnMqtt for LightSensor { impl OnMqtt for LightSensor {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -81,18 +98,19 @@ impl OnMqtt for LightSensor {
trace!("It is light"); trace!("It is light");
false false
} else { } else {
let is_dark = self.state().await.is_dark;
trace!( trace!(
"In between min ({}) and max ({}) value, keeping current state: {}", "In between min ({}) and max ({}) value, keeping current state: {}",
self.config.min, self.config.min,
self.config.max, self.config.max,
self.is_dark is_dark
); );
self.is_dark is_dark
}; };
if is_dark != self.is_dark { if is_dark != self.state().await.is_dark {
debug!("Dark state has changed: {is_dark}"); debug!("Dark state has changed: {is_dark}");
self.is_dark = is_dark; self.state_mut().await.is_dark = is_dark;
if self.config.tx.send(Event::Darkness(is_dark)).await.is_err() { if self.config.tx.send(Event::Darkness(is_dark)).await.is_err() {
warn!("There are no receivers on the event channel"); warn!("There are no receivers on the event channel");

View File

@ -18,19 +18,19 @@ use async_trait::async_trait;
use automation_cast::Cast; use automation_cast::Cast;
use google_home::traits::OnOff; use google_home::traits::OnOff;
pub use self::air_filter::*; pub use self::air_filter::AirFilter;
pub use self::audio_setup::*; pub use self::audio_setup::AudioSetup;
pub use self::contact_sensor::*; pub use self::contact_sensor::ContactSensor;
pub use self::debug_bridge::*; pub use self::debug_bridge::DebugBridge;
pub use self::hue_bridge::*; pub use self::hue_bridge::HueBridge;
pub use self::hue_group::*; pub use self::hue_group::HueGroup;
pub use self::ikea_outlet::*; pub use self::ikea_outlet::IkeaOutlet;
pub use self::kasa_outlet::*; pub use self::kasa_outlet::KasaOutlet;
pub use self::light_sensor::*; pub use self::light_sensor::LightSensor;
pub use self::ntfy::{Notification, Ntfy}; pub use self::ntfy::{Notification, Ntfy};
pub use self::presence::{Presence, PresenceConfig, DEFAULT_PRESENCE}; pub use self::presence::{Presence, DEFAULT_PRESENCE};
pub use self::wake_on_lan::*; pub use self::wake_on_lan::WakeOnLAN;
pub use self::washer::*; pub use self::washer::Washer;
use crate::event::{OnDarkness, OnMqtt, OnNotification, OnPresence}; use crate::event::{OnDarkness, OnMqtt, OnNotification, OnPresence};
use crate::traits::Timeout; use crate::traits::Timeout;

View File

@ -111,8 +111,8 @@ impl Default for Notification {
} }
} }
#[derive(Debug, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct NtfyConfig { pub struct Config {
#[device_config(default("https://ntfy.sh".into()))] #[device_config(default("https://ntfy.sh".into()))]
pub url: String, pub url: String,
pub topic: String, pub topic: String,
@ -120,14 +120,14 @@ pub struct NtfyConfig {
pub tx: event::Sender, pub tx: event::Sender,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Ntfy { pub struct Ntfy {
config: NtfyConfig, config: Config,
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for Ntfy { impl LuaDeviceCreate for Ntfy {
type Config = NtfyConfig; type Config = Config;
type Error = Infallible; type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -166,7 +166,7 @@ impl Ntfy {
#[async_trait] #[async_trait]
impl OnPresence for Ntfy { impl OnPresence for Ntfy {
async fn on_presence(&mut self, presence: bool) { async fn on_presence(&self, presence: bool) {
// Setup extras for the broadcast // Setup extras for the broadcast
let extras = HashMap::from([ let extras = HashMap::from([
("cmd".into(), "presence".into()), ("cmd".into(), "presence".into()),
@ -202,7 +202,7 @@ impl OnPresence for Ntfy {
#[async_trait] #[async_trait]
impl OnNotification for Ntfy { impl OnNotification for Ntfy {
async fn on_notification(&mut self, notification: Notification) { async fn on_notification(&self, notification: Notification) {
self.send(notification).await; self.send(notification).await;
} }
} }

View File

@ -1,8 +1,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use automation_macro::LuaDeviceConfig; use automation_macro::LuaDeviceConfig;
use rumqttc::Publish; use rumqttc::Publish;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
use super::LuaDeviceCreate; use super::LuaDeviceCreate;
@ -12,8 +14,8 @@ use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::PresenceMessage; use crate::messages::PresenceMessage;
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct PresenceConfig { pub struct Config {
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
#[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))] #[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))]
@ -25,30 +27,47 @@ pub struct PresenceConfig {
pub const DEFAULT_PRESENCE: bool = false; pub const DEFAULT_PRESENCE: bool = false;
#[derive(Debug)] #[derive(Debug)]
pub struct Presence { pub struct State {
config: PresenceConfig,
devices: HashMap<String, bool>, devices: HashMap<String, bool>,
current_overall_presence: bool, current_overall_presence: bool,
} }
#[derive(Debug, Clone)]
pub struct Presence {
config: Config,
state: Arc<RwLock<State>>,
}
impl Presence {
async fn state(&self) -> RwLockReadGuard<State> {
self.state.read().await
}
async fn state_mut(&self) -> RwLockWriteGuard<State> {
self.state.write().await
}
}
#[async_trait] #[async_trait]
impl LuaDeviceCreate for Presence { impl LuaDeviceCreate for Presence {
type Config = PresenceConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = "ntfy", "Setting up Presence"); trace!(id = "presence", "Setting up Presence");
config config
.client .client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { let state = State {
config,
devices: HashMap::new(), devices: HashMap::new(),
current_overall_presence: DEFAULT_PRESENCE, current_overall_presence: DEFAULT_PRESENCE,
}) };
let state = Arc::new(RwLock::new(state));
Ok(Self { config, state })
} }
} }
@ -60,7 +79,7 @@ impl Device for Presence {
#[async_trait] #[async_trait]
impl OnMqtt for Presence { impl OnMqtt for Presence {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -77,7 +96,7 @@ impl OnMqtt for Presence {
if message.payload.is_empty() { if message.payload.is_empty() {
// Remove the device from the map // Remove the device from the map
debug!("State of device [{device_name}] has been removed"); debug!("State of device [{device_name}] has been removed");
self.devices.remove(&device_name); self.state_mut().await.devices.remove(&device_name);
} else { } else {
let present = match PresenceMessage::try_from(message) { let present = match PresenceMessage::try_from(message) {
Ok(state) => state.presence(), Ok(state) => state.presence(),
@ -88,13 +107,13 @@ impl OnMqtt for Presence {
}; };
debug!("State of device [{device_name}] has changed: {}", present); debug!("State of device [{device_name}] has changed: {}", present);
self.devices.insert(device_name, present); self.state_mut().await.devices.insert(device_name, present);
} }
let overall_presence = self.devices.iter().any(|(_, v)| *v); let overall_presence = self.state().await.devices.iter().any(|(_, v)| *v);
if overall_presence != self.current_overall_presence { if overall_presence != self.state().await.current_overall_presence {
debug!("Overall presence updated: {overall_presence}"); debug!("Overall presence updated: {overall_presence}");
self.current_overall_presence = overall_presence; self.state_mut().await.current_overall_presence = overall_presence;
if self if self
.config .config

View File

@ -17,7 +17,7 @@ use crate::messages::ActivateMessage;
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WakeOnLANConfig { pub struct Config {
#[device_config(flatten)] #[device_config(flatten)]
pub info: InfoConfig, pub info: InfoConfig,
#[device_config(flatten)] #[device_config(flatten)]
@ -29,14 +29,14 @@ pub struct WakeOnLANConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct WakeOnLAN { pub struct WakeOnLAN {
config: WakeOnLANConfig, config: Config,
} }
#[async_trait] #[async_trait]
impl LuaDeviceCreate for WakeOnLAN { impl LuaDeviceCreate for WakeOnLAN {
type Config = WakeOnLANConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -59,7 +59,7 @@ impl Device for WakeOnLAN {
#[async_trait] #[async_trait]
impl OnMqtt for WakeOnLAN { impl OnMqtt for WakeOnLAN {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -103,7 +103,7 @@ impl google_home::Device for WakeOnLAN {
#[async_trait] #[async_trait]
impl traits::Scene for WakeOnLAN { impl traits::Scene for WakeOnLAN {
async fn set_active(&mut self, deactivate: bool) -> Result<(), ErrorCode> { async fn set_active(&self, deactivate: bool) -> Result<(), ErrorCode> {
if deactivate { if deactivate {
debug!( debug!(
id = Device::get_id(self), id = Device::get_id(self),

View File

@ -1,6 +1,9 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use automation_macro::LuaDeviceConfig; use automation_macro::LuaDeviceConfig;
use rumqttc::Publish; use rumqttc::Publish;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
use super::ntfy::Priority; use super::ntfy::Priority;
@ -11,7 +14,7 @@ use crate::messages::PowerMessage;
use crate::mqtt::WrappedAsyncClient; use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)] #[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WasherConfig { pub struct Config {
pub identifier: String, pub identifier: String,
#[device_config(flatten)] #[device_config(flatten)]
pub mqtt: MqttDeviceConfig, pub mqtt: MqttDeviceConfig,
@ -23,17 +26,31 @@ pub struct WasherConfig {
pub client: WrappedAsyncClient, pub client: WrappedAsyncClient,
} }
// TODO: Add google home integration
#[derive(Debug)] #[derive(Debug)]
pub struct Washer { pub struct State {
config: WasherConfig,
running: isize, running: isize,
} }
// TODO: Add google home integration
#[derive(Debug, Clone)]
pub struct Washer {
config: Config,
state: Arc<RwLock<State>>,
}
impl Washer {
async fn state(&self) -> RwLockReadGuard<State> {
self.state.read().await
}
async fn state_mut(&self) -> RwLockWriteGuard<State> {
self.state.write().await
}
}
#[async_trait] #[async_trait]
impl LuaDeviceCreate for Washer { impl LuaDeviceCreate for Washer {
type Config = WasherConfig; type Config = Config;
type Error = rumqttc::ClientError; type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> { async fn create(config: Self::Config) -> Result<Self, Self::Error> {
@ -44,7 +61,10 @@ impl LuaDeviceCreate for Washer {
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce) .subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?; .await?;
Ok(Self { config, running: 0 }) let state = State { running: 0 };
let state = Arc::new(RwLock::new(state));
Ok(Self { config, state })
} }
} }
@ -61,7 +81,7 @@ const HYSTERESIS: isize = 10;
#[async_trait] #[async_trait]
impl OnMqtt for Washer { impl OnMqtt for Washer {
async fn on_mqtt(&mut self, message: Publish) { async fn on_mqtt(&self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) { if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return; return;
} }
@ -79,7 +99,7 @@ impl OnMqtt for Washer {
// debug!(id = self.identifier, power, "Washer state update"); // debug!(id = self.identifier, power, "Washer state update");
if power < self.config.threshold && self.running >= HYSTERESIS { if power < self.config.threshold && self.state().await.running >= HYSTERESIS {
// The washer is done running // The washer is done running
debug!( debug!(
id = self.config.identifier, id = self.config.identifier,
@ -88,7 +108,7 @@ impl OnMqtt for Washer {
"Washer is done" "Washer is done"
); );
self.running = 0; self.state_mut().await.running = 0;
let notification = Notification::new() let notification = Notification::new()
.set_title("Laundy is done") .set_title("Laundy is done")
.set_message("Don't forget to hang it!") .set_message("Don't forget to hang it!")
@ -106,8 +126,8 @@ impl OnMqtt for Washer {
} }
} else if power < self.config.threshold { } else if power < self.config.threshold {
// Prevent false positives // Prevent false positives
self.running = 0; self.state_mut().await.running = 0;
} else if power >= self.config.threshold && self.running < HYSTERESIS { } else if power >= self.config.threshold && self.state().await.running < HYSTERESIS {
// Washer could be starting // Washer could be starting
debug!( debug!(
id = self.config.identifier, id = self.config.identifier,
@ -116,7 +136,7 @@ impl OnMqtt for Washer {
"Washer is starting" "Washer is starting"
); );
self.running += 1; self.state_mut().await.running += 1;
} }
} }
} }

View File

@ -36,20 +36,20 @@ impl mlua::UserData for EventChannel {}
#[async_trait] #[async_trait]
pub trait OnMqtt: Sync + Send { pub trait OnMqtt: Sync + Send {
// fn topics(&self) -> Vec<&str>; // fn topics(&self) -> Vec<&str>;
async fn on_mqtt(&mut self, message: Publish); async fn on_mqtt(&self, message: Publish);
} }
#[async_trait] #[async_trait]
pub trait OnPresence: Sync + Send { pub trait OnPresence: Sync + Send {
async fn on_presence(&mut self, presence: bool); async fn on_presence(&self, presence: bool);
} }
#[async_trait] #[async_trait]
pub trait OnDarkness: Sync + Send { pub trait OnDarkness: Sync + Send {
async fn on_darkness(&mut self, dark: bool); async fn on_darkness(&self, dark: bool);
} }
#[async_trait] #[async_trait]
pub trait OnNotification: Sync + Send { pub trait OnNotification: Sync + Send {
async fn on_notification(&mut self, notification: Notification); async fn on_notification(&self, notification: Notification);
} }

View File

@ -5,6 +5,6 @@ use async_trait::async_trait;
#[async_trait] #[async_trait]
pub trait Timeout: Sync + Send { pub trait Timeout: Sync + Send {
async fn start_timeout(&mut self, _timeout: Duration) -> Result<()>; async fn start_timeout(&self, _timeout: Duration) -> Result<()>;
async fn stop_timeout(&mut self) -> Result<()>; async fn stop_timeout(&self) -> Result<()>;
} }