From 8a0805156d45445fe05064b570f9eda1ad1b7a04 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Mon, 6 May 2024 23:43:59 +0200 Subject: [PATCH] Fix: Scheduled function can not run async functions Since Lua is not Send, this turned out to be a bit more complicated. In order to make it work the async function needs to be pinned to a single thread. It works now, but the implementation looks a bit messy. Not sure it can be improved through. --- Cargo.lock | 57 +++++++++++++++++++++++++++++++++++++++---- Cargo.toml | 11 +++------ src/device_manager.rs | 37 ++++++++++++++++------------ 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7861fb7..2fe3a6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.0.2" @@ -26,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -103,8 +121,10 @@ dependencies = [ "thiserror", "tokio", "tokio-cron-scheduler", + "tokio-util", "tracing", "tracing-subscriber", + "uuid", "wakey", ] @@ -659,6 +679,10 @@ name = "hashbrown" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -1943,16 +1967,19 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.0", "pin-project-lite", + "slab", "tokio", - "tracing", ] [[package]] @@ -2130,9 +2157,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.5.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom", ] @@ -2484,3 +2511,23 @@ name = "winsafe" version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" + +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] diff --git a/Cargo.toml b/Cargo.toml index 98cf13f..8486258 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,16 +43,11 @@ enum_dispatch = "0.3.12" indexmap = { version = "2.0.0", features = ["serde"] } serde_yaml = "0.9.27" tokio-cron-scheduler = "0.9.4" -mlua = { version = "0.9.7", features = [ - "lua54", - "vendored", - "macros", - "serialize", - "async", - "send", -] } +mlua = { version = "0.9.7", features = ["lua54", "vendored", "macros", "serialize", "async", "send"] } once_cell = "1.19.0" hostname = "0.4.0" +tokio-util = { version = "0.7.11", features = ["full"] } +uuid = "1.8.0" [patch.crates-io] wakey = { git = "https://git.huizinga.dev/Dreaded_X/wakey" } diff --git a/src/device_manager.rs b/src/device_manager.rs index d751855..333a45e 100644 --- a/src/device_manager.rs +++ b/src/device_manager.rs @@ -1,11 +1,14 @@ use std::collections::HashMap; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::sync::Arc; use futures::future::join_all; +use futures::Future; use mlua::FromLua; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio_cron_scheduler::{Job, JobScheduler}; +use tokio_util::task::LocalPoolHandle; use tracing::{debug, instrument, trace}; use crate::devices::Device; @@ -180,6 +183,23 @@ impl DeviceManager { } } +fn run_schedule( + uuid: uuid::Uuid, + _: tokio_cron_scheduler::JobScheduler, +) -> Pin + Send>> { + Box::pin(async move { + // Lua is not Send, so we need to make sure that the task stays on the same thread + let pool = LocalPoolHandle::new(1); + pool.spawn_pinned(move || async move { + let lua = LUA.lock().await; + let f: mlua::Function = lua.named_registry_value(uuid.to_string().as_str()).unwrap(); + f.call_async::<_, ()>(()).await.unwrap(); + }) + .await + .unwrap(); + }) +} + impl mlua::UserData for DeviceManager { fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) { methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move { @@ -192,22 +212,9 @@ impl mlua::UserData for DeviceManager { "schedule", |lua, this, (schedule, f): (String, mlua::Function)| async move { debug!("schedule = {schedule}"); - let uuid = this - .scheduler - .add( - Job::new_async(schedule.as_str(), |uuid, _lock| { - Box::pin(async move { - let lua = LUA.lock().await; - let f: mlua::Function = - lua.named_registry_value(uuid.to_string().as_str()).unwrap(); + let job = Job::new_async(schedule.as_str(), run_schedule).unwrap(); - f.call::<_, ()>(()).unwrap(); - }) - }) - .unwrap(), - ) - .await - .unwrap(); + let uuid = this.scheduler.add(job).await.unwrap(); // Store the function in the registry lua.set_named_registry_value(uuid.to_string().as_str(), f)