Fix: Scheduled function can not run async functions
All checks were successful
Build and deploy automation_rs / Build automation_rs (push) Successful in 7m20s
Build and deploy automation_rs / Build Docker image (push) Successful in 1m0s
Build and deploy automation_rs / Deploy Docker container (push) Has been skipped

Since Lua is not Send, this turned out to be a bit more complicated.
In order to make it work the async function needs to be pinned to a
single thread.
It works now, but the implementation looks a bit messy. Not sure it can
be improved through.
This commit is contained in:
Dreaded_X 2024-05-06 23:43:59 +02:00
parent 808549bcba
commit 8a0805156d
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
3 changed files with 77 additions and 28 deletions

57
Cargo.lock generated
View File

@ -17,6 +17,18 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.0.2"
@ -26,6 +38,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
[[package]]
name = "android-tzdata"
version = "0.1.1"
@ -103,8 +121,10 @@ dependencies = [
"thiserror",
"tokio",
"tokio-cron-scheduler",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"wakey",
]
@ -659,6 +679,10 @@ name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "hdrhistogram"
@ -1943,16 +1967,19 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.8"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"hashbrown 0.14.0",
"pin-project-lite",
"slab",
"tokio",
"tracing",
]
[[package]]
@ -2130,9 +2157,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.5.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom",
]
@ -2484,3 +2511,23 @@ name = "winsafe"
version = "0.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904"
[[package]]
name = "zerocopy"
version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]

View File

@ -43,16 +43,11 @@ enum_dispatch = "0.3.12"
indexmap = { version = "2.0.0", features = ["serde"] }
serde_yaml = "0.9.27"
tokio-cron-scheduler = "0.9.4"
mlua = { version = "0.9.7", features = [
"lua54",
"vendored",
"macros",
"serialize",
"async",
"send",
] }
mlua = { version = "0.9.7", features = ["lua54", "vendored", "macros", "serialize", "async", "send"] }
once_cell = "1.19.0"
hostname = "0.4.0"
tokio-util = { version = "0.7.11", features = ["full"] }
uuid = "1.8.0"
[patch.crates-io]
wakey = { git = "https://git.huizinga.dev/Dreaded_X/wakey" }

View File

@ -1,11 +1,14 @@
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use futures::future::join_all;
use futures::Future;
use mlua::FromLua;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, instrument, trace};
use crate::devices::Device;
@ -180,6 +183,23 @@ impl DeviceManager {
}
}
fn run_schedule(
uuid: uuid::Uuid,
_: tokio_cron_scheduler::JobScheduler,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(async move {
// Lua is not Send, so we need to make sure that the task stays on the same thread
let pool = LocalPoolHandle::new(1);
pool.spawn_pinned(move || async move {
let lua = LUA.lock().await;
let f: mlua::Function = lua.named_registry_value(uuid.to_string().as_str()).unwrap();
f.call_async::<_, ()>(()).await.unwrap();
})
.await
.unwrap();
})
}
impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {
@ -192,22 +212,9 @@ impl mlua::UserData for DeviceManager {
"schedule",
|lua, this, (schedule, f): (String, mlua::Function)| async move {
debug!("schedule = {schedule}");
let uuid = this
.scheduler
.add(
Job::new_async(schedule.as_str(), |uuid, _lock| {
Box::pin(async move {
let lua = LUA.lock().await;
let f: mlua::Function =
lua.named_registry_value(uuid.to_string().as_str()).unwrap();
let job = Job::new_async(schedule.as_str(), run_schedule).unwrap();
f.call::<_, ()>(()).unwrap();
})
})
.unwrap(),
)
.await
.unwrap();
let uuid = this.scheduler.add(job).await.unwrap();
// Store the function in the registry
lua.set_named_registry_value(uuid.to_string().as_str(), f)