Compare commits

...

3 Commits

Author SHA1 Message Date
2b62aca78a
LuaDevice macro now uses LuaDeviceCreate trait to create devices from configs
All checks were successful
Build and deploy automation_rs / Build automation_rs (push) Successful in 4m53s
Build and deploy automation_rs / Build Docker image (push) Successful in 59s
Build and deploy automation_rs / Deploy Docker container (push) Has been skipped
2024-04-29 02:53:21 +02:00
40426862e5
mqtt client is now created in lua 2024-04-29 02:19:52 +02:00
c3bd05434c
DeviceManager no longer handles subscribing and filtering topics, each device has to do this themselves now 2024-04-29 02:12:47 +02:00
23 changed files with 304 additions and 210 deletions

View File

@ -1,30 +1,9 @@
use proc_macro2::TokenStream;
use quote::quote;
use syn::{Data, DataStruct, DeriveInput, Fields, FieldsNamed};
use syn::DeriveInput;
pub fn impl_lua_device_macro(ast: &DeriveInput) -> TokenStream {
let name = &ast.ident;
// TODO: Handle errors properly
// This includes making sure one, and only one config is specified
let config = if let Data::Struct(DataStruct {
fields: Fields::Named(FieldsNamed { ref named, .. }),
..
}) = ast.data
{
named
.iter()
.find(|&field| {
field
.attrs
.iter()
.any(|attr| attr.path().is_ident("config"))
})
.map(|field| field.ty.clone())
.unwrap()
} else {
unimplemented!()
};
let gen = quote! {
impl #name {
pub fn register_with_lua(lua: &mlua::Lua) -> mlua::Result<()> {
@ -34,8 +13,10 @@ pub fn impl_lua_device_macro(ast: &DeriveInput) -> TokenStream {
impl mlua::UserData for #name {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_function("new", |lua, config: mlua::Value| async {
let config: #config = mlua::FromLua::from_lua(config, lua)?;
let device = #name::create(config).await.map_err(mlua::ExternalError::into_lua_err)?;
let config = mlua::FromLua::from_lua(config, lua)?;
// TODO: Using crate:: could cause issues
let device: #name = crate::devices::LuaDeviceCreate::create(config).await.map_err(mlua::ExternalError::into_lua_err)?;
Ok(crate::device_manager::WrappedDevice::new(Box::new(device)))
});

View File

@ -13,6 +13,15 @@ local function mqtt_automation(topic)
return "automation/" .. topic
end
local mqtt_client = automation.create_mqtt_client({
host = debug and "olympus.lan.huizinga.dev" or "mosquitto",
port = 8883,
client_name = debug and "automation-debug" or "automation_rs",
username = "mqtt",
password = automation.util.get_env("MQTT_PASSWORD"),
tls = debug and true or false,
})
automation.device_manager:add(Ntfy.new({
topic = automation.util.get_env("NTFY_TOPIC"),
event_channel = automation.event_channel,
@ -20,13 +29,14 @@ automation.device_manager:add(Ntfy.new({
automation.device_manager:add(Presence.new({
topic = "automation_dev/presence/+/#",
client = mqtt_client,
event_channel = automation.event_channel,
}))
automation.device_manager:add(DebugBridge.new({
identifier = "debug_bridge",
topic = mqtt_automation("debug"),
client = automation.mqtt_client,
client = mqtt_client,
}))
local hue_ip = "10.0.0.146"
@ -45,6 +55,7 @@ automation.device_manager:add(HueBridge.new({
automation.device_manager:add(LightSensor.new({
identifier = "living_light_sensor",
topic = mqtt_z2m("living/light"),
client = mqtt_client,
min = 22000,
max = 23500,
event_channel = automation.event_channel,
@ -54,6 +65,7 @@ automation.device_manager:add(WakeOnLAN.new({
name = "Zeus",
room = "Living Room",
topic = mqtt_automation("appliance/living_room/zeus"),
client = mqtt_client,
mac_address = "30:9c:23:60:9c:13",
broadcast_ip = "10.0.0.255",
}))
@ -66,6 +78,7 @@ automation.device_manager:add(living_speakers)
automation.device_manager:add(AudioSetup.new({
identifier = "living_audio",
topic = mqtt_z2m("living/remote"),
client = mqtt_client,
mixer = living_mixer,
speakers = living_speakers,
}))
@ -75,7 +88,7 @@ automation.device_manager:add(IkeaOutlet.new({
name = "Kettle",
room = "Kitchen",
topic = mqtt_z2m("kitchen/kettle"),
client = automation.mqtt_client,
client = mqtt_client,
timeout = debug and 5 or 300,
remotes = {
{ topic = mqtt_z2m("bedroom/remote") },
@ -88,13 +101,14 @@ automation.device_manager:add(IkeaOutlet.new({
name = "Light",
room = "Bathroom",
topic = mqtt_z2m("batchroom/light"),
client = automation.mqtt_client,
client = mqtt_client,
timeout = debug and 60 or 45 * 60,
}))
automation.device_manager:add(Washer.new({
identifier = "bathroom_washer",
topic = mqtt_z2m("batchroom/washer"),
client = mqtt_client,
threshold = 1,
event_channel = automation.event_channel,
}))
@ -104,7 +118,7 @@ automation.device_manager:add(IkeaOutlet.new({
name = "Charger",
room = "Workbench",
topic = mqtt_z2m("workbench/charger"),
client = automation.mqtt_client,
client = mqtt_client,
timeout = debug and 5 or 20 * 3600,
}))
@ -112,7 +126,7 @@ automation.device_manager:add(IkeaOutlet.new({
name = "Outlet",
room = "Workbench",
topic = mqtt_z2m("workbench/outlet"),
client = automation.mqtt_client,
client = mqtt_client,
}))
local hallway_lights = automation.device_manager:add(HueGroup.new({
@ -125,12 +139,13 @@ local hallway_lights = automation.device_manager:add(HueGroup.new({
remotes = {
{ topic = mqtt_z2m("hallway/remote") },
},
client = mqtt_client,
}))
automation.device_manager:add(ContactSensor.new({
identifier = "hallway_frontdoor",
topic = mqtt_z2m("hallway/frontdoor"),
client = automation.mqtt_client,
client = mqtt_client,
presence = {
topic = mqtt_automation("presence/contact/frontdoor"),
timeout = debug and 10 or 15 * 60,
@ -145,7 +160,7 @@ local bedroom_air_filter = automation.device_manager:add(AirFilter.new({
name = "Air Filter",
room = "Bedroom",
topic = "pico/filter/bedroom",
client = automation.mqtt_client,
client = mqtt_client,
}))
-- TODO: Use the wrapped device bedroom_air_filter instead of the string

View File

@ -1,9 +1,2 @@
openid:
base_url: "https://login.huizinga.dev/api/oidc"
mqtt:
host: "mosquitto"
port: 8883
client_name: "automation_rs"
username: "mqtt"
password: "${MQTT_PASSWORD}"

View File

@ -1,10 +1,2 @@
openid:
base_url: "https://login.huizinga.dev/api/oidc"
mqtt:
host: "olympus.lan.huizinga.dev"
port: 8883
client_name: "automation-zeus"
username: "mqtt"
password: "${MQTT_PASSWORD}"
tls: true

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use regex::{Captures, Regex};
use rumqttc::{MqttOptions, Transport};
use serde::{Deserialize, Deserializer};
use serde::Deserialize;
use tracing::debug;
use crate::auth::OpenIDConfig;
@ -13,8 +13,6 @@ use crate::error::{ConfigParseError, MissingEnv};
#[derive(Debug, Deserialize)]
pub struct Config {
pub openid: OpenIDConfig,
#[serde(deserialize_with = "deserialize_mqtt_options")]
pub mqtt: MqttOptions,
#[serde(default)]
pub fullfillment: FullfillmentConfig,
}
@ -44,13 +42,6 @@ impl From<MqttConfig> for MqttOptions {
}
}
fn deserialize_mqtt_options<'de, D>(deserializer: D) -> Result<MqttOptions, D::Error>
where
D: Deserializer<'de>,
{
Ok(MqttOptions::from(MqttConfig::deserialize(deserializer)?))
}
#[derive(Debug, Deserialize)]
pub struct FullfillmentConfig {
#[serde(default = "default_fullfillment_ip")]

View File

@ -5,10 +5,9 @@ use std::sync::Arc;
use futures::future::join_all;
use google_home::traits::OnOff;
use mlua::{FromLua, LuaSerdeExt};
use rumqttc::{matches, AsyncClient, QoS};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, error, instrument, trace};
use tracing::{debug, instrument, trace};
use crate::devices::{As, Device};
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
@ -43,17 +42,15 @@ pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
#[derive(Debug, Clone)]
pub struct DeviceManager {
devices: Arc<RwLock<DeviceMap>>,
client: AsyncClient,
event_channel: EventChannel,
}
impl DeviceManager {
pub fn new(client: AsyncClient) -> Self {
pub fn new() -> Self {
let (event_channel, mut event_rx) = EventChannel::new();
let device_manager = Self {
devices: Arc::new(RwLock::new(HashMap::new())),
client,
event_channel,
};
@ -127,18 +124,6 @@ impl DeviceManager {
debug!(id, "Adding device");
// If the device listens to mqtt, subscribe to the topics
if let Some(device) = As::<dyn OnMqtt>::cast(device.read().await.as_ref()) {
for topic in device.topics() {
trace!(id, topic, "Subscribing to topic");
if let Err(err) = self.client.subscribe(topic, QoS::AtLeastOnce).await {
// NOTE: Pretty sure that this can only happen if the mqtt client if no longer
// running
error!(id, topic, "Failed to subscribe to topic: {err}");
}
}
}
self.devices.write().await.insert(id, device.0.clone());
}
@ -170,16 +155,16 @@ impl DeviceManager {
let mut device = device.write().await;
let device = device.as_mut();
if let Some(device) = As::<dyn OnMqtt>::cast_mut(device) {
let subscribed = device
.topics()
.iter()
.any(|topic| matches(&message.topic, topic));
if subscribed {
trace!(id, "Handling");
device.on_mqtt(message).await;
trace!(id, "Done");
}
// let subscribed = device
// .topics()
// .iter()
// .any(|topic| matches(&message.topic, topic));
//
// if subscribed {
trace!(id, "Handling");
device.on_mqtt(message).await;
trace!(id, "Done");
// }
}
}
});
@ -235,6 +220,12 @@ impl DeviceManager {
}
}
impl Default for DeviceManager {
fn default() -> Self {
Self::new()
}
}
impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {

View File

@ -8,9 +8,9 @@ use google_home::GoogleHomeDevice;
use rumqttc::Publish;
use tracing::{debug, error, trace, warn};
use super::LuaDeviceCreate;
use crate::config::{InfoConfig, MqttDeviceConfig};
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::OnMqtt;
use crate::messages::{AirFilterFanState, AirFilterState, SetAirFilterFanState};
use crate::mqtt::WrappedAsyncClient;
@ -27,7 +27,6 @@ pub struct AirFilterConfig {
#[derive(Debug, LuaDevice)]
pub struct AirFilter {
#[config]
config: AirFilterConfig,
last_known_state: AirFilterState,
@ -42,7 +41,7 @@ impl AirFilter {
self.config
.client
.publish(
topic.clone(),
&topic,
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&message).unwrap(),
@ -53,9 +52,19 @@ impl AirFilter {
}
}
impl AirFilter {
async fn create(config: AirFilterConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for AirFilter {
type Config = AirFilterConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.info.identifier(), "Setting up AirFilter");
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
config,
last_known_state: AirFilterState {
@ -74,11 +83,11 @@ impl Device for AirFilter {
#[async_trait]
impl OnMqtt for AirFilter {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let state = match AirFilterState::try_from(message) {
Ok(state) => state,
Err(err) => {

View File

@ -3,13 +3,14 @@ use automation_macro::{LuaDevice, LuaDeviceConfig};
use google_home::traits::OnOff;
use tracing::{debug, error, trace, warn};
use super::Device;
use super::{Device, LuaDeviceCreate};
use crate::config::MqttDeviceConfig;
use crate::device_manager::WrappedDevice;
use crate::devices::As;
use crate::error::DeviceConfigError;
use crate::event::{OnMqtt, OnPresence};
use crate::messages::{RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)]
pub struct AudioSetupConfig {
@ -20,16 +21,21 @@ pub struct AudioSetupConfig {
mixer: WrappedDevice,
#[device_config(from_lua)]
speakers: WrappedDevice,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
#[derive(Debug, LuaDevice)]
pub struct AudioSetup {
#[config]
config: AudioSetupConfig,
}
impl AudioSetup {
async fn create(config: AudioSetupConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for AudioSetup {
type Config = AudioSetupConfig;
type Error = DeviceConfigError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up AudioSetup");
let mixer_id = config.mixer.read().await.get_id().to_owned();
@ -42,6 +48,11 @@ impl AudioSetup {
return Err(DeviceConfigError::MissingTrait(speakers_id, "OnOff".into()));
}
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(AudioSetup { config })
}
}
@ -54,11 +65,11 @@ impl Device for AudioSetup {
#[async_trait]
impl OnMqtt for AudioSetup {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(),
Err(err) => {

View File

@ -7,7 +7,7 @@ use mlua::FromLua;
use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn};
use super::Device;
use super::{Device, LuaDeviceCreate};
use crate::config::MqttDeviceConfig;
use crate::device_manager::WrappedDevice;
use crate::devices::{As, DEFAULT_PRESENCE};
@ -64,7 +64,6 @@ pub struct ContactSensorConfig {
#[derive(Debug, LuaDevice)]
pub struct ContactSensor {
#[config]
config: ContactSensorConfig,
overall_presence: bool,
@ -72,8 +71,12 @@ pub struct ContactSensor {
handle: Option<JoinHandle<()>>,
}
impl ContactSensor {
async fn create(config: ContactSensorConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for ContactSensor {
type Config = ContactSensorConfig;
type Error = DeviceConfigError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up ContactSensor");
// Make sure the devices implement the required traits
@ -91,6 +94,11 @@ impl ContactSensor {
}
}
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
config: config.clone(),
overall_presence: DEFAULT_PRESENCE,
@ -115,11 +123,11 @@ impl OnPresence for ContactSensor {
#[async_trait]
impl OnMqtt for ContactSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: rumqttc::Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let is_closed = match ContactMessage::try_from(message) {
Ok(state) => state.is_closed(),
Err(err) => {
@ -187,7 +195,7 @@ impl OnMqtt for ContactSensor {
self.config
.client
.publish(
presence.mqtt.topic.clone(),
&presence.mqtt.topic,
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&PresenceMessage::new(true)).unwrap(),
@ -212,7 +220,7 @@ impl OnMqtt for ContactSensor {
tokio::time::sleep(timeout).await;
debug!(id, "Removing door device!");
client
.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "")
.publish(&topic, rumqttc::QoS::AtLeastOnce, false, "")
.await
.map_err(|err| warn!("Failed to publish presence on {topic}: {err}"))
.ok();

View File

@ -1,10 +1,12 @@
use std::convert::Infallible;
use async_trait::async_trait;
use automation_macro::{LuaDevice, LuaDeviceConfig};
use tracing::{trace, warn};
use super::LuaDeviceCreate;
use crate::config::MqttDeviceConfig;
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{OnDarkness, OnPresence};
use crate::messages::{DarknessMessage, PresenceMessage};
use crate::mqtt::WrappedAsyncClient;
@ -20,12 +22,15 @@ pub struct DebugBridgeConfig {
#[derive(Debug, LuaDevice)]
pub struct DebugBridge {
#[config]
config: DebugBridgeConfig,
}
impl DebugBridge {
async fn create(config: DebugBridgeConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for DebugBridge {
type Config = DebugBridgeConfig;
type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up DebugBridge");
Ok(Self { config })
}

View File

@ -1,3 +1,4 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use async_trait::async_trait;
@ -5,8 +6,8 @@ use automation_macro::{LuaDevice, LuaDeviceConfig};
use serde::{Deserialize, Serialize};
use tracing::{error, trace, warn};
use super::LuaDeviceCreate;
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{OnDarkness, OnPresence};
#[derive(Debug)]
@ -32,7 +33,6 @@ pub struct HueBridgeConfig {
#[derive(Debug, LuaDevice)]
pub struct HueBridge {
#[config]
config: HueBridgeConfig,
}
@ -41,12 +41,18 @@ struct FlagMessage {
flag: bool,
}
impl HueBridge {
async fn create(config: HueBridgeConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for HueBridge {
type Config = HueBridgeConfig;
type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Infallible> {
trace!(id = config.identifier, "Setting up HueBridge");
Ok(Self { config })
}
}
impl HueBridge {
pub async fn set_flag(&self, flag: Flag, value: bool) {
let flag_id = match flag {
Flag::Presence => self.config.flags.presence,

View File

@ -6,14 +6,14 @@ use async_trait::async_trait;
use automation_macro::{LuaDevice, LuaDeviceConfig};
use google_home::errors::ErrorCode;
use google_home::traits::OnOff;
use rumqttc::Publish;
use rumqttc::{Publish, SubscribeFilter};
use tracing::{debug, error, trace, warn};
use super::Device;
use super::{Device, LuaDeviceCreate};
use crate::config::MqttDeviceConfig;
use crate::error::DeviceConfigError;
use crate::event::OnMqtt;
use crate::messages::{RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient;
use crate::traits::Timeout;
#[derive(Debug, Clone, LuaDeviceConfig)]
@ -27,21 +27,39 @@ pub struct HueGroupConfig {
pub scene_id: String,
#[device_config(default)]
pub remotes: Vec<MqttDeviceConfig>,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
#[derive(Debug, LuaDevice)]
pub struct HueGroup {
#[config]
config: HueGroupConfig,
}
// Couple of helper function to get the correct urls
impl HueGroup {
async fn create(config: HueGroupConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for HueGroup {
type Config = HueGroupConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up AudioSetup");
if !config.remotes.is_empty() {
config
.client
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
path: remote.topic.clone(),
qos: rumqttc::QoS::AtLeastOnce,
}))
.await?;
}
Ok(Self { config })
}
}
impl HueGroup {
fn url_base(&self) -> String {
format!("http://{}/api/{}", self.config.addr, self.config.login)
}
@ -67,15 +85,16 @@ impl Device for HueGroup {
#[async_trait]
impl OnMqtt for HueGroup {
fn topics(&self) -> Vec<&str> {
self.config
async fn on_mqtt(&mut self, message: Publish) {
if !self
.config
.remotes
.iter()
.map(|mqtt| mqtt.topic.as_str())
.collect()
}
.any(|remote| rumqttc::matches(&message.topic, &remote.topic))
{
return;
}
async fn on_mqtt(&mut self, message: Publish) {
let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(),
Err(err) => {

View File

@ -7,14 +7,14 @@ use google_home::errors::ErrorCode;
use google_home::traits::{self, OnOff};
use google_home::types::Type;
use google_home::{device, GoogleHomeDevice};
use rumqttc::{matches, Publish};
use rumqttc::{matches, Publish, SubscribeFilter};
use serde::Deserialize;
use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn};
use super::LuaDeviceCreate;
use crate::config::{InfoConfig, MqttDeviceConfig};
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{OnMqtt, OnPresence};
use crate::messages::{OnOffMessage, RemoteAction, RemoteMessage};
use crate::mqtt::WrappedAsyncClient;
@ -47,7 +47,6 @@ pub struct IkeaOutletConfig {
#[derive(Debug, LuaDevice)]
pub struct IkeaOutlet {
#[config]
config: IkeaOutletConfig,
last_known_state: bool,
@ -61,7 +60,7 @@ async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) {
// TODO: Handle potential errors here
client
.publish(
topic.clone(),
&topic,
rumqttc::QoS::AtLeastOnce,
false,
serde_json::to_string(&message).unwrap(),
@ -71,10 +70,29 @@ async fn set_on(client: WrappedAsyncClient, topic: &str, on: bool) {
.ok();
}
impl IkeaOutlet {
async fn create(config: IkeaOutletConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for IkeaOutlet {
type Config = IkeaOutletConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.info.identifier(), "Setting up IkeaOutlet");
if !config.remotes.is_empty() {
config
.client
.subscribe_many(config.remotes.iter().map(|remote| SubscribeFilter {
path: remote.topic.clone(),
qos: rumqttc::QoS::AtLeastOnce,
}))
.await?;
}
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
config,
last_known_state: false,
@ -91,19 +109,6 @@ impl Device for IkeaOutlet {
#[async_trait]
impl OnMqtt for IkeaOutlet {
fn topics(&self) -> Vec<&str> {
let mut topics: Vec<_> = self
.config
.remotes
.iter()
.map(|mqtt| mqtt.topic.as_str())
.collect();
topics.push(&self.config.mqtt.topic);
topics
}
async fn on_mqtt(&mut self, message: Publish) {
// Check if the message is from the deviec itself or from a remote
if matches(&message.topic, &self.config.mqtt.topic) {
@ -131,7 +136,12 @@ impl OnMqtt for IkeaOutlet {
if state && let Some(timeout) = self.config.timeout {
self.start_timeout(timeout).await.unwrap();
}
} else {
} else if self
.config
.remotes
.iter()
.any(|remote| rumqttc::matches(&message.topic, &remote.topic))
{
let action = match RemoteMessage::try_from(message) {
Ok(message) => message.action(),
Err(err) => {

View File

@ -1,3 +1,4 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::str::Utf8Error;
@ -12,8 +13,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::trace;
use super::Device;
use crate::error::DeviceConfigError;
use super::{Device, LuaDeviceCreate};
#[derive(Debug, Clone, LuaDeviceConfig)]
pub struct KasaOutletConfig {
@ -24,12 +24,15 @@ pub struct KasaOutletConfig {
#[derive(Debug, LuaDevice)]
pub struct KasaOutlet {
#[config]
config: KasaOutletConfig,
}
impl KasaOutlet {
async fn create(config: KasaOutletConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for KasaOutlet {
type Config = KasaOutletConfig;
type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up KasaOutlet");
Ok(Self { config })
}

View File

@ -3,11 +3,12 @@ use automation_macro::{LuaDevice, LuaDeviceConfig};
use rumqttc::Publish;
use tracing::{debug, trace, warn};
use super::LuaDeviceCreate;
use crate::config::MqttDeviceConfig;
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::BrightnessMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)]
pub struct LightSensorConfig {
@ -18,21 +19,32 @@ pub struct LightSensorConfig {
pub max: isize,
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
pub tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
pub const DEFAULT: bool = false;
#[derive(Debug, LuaDevice)]
pub struct LightSensor {
#[config]
config: LightSensorConfig,
is_dark: bool,
}
impl LightSensor {
async fn create(config: LightSensorConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for LightSensor {
type Config = LightSensorConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up LightSensor");
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
config,
is_dark: DEFAULT,
@ -48,11 +60,11 @@ impl Device for LightSensor {
#[async_trait]
impl OnMqtt for LightSensor {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let illuminance = match BrightnessMessage::try_from(message) {
Ok(state) => state.illuminance(),
Err(err) => {

View File

@ -12,6 +12,7 @@ mod presence;
mod wake_on_lan;
mod washer;
use async_trait::async_trait;
use google_home::device::AsGoogleHomeDevice;
use google_home::traits::OnOff;
@ -31,6 +32,16 @@ pub use self::washer::*;
use crate::event::{OnDarkness, OnMqtt, OnNotification, OnPresence};
use crate::traits::Timeout;
#[async_trait]
pub trait LuaDeviceCreate {
type Config;
type Error;
async fn create(config: Self::Config) -> Result<Self, Self::Error>
where
Self: Sized;
}
#[impl_cast::device(As: OnMqtt + OnPresence + OnDarkness + OnNotification + OnOff + Timeout)]
pub trait Device: AsGoogleHomeDevice + std::fmt::Debug + Sync + Send {
fn get_id(&self) -> String;

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::convert::Infallible;
use async_trait::async_trait;
use automation_macro::{LuaDevice, LuaDeviceConfig};
@ -6,8 +7,8 @@ use serde::Serialize;
use serde_repr::*;
use tracing::{error, trace, warn};
use super::LuaDeviceCreate;
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnNotification, OnPresence};
#[derive(Debug, Serialize_repr, Clone, Copy)]
@ -121,12 +122,15 @@ pub struct NtfyConfig {
#[derive(Debug, LuaDevice)]
pub struct Ntfy {
#[config]
config: NtfyConfig,
}
impl Ntfy {
async fn create(config: NtfyConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for Ntfy {
type Config = NtfyConfig;
type Error = Infallible;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = "ntfy", "Setting up Ntfy");
Ok(Self { config })
}

View File

@ -5,11 +5,12 @@ use automation_macro::{LuaDevice, LuaDeviceConfig};
use rumqttc::Publish;
use tracing::{debug, trace, warn};
use super::LuaDeviceCreate;
use crate::config::MqttDeviceConfig;
use crate::devices::Device;
use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::PresenceMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, LuaDeviceConfig)]
pub struct PresenceConfig {
@ -17,21 +18,32 @@ pub struct PresenceConfig {
pub mqtt: MqttDeviceConfig,
#[device_config(from_lua, rename("event_channel"), with(|ec: EventChannel| ec.get_tx()))]
tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
pub const DEFAULT_PRESENCE: bool = false;
#[derive(Debug, LuaDevice)]
pub struct Presence {
#[config]
config: PresenceConfig,
devices: HashMap<String, bool>,
current_overall_presence: bool,
}
impl Presence {
async fn create(config: PresenceConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for Presence {
type Config = PresenceConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = "ntfy", "Setting up Presence");
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self {
config,
devices: HashMap::new(),
@ -48,11 +60,11 @@ impl Device for Presence {
#[async_trait]
impl OnMqtt for Presence {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let offset = self
.config
.mqtt

View File

@ -10,11 +10,11 @@ use google_home::{device, GoogleHomeDevice};
use rumqttc::Publish;
use tracing::{debug, error, trace};
use super::Device;
use super::{Device, LuaDeviceCreate};
use crate::config::{InfoConfig, MqttDeviceConfig};
use crate::error::DeviceConfigError;
use crate::event::OnMqtt;
use crate::messages::ActivateMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WakeOnLANConfig {
@ -25,18 +25,28 @@ pub struct WakeOnLANConfig {
mac_address: MacAddress,
#[device_config(default(Ipv4Addr::new(255, 255, 255, 255)))]
broadcast_ip: Ipv4Addr,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
#[derive(Debug, LuaDevice)]
pub struct WakeOnLAN {
#[config]
config: WakeOnLANConfig,
}
impl WakeOnLAN {
async fn create(config: WakeOnLANConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for WakeOnLAN {
type Config = WakeOnLANConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.info.identifier(), "Setting up WakeOnLAN");
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { config })
}
}
@ -49,11 +59,11 @@ impl Device for WakeOnLAN {
#[async_trait]
impl OnMqtt for WakeOnLAN {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let activate = match ActivateMessage::try_from(message) {
Ok(message) => message.activate(),
Err(err) => {

View File

@ -4,11 +4,11 @@ use rumqttc::Publish;
use tracing::{debug, error, trace, warn};
use super::ntfy::Priority;
use super::{Device, Notification};
use super::{Device, LuaDeviceCreate, Notification};
use crate::config::MqttDeviceConfig;
use crate::error::DeviceConfigError;
use crate::event::{self, Event, EventChannel, OnMqtt};
use crate::messages::PowerMessage;
use crate::mqtt::WrappedAsyncClient;
#[derive(Debug, Clone, LuaDeviceConfig)]
pub struct WasherConfig {
@ -19,20 +19,31 @@ pub struct WasherConfig {
threshold: f32,
#[device_config(rename("event_channel"), from_lua, with(|ec: EventChannel| ec.get_tx()))]
pub tx: event::Sender,
#[device_config(from_lua)]
client: WrappedAsyncClient,
}
// TODO: Add google home integration
#[derive(Debug, LuaDevice)]
pub struct Washer {
#[config]
config: WasherConfig,
running: isize,
}
impl Washer {
async fn create(config: WasherConfig) -> Result<Self, DeviceConfigError> {
#[async_trait]
impl LuaDeviceCreate for Washer {
type Config = WasherConfig;
type Error = rumqttc::ClientError;
async fn create(config: Self::Config) -> Result<Self, Self::Error> {
trace!(id = config.identifier, "Setting up Washer");
config
.client
.subscribe(&config.mqtt.topic, rumqttc::QoS::AtLeastOnce)
.await?;
Ok(Self { config, running: 0 })
}
}
@ -50,11 +61,11 @@ const HYSTERESIS: isize = 10;
#[async_trait]
impl OnMqtt for Washer {
fn topics(&self) -> Vec<&str> {
vec![&self.config.mqtt.topic]
}
async fn on_mqtt(&mut self, message: Publish) {
if !rumqttc::matches(&message.topic, &self.config.mqtt.topic) {
return;
}
let power = match PowerMessage::try_from(message) {
Ok(state) => state.power(),
Err(err) => {

View File

@ -92,12 +92,10 @@ impl MissingWildcard {
#[derive(Debug, Error)]
pub enum DeviceConfigError {
#[error("Child '{1}' of device '{0}' does not exist")]
MissingChild(String, String),
#[error("Device '{0}' does not implement expected trait '{1}'")]
MissingTrait(String, String),
#[error(transparent)]
MissingWildcard(#[from] MissingWildcard),
MqttClientError(#[from] rumqttc::ClientError),
}
#[derive(Debug, Error)]

View File

@ -37,7 +37,7 @@ impl mlua::UserData for EventChannel {}
#[async_trait]
#[device_trait]
pub trait OnMqtt {
fn topics(&self) -> Vec<&str>;
// fn topics(&self) -> Vec<&str>;
async fn on_mqtt(&mut self, message: Publish);
}

View File

@ -2,7 +2,7 @@
use std::{fs, process};
use automation::auth::{OpenIDConfig, User};
use automation::config::Config;
use automation::config::{Config, MqttConfig};
use automation::device_manager::DeviceManager;
use automation::devices::{
AirFilter, AudioSetup, ContactSensor, DebugBridge, HueBridge, HueGroup, IkeaOutlet, KasaOutlet,
@ -17,6 +17,7 @@ use axum::routing::post;
use axum::{Json, Router};
use dotenvy::dotenv;
use google_home::{GoogleHome, Request};
use mlua::LuaSerdeExt;
use rumqttc::AsyncClient;
use tracing::{debug, error, info, warn};
@ -55,14 +56,8 @@ async fn app() -> anyhow::Result<()> {
std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.yml".into());
let config = Config::parse_file(&config_filename)?;
// Create a mqtt client
// TODO: Since we wait with starting the eventloop we might fill the queue while setting up devices
let (client, eventloop) = AsyncClient::new(config.mqtt.clone(), 100);
// Setup the device handler
let device_manager = DeviceManager::new(client.clone());
let event_channel = device_manager.event_channel();
let device_manager = DeviceManager::new();
// Lua testing
{
@ -74,9 +69,20 @@ async fn app() -> anyhow::Result<()> {
});
let automation = lua.create_table()?;
let event_channel = device_manager.event_channel();
let create_mqtt_client = lua.create_function(move |lua, config: mlua::Value| {
let config: MqttConfig = lua.from_value(config)?;
// Create a mqtt client
// TODO: When starting up, the devices are not yet created, this could lead to a device being out of sync
let (client, eventloop) = AsyncClient::new(config.into(), 100);
mqtt::start(eventloop, &event_channel);
Ok(WrappedAsyncClient(client))
})?;
automation.set("create_mqtt_client", create_mqtt_client)?;
automation.set("device_manager", device_manager.clone())?;
automation.set("mqtt_client", WrappedAsyncClient(client.clone()))?;
automation.set("event_channel", device_manager.event_channel())?;
let util = lua.create_table()?;
@ -115,10 +121,6 @@ async fn app() -> anyhow::Result<()> {
}?;
}
// Wrap the mqtt eventloop and start listening for message
// NOTE: We wait until all the setup is done, as otherwise we might miss some messages
mqtt::start(eventloop, &event_channel);
// Create google home fullfillment route
let fullfillment = Router::new().route(
"/google_home",