Started work on reimplementing schedules
This commit is contained in:
@@ -3,15 +3,14 @@ use std::ops::{Deref, DerefMut};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::future::join_all;
|
||||
use google_home::traits::OnOff;
|
||||
use mlua::{FromLua, LuaSerdeExt};
|
||||
use mlua::FromLua;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use tokio_cron_scheduler::{Job, JobScheduler};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
use crate::devices::Device;
|
||||
use crate::event::{Event, EventChannel, OnDarkness, OnMqtt, OnNotification, OnPresence};
|
||||
use crate::schedule::{Action, Schedule};
|
||||
use crate::LUA;
|
||||
|
||||
#[derive(Debug, FromLua, Clone)]
|
||||
pub struct WrappedDevice(Arc<RwLock<Box<dyn Device>>>);
|
||||
@@ -35,23 +34,31 @@ impl DerefMut for WrappedDevice {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
impl mlua::UserData for WrappedDevice {}
|
||||
impl mlua::UserData for WrappedDevice {
|
||||
fn add_methods<'lua, M: mlua::prelude::LuaUserDataMethods<'lua, Self>>(methods: &mut M) {
|
||||
methods.add_async_method("get_id", |_lua, this, _: ()| async {
|
||||
Ok(crate::devices::Device::get_id(this.0.read().await.as_ref()))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub type DeviceMap = HashMap<String, Arc<RwLock<Box<dyn Device>>>>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct DeviceManager {
|
||||
devices: Arc<RwLock<DeviceMap>>,
|
||||
event_channel: EventChannel,
|
||||
scheduler: JobScheduler,
|
||||
}
|
||||
|
||||
impl DeviceManager {
|
||||
pub fn new() -> Self {
|
||||
pub async fn new() -> Self {
|
||||
let (event_channel, mut event_rx) = EventChannel::new();
|
||||
|
||||
let device_manager = Self {
|
||||
devices: Arc::new(RwLock::new(HashMap::new())),
|
||||
event_channel,
|
||||
scheduler: JobScheduler::new().await.unwrap(),
|
||||
};
|
||||
|
||||
tokio::spawn({
|
||||
@@ -67,58 +74,11 @@ impl DeviceManager {
|
||||
}
|
||||
});
|
||||
|
||||
device_manager.scheduler.start().await.unwrap();
|
||||
|
||||
device_manager
|
||||
}
|
||||
|
||||
// TODO: This function is currently extremely cursed...
|
||||
pub async fn add_schedule(&self, schedule: Schedule) {
|
||||
let sched = JobScheduler::new().await.unwrap();
|
||||
|
||||
for (when, actions) in schedule {
|
||||
let manager = self.clone();
|
||||
sched
|
||||
.add(
|
||||
Job::new_async(when.as_str(), move |_uuid, _l| {
|
||||
let actions = actions.clone();
|
||||
let manager = manager.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
for (action, targets) in actions {
|
||||
for target in targets {
|
||||
let device = manager.get(&target).await.unwrap();
|
||||
match action {
|
||||
Action::On => {
|
||||
let mut device = device.write().await;
|
||||
let device: Option<&mut dyn OnOff> =
|
||||
device.as_mut().cast_mut();
|
||||
|
||||
if let Some(device) = device {
|
||||
device.set_on(true).await.unwrap();
|
||||
}
|
||||
}
|
||||
Action::Off => {
|
||||
let mut device = device.write().await;
|
||||
let device: Option<&mut dyn OnOff> =
|
||||
device.as_mut().cast_mut();
|
||||
|
||||
if let Some(device) = device {
|
||||
device.set_on(false).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
sched.start().await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn add(&self, device: &WrappedDevice) {
|
||||
let id = device.read().await.get_id().to_owned();
|
||||
|
||||
@@ -220,12 +180,6 @@ impl DeviceManager {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DeviceManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl mlua::UserData for DeviceManager {
|
||||
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
||||
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {
|
||||
@@ -234,11 +188,40 @@ impl mlua::UserData for DeviceManager {
|
||||
Ok(())
|
||||
});
|
||||
|
||||
methods.add_async_method("add_schedule", |lua, this, schedule| async {
|
||||
let schedule = lua.from_value(schedule)?;
|
||||
this.add_schedule(schedule).await;
|
||||
Ok(())
|
||||
});
|
||||
methods.add_async_method(
|
||||
"schedule",
|
||||
|lua, this, (schedule, f): (String, mlua::Function)| async move {
|
||||
debug!("schedule = {schedule}");
|
||||
let uuid = this
|
||||
.scheduler
|
||||
.add(
|
||||
Job::new_async(schedule.as_str(), |uuid, _lock| {
|
||||
Box::pin(async move {
|
||||
let lua = LUA.lock().await;
|
||||
let f: mlua::Function =
|
||||
lua.named_registry_value(uuid.to_string().as_str()).unwrap();
|
||||
|
||||
f.call::<_, ()>(()).unwrap();
|
||||
})
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Store the function in the registry
|
||||
lua.set_named_registry_value(uuid.to_string().as_str(), f)
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
// methods.add_async_method("add_schedule", |lua, this, schedule| async {
|
||||
// let schedule = lua.from_value(schedule)?;
|
||||
// this.add_schedule(schedule).await;
|
||||
// Ok(())
|
||||
// });
|
||||
|
||||
methods.add_method("event_channel", |_lua, this, ()| Ok(this.event_channel()))
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
#![allow(incomplete_features)]
|
||||
#![feature(specialization)]
|
||||
#![feature(let_chains)]
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::Mutex;
|
||||
pub mod auth;
|
||||
pub mod config;
|
||||
pub mod device_manager;
|
||||
@@ -11,3 +14,5 @@ pub mod messages;
|
||||
pub mod mqtt;
|
||||
pub mod schedule;
|
||||
pub mod traits;
|
||||
|
||||
pub static LUA: Lazy<Mutex<mlua::Lua>> = Lazy::new(|| Mutex::new(mlua::Lua::new()));
|
||||
|
||||
92
src/main.rs
92
src/main.rs
@@ -6,9 +6,9 @@ use anyhow::anyhow;
|
||||
use automation::auth::User;
|
||||
use automation::config::{FulfillmentConfig, MqttConfig};
|
||||
use automation::device_manager::DeviceManager;
|
||||
use automation::devices;
|
||||
use automation::error::ApiError;
|
||||
use automation::mqtt::{self, WrappedAsyncClient};
|
||||
use automation::{devices, LUA};
|
||||
use axum::extract::FromRef;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
@@ -53,61 +53,63 @@ async fn app() -> anyhow::Result<()> {
|
||||
info!("Starting automation_rs...");
|
||||
|
||||
// Setup the device handler
|
||||
let device_manager = DeviceManager::new();
|
||||
let device_manager = DeviceManager::new().await;
|
||||
|
||||
let lua = mlua::Lua::new();
|
||||
let fulfillment_config = {
|
||||
let lua = LUA.lock().await;
|
||||
|
||||
lua.set_warning_function(|_lua, text, _cont| {
|
||||
warn!("{text}");
|
||||
Ok(())
|
||||
});
|
||||
lua.set_warning_function(|_lua, text, _cont| {
|
||||
warn!("{text}");
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let automation = lua.create_table()?;
|
||||
let event_channel = device_manager.event_channel();
|
||||
let new_mqtt_client = lua.create_function(move |lua, config: mlua::Value| {
|
||||
let config: MqttConfig = lua.from_value(config)?;
|
||||
let automation = lua.create_table()?;
|
||||
let event_channel = device_manager.event_channel();
|
||||
let new_mqtt_client = lua.create_function(move |lua, config: mlua::Value| {
|
||||
let config: MqttConfig = lua.from_value(config)?;
|
||||
|
||||
// Create a mqtt client
|
||||
// TODO: When starting up, the devices are not yet created, this could lead to a device being out of sync
|
||||
let (client, eventloop) = AsyncClient::new(config.into(), 100);
|
||||
mqtt::start(eventloop, &event_channel);
|
||||
// Create a mqtt client
|
||||
// TODO: When starting up, the devices are not yet created, this could lead to a device being out of sync
|
||||
let (client, eventloop) = AsyncClient::new(config.into(), 100);
|
||||
mqtt::start(eventloop, &event_channel);
|
||||
|
||||
Ok(WrappedAsyncClient(client))
|
||||
})?;
|
||||
Ok(WrappedAsyncClient(client))
|
||||
})?;
|
||||
|
||||
automation.set("new_mqtt_client", new_mqtt_client)?;
|
||||
automation.set("device_manager", device_manager.clone())?;
|
||||
automation.set("new_mqtt_client", new_mqtt_client)?;
|
||||
automation.set("device_manager", device_manager.clone())?;
|
||||
|
||||
let util = lua.create_table()?;
|
||||
let get_env = lua.create_function(|_lua, name: String| {
|
||||
std::env::var(name).map_err(mlua::ExternalError::into_lua_err)
|
||||
})?;
|
||||
util.set("get_env", get_env)?;
|
||||
automation.set("util", util)?;
|
||||
let util = lua.create_table()?;
|
||||
let get_env = lua.create_function(|_lua, name: String| {
|
||||
std::env::var(name).map_err(mlua::ExternalError::into_lua_err)
|
||||
})?;
|
||||
util.set("get_env", get_env)?;
|
||||
automation.set("util", util)?;
|
||||
|
||||
lua.globals().set("automation", automation)?;
|
||||
lua.globals().set("automation", automation)?;
|
||||
|
||||
devices::register_with_lua(&lua)?;
|
||||
devices::register_with_lua(&lua)?;
|
||||
|
||||
// TODO: Make this not hardcoded
|
||||
let config_filename = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config.lua".into());
|
||||
let config_path = Path::new(&config_filename);
|
||||
match lua.load(config_path).exec_async().await {
|
||||
Err(error) => {
|
||||
println!("{error}");
|
||||
Err(error)
|
||||
// TODO: Make this not hardcoded
|
||||
let config_filename = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config.lua".into());
|
||||
let config_path = Path::new(&config_filename);
|
||||
match lua.load(config_path).exec_async().await {
|
||||
Err(error) => {
|
||||
println!("{error}");
|
||||
Err(error)
|
||||
}
|
||||
result => result,
|
||||
}?;
|
||||
|
||||
let automation: mlua::Table = lua.globals().get("automation")?;
|
||||
let fulfillment_config: Option<mlua::Value> = automation.get("fulfillment")?;
|
||||
if let Some(fulfillment_config) = fulfillment_config {
|
||||
let fulfillment_config: FulfillmentConfig = lua.from_value(fulfillment_config)?;
|
||||
debug!("automation.fulfillment = {fulfillment_config:?}");
|
||||
fulfillment_config
|
||||
} else {
|
||||
return Err(anyhow!("Fulfillment is not configured"));
|
||||
}
|
||||
result => result,
|
||||
}?;
|
||||
|
||||
let automation: mlua::Table = lua.globals().get("automation")?;
|
||||
let fulfillment_config: Option<mlua::Value> = automation.get("fulfillment")?;
|
||||
let fulfillment_config = if let Some(fulfillment_config) = fulfillment_config {
|
||||
let fulfillment_config: FulfillmentConfig = lua.from_value(fulfillment_config)?;
|
||||
debug!("automation.fulfillment = {fulfillment_config:?}");
|
||||
fulfillment_config
|
||||
} else {
|
||||
return Err(anyhow!("Fulfillment is not configured"));
|
||||
};
|
||||
|
||||
// Create google home fulfillment route
|
||||
|
||||
Reference in New Issue
Block a user