Started work on reimplementing schedules

This commit is contained in:
Dreaded_X 2024-04-29 04:52:54 +02:00
parent ff428e3d20
commit bf3ce9efd4
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
6 changed files with 112 additions and 135 deletions

5
Cargo.lock generated
View File

@ -88,6 +88,7 @@ dependencies = [
"impl_cast", "impl_cast",
"indexmap 2.0.0", "indexmap 2.0.0",
"mlua", "mlua",
"once_cell",
"paste", "paste",
"pollster", "pollster",
"regex", "regex",
@ -1108,9 +1109,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.18.0" version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"

View File

@ -42,13 +42,8 @@ enum_dispatch = "0.3.12"
indexmap = { version = "2.0.0", features = ["serde"] } indexmap = { version = "2.0.0", features = ["serde"] }
serde_yaml = "0.9.27" serde_yaml = "0.9.27"
tokio-cron-scheduler = "0.9.4" tokio-cron-scheduler = "0.9.4"
mlua = { version = "0.9.7", features = [ mlua = { version = "0.9.7", features = ["lua54", "vendored", "macros", "serialize", "async", "send"] }
"lua54", once_cell = "1.19.0"
"vendored",
"macros",
"serialize",
"async",
] }
[patch.crates-io] [patch.crates-io]
wakey = { git = "https://git.huizinga.dev/Dreaded_X/wakey" } wakey = { git = "https://git.huizinga.dev/Dreaded_X/wakey" }

View File

@ -160,23 +160,14 @@ automation.device_manager:add(ContactSensor.new({
}, },
})) }))
local bedroom_air_filter = automation.device_manager:add(AirFilter.new({ local bedroom_air_filter = AirFilter.new({
name = "Air Filter", name = "Air Filter",
room = "Bedroom", room = "Bedroom",
topic = "pico/filter/bedroom", topic = "pico/filter/bedroom",
client = mqtt_client, client = mqtt_client,
}))
-- TODO: Use the wrapped device bedroom_air_filter instead of the string
automation.device_manager:add_schedule({
["0 0 19 * * *"] = {
on = {
"bedroom_air_filter",
},
},
["0 0 20 * * *"] = {
off = {
"bedroom_air_filter",
},
},
}) })
automation.device_manager:add(bedroom_air_filter)
automation.device_manager:schedule("0/1 * * * * *", function()
print("Device: " .. bedroom_air_filter:get_id())
end)

View File

@ -3,15 +3,14 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc; use std::sync::Arc;
use futures::future::join_all; use futures::future::join_all;
use google_home::traits::OnOff; use mlua::FromLua;
use mlua::{FromLua, LuaSerdeExt};
use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler}; use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, instrument, trace}; use tracing::{debug, instrument, trace};
use crate::devices::{As, Device}; use crate::devices::{As, Device};
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence}; use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
use crate::schedule::{Action, Schedule}; use crate::LUA;
#[derive(Debug, FromLua, Clone)] #[derive(Debug, FromLua, Clone)]
pub struct WrappedDevice(Arc<RwLock<Box<dyn Device>>>); pub struct WrappedDevice(Arc<RwLock<Box<dyn Device>>>);
@ -35,23 +34,31 @@ impl DerefMut for WrappedDevice {
&mut self.0 &mut self.0
} }
} }
impl mlua::UserData for WrappedDevice {} impl mlua::UserData for WrappedDevice {
fn add_methods<'lua, M: mlua::prelude::LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("get_id", |_lua, this, _: ()| async {
Ok(crate::devices::Device::get_id(this.0.read().await.as_ref()))
});
}
}
pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>; pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
#[derive(Debug, Clone)] #[derive(Clone)]
pub struct DeviceManager { pub struct DeviceManager {
devices: Arc<RwLock<DeviceMap>>, devices: Arc<RwLock<DeviceMap>>,
event_channel: EventChannel, event_channel: EventChannel,
scheduler: JobScheduler,
} }
impl DeviceManager { impl DeviceManager {
pub fn new() -> Self { pub async fn new() -> Self {
let (event_channel, mut event_rx) = EventChannel::new(); let (event_channel, mut event_rx) = EventChannel::new();
let device_manager = Self { let device_manager = Self {
devices: Arc::new(RwLock::new(HashMap::new())), devices: Arc::new(RwLock::new(HashMap::new())),
event_channel, event_channel,
scheduler: JobScheduler::new().await.unwrap(),
}; };
tokio::spawn({ tokio::spawn({
@ -67,58 +74,11 @@ impl DeviceManager {
} }
}); });
device_manager.scheduler.start().await.unwrap();
device_manager device_manager
} }
// TODO: This function is currently extremely cursed...
pub async fn add_schedule(&self, schedule: Schedule) {
let sched = JobScheduler::new().await.unwrap();
for (when, actions) in schedule {
let manager = self.clone();
sched
.add(
Job::new_async(when.as_str(), move |_uuid, _l| {
let actions = actions.clone();
let manager = manager.clone();
Box::pin(async move {
for (action, targets) in actions {
for target in targets {
let device = manager.get(&target).await.unwrap();
match action {
Action::On => {
As::<dyn OnOff>::cast_mut(
device.write().await.as_mut(),
)
.unwrap()
.set_on(true)
.await
.unwrap();
}
Action::Off => {
As::<dyn OnOff>::cast_mut(
device.write().await.as_mut(),
)
.unwrap()
.set_on(false)
.await
.unwrap();
}
}
}
}
})
})
.unwrap(),
)
.await
.unwrap();
}
sched.start().await.unwrap();
}
pub async fn add(&self, device: &WrappedDevice) { pub async fn add(&self, device: &WrappedDevice) {
let id = device.read().await.get_id(); let id = device.read().await.get_id();
@ -220,12 +180,6 @@ impl DeviceManager {
} }
} }
impl Default for DeviceManager {
fn default() -> Self {
Self::new()
}
}
impl mlua::UserData for DeviceManager { impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) { fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move { methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {
@ -234,11 +188,40 @@ impl mlua::UserData for DeviceManager {
Ok(()) Ok(())
}); });
methods.add_async_method("add_schedule", |lua, this, schedule| async { methods.add_async_method(
let schedule = lua.from_value(schedule)?; "schedule",
this.add_schedule(schedule).await; |lua, this, (schedule, f): (String, mlua::Function)| async move {
debug!("schedule = {schedule}");
let uuid = this
.scheduler
.add(
Job::new_async(schedule.as_str(), |uuid, _lock| {
Box::pin(async move {
let lua = LUA.lock().await;
let f: mlua::Function =
lua.named_registry_value(uuid.to_string().as_str()).unwrap();
f.call::<_, ()>(()).unwrap();
})
})
.unwrap(),
)
.await
.unwrap();
// Store the function in the registry
lua.set_named_registry_value(uuid.to_string().as_str(), f)
.unwrap();
Ok(()) Ok(())
}); },
);
// methods.add_async_method("add_schedule", |lua, this, schedule| async {
// let schedule = lua.from_value(schedule)?;
// this.add_schedule(schedule).await;
// Ok(())
// });
methods.add_method("event_channel", |_lua, this, ()| Ok(this.event_channel())) methods.add_method("event_channel", |_lua, this, ()| Ok(this.event_channel()))
} }

View File

@ -1,6 +1,9 @@
#![allow(incomplete_features)] #![allow(incomplete_features)]
#![feature(specialization)] #![feature(specialization)]
#![feature(let_chains)] #![feature(let_chains)]
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
pub mod auth; pub mod auth;
pub mod config; pub mod config;
pub mod device_manager; pub mod device_manager;
@ -11,3 +14,5 @@ pub mod messages;
pub mod mqtt; pub mod mqtt;
pub mod schedule; pub mod schedule;
pub mod traits; pub mod traits;
pub static LUA: Lazy<Mutex<mlua::Lua>> = Lazy::new(|| Mutex::new(mlua::Lua::new()));

View File

@ -6,9 +6,9 @@ use anyhow::anyhow;
use automation::auth::User; use automation::auth::User;
use automation::config::{FulfillmentConfig, MqttConfig}; use automation::config::{FulfillmentConfig, MqttConfig};
use automation::device_manager::DeviceManager; use automation::device_manager::DeviceManager;
use automation::devices;
use automation::error::ApiError; use automation::error::ApiError;
use automation::mqtt::{self, WrappedAsyncClient}; use automation::mqtt::{self, WrappedAsyncClient};
use automation::{devices, LUA};
use axum::extract::FromRef; use axum::extract::FromRef;
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::response::IntoResponse; use axum::response::IntoResponse;
@ -53,9 +53,10 @@ async fn app() -> anyhow::Result<()> {
info!("Starting automation_rs..."); info!("Starting automation_rs...");
// Setup the device handler // Setup the device handler
let device_manager = DeviceManager::new(); let device_manager = DeviceManager::new().await;
let lua = mlua::Lua::new(); let fulfillment_config = {
let lua = LUA.lock().await;
lua.set_warning_function(|_lua, text, _cont| { lua.set_warning_function(|_lua, text, _cont| {
warn!("{text}"); warn!("{text}");
@ -102,12 +103,13 @@ async fn app() -> anyhow::Result<()> {
let automation: mlua::Table = lua.globals().get("automation")?; let automation: mlua::Table = lua.globals().get("automation")?;
let fulfillment_config: Option<mlua::Value> = automation.get("fulfillment")?; let fulfillment_config: Option<mlua::Value> = automation.get("fulfillment")?;
let fulfillment_config = if let Some(fulfillment_config) = fulfillment_config { if let Some(fulfillment_config) = fulfillment_config {
let fulfillment_config: FulfillmentConfig = lua.from_value(fulfillment_config)?; let fulfillment_config: FulfillmentConfig = lua.from_value(fulfillment_config)?;
debug!("automation.fulfillment = {fulfillment_config:?}"); debug!("automation.fulfillment = {fulfillment_config:?}");
fulfillment_config fulfillment_config
} else { } else {
return Err(anyhow!("Fulfillment is not configured")); return Err(anyhow!("Fulfillment is not configured"));
}
}; };
// Create google home fulfillment route // Create google home fulfillment route