Fix: Scheduled function can not run async functions

Since Lua is not Send, this turned out to be a bit more complicated.
In order to make it work the async function needs to be pinned to a
single thread.
It works now, but the implementation looks a bit messy. Not sure it can
be improved through.
This commit is contained in:
2024-05-06 23:43:59 +02:00
parent bf3d757710
commit c7fc25d239
3 changed files with 77 additions and 28 deletions

View File

@@ -1,11 +1,14 @@
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Arc;
use futures::future::join_all;
use futures::Future;
use mlua::FromLua;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_cron_scheduler::{Job, JobScheduler};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, instrument, trace};
use crate::devices::Device;
@@ -180,6 +183,23 @@ impl DeviceManager {
}
}
fn run_schedule(
uuid: uuid::Uuid,
_: tokio_cron_scheduler::JobScheduler,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(async move {
// Lua is not Send, so we need to make sure that the task stays on the same thread
let pool = LocalPoolHandle::new(1);
pool.spawn_pinned(move || async move {
let lua = LUA.lock().await;
let f: mlua::Function = lua.named_registry_value(uuid.to_string().as_str()).unwrap();
f.call_async::<_, ()>(()).await.unwrap();
})
.await
.unwrap();
})
}
impl mlua::UserData for DeviceManager {
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: WrappedDevice| async move {
@@ -192,22 +212,9 @@ impl mlua::UserData for DeviceManager {
"schedule",
|lua, this, (schedule, f): (String, mlua::Function)| async move {
debug!("schedule = {schedule}");
let uuid = this
.scheduler
.add(
Job::new_async(schedule.as_str(), |uuid, _lock| {
Box::pin(async move {
let lua = LUA.lock().await;
let f: mlua::Function =
lua.named_registry_value(uuid.to_string().as_str()).unwrap();
let job = Job::new_async(schedule.as_str(), run_schedule).unwrap();
f.call::<_, ()>(()).unwrap();
})
})
.unwrap(),
)
.await
.unwrap();
let uuid = this.scheduler.add(job).await.unwrap();
// Store the function in the registry
lua.set_named_registry_value(uuid.to_string().as_str(), f)