refactor: Switch to async closures

This commit is contained in:
2025-09-01 03:18:56 +02:00
parent e21ea0f34e
commit 1b8566e593
8 changed files with 109 additions and 124 deletions

View File

@@ -99,7 +99,7 @@ impl AddAdditionalMethods for HueBridge {
{ {
methods.add_async_method( methods.add_async_method(
"set_flag", "set_flag",
|lua, this, (flag, value): (mlua::Value, bool)| async move { async |lua, this, (flag, value): (mlua::Value, bool)| {
let flag: Flag = lua.from_value(flag)?; let flag: Flag = lua.from_value(flag)?;
this.set_flag(flag, value).await; this.set_flag(flag, value).await;

View File

@@ -170,7 +170,7 @@ impl AddAdditionalMethods for Ntfy {
{ {
methods.add_async_method( methods.add_async_method(
"send_notification", "send_notification",
|lua, this, notification: mlua::Value| async move { async |lua, this, notification: mlua::Value| {
let notification: Notification = lua.from_value(notification)?; let notification: Notification = lua.from_value(notification)?;
this.send(notification).await; this.send(notification).await;

View File

@@ -73,22 +73,19 @@ impl DeviceManager {
match event { match event {
Event::MqttMessage(message) => { Event::MqttMessage(message) => {
let devices = self.devices.read().await; let devices = self.devices.read().await;
let iter = devices.iter().map(|(id, device)| { let iter = devices.iter().map(async |(id, device)| {
let message = message.clone(); let device: Option<&dyn OnMqtt> = device.cast();
async move { if let Some(device) = device {
let device: Option<&dyn OnMqtt> = device.cast(); // let subscribed = device
if let Some(device) = device { // .topics()
// let subscribed = device // .iter()
// .topics() // .any(|topic| matches(&message.topic, topic));
// .iter() //
// .any(|topic| matches(&message.topic, topic)); // if subscribed {
// trace!(id, "Handling");
// if subscribed { device.on_mqtt(message.clone()).await;
trace!(id, "Handling"); trace!(id, "Done");
device.on_mqtt(message).await; // }
trace!(id, "Done");
// }
}
} }
}); });
@@ -100,7 +97,7 @@ impl DeviceManager {
impl mlua::UserData for DeviceManager { impl mlua::UserData for DeviceManager {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) { fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method("add", |_lua, this, device: Box<dyn Device>| async move { methods.add_async_method("add", async |_lua, this, device: Box<dyn Device>| {
this.add(device).await; this.add(device).await;
Ok(()) Ok(())
@@ -108,7 +105,7 @@ impl mlua::UserData for DeviceManager {
methods.add_async_method( methods.add_async_method(
"schedule", "schedule",
|lua, this, (schedule, f): (String, mlua::Function)| async move { async |lua, this, (schedule, f): (String, mlua::Function)| {
debug!("schedule = {schedule}"); debug!("schedule = {schedule}");
// This creates a function, that returns the actual job we want to run // This creates a function, that returns the actual job we want to run
let create_job = { let create_job = {

View File

@@ -29,7 +29,7 @@ impl mlua::UserData for Timeout {
methods.add_async_method( methods.add_async_method(
"start", "start",
|_lua, this, (timeout, callback): (f32, ActionCallback<mlua::Value, bool>)| async move { async |_lua, this, (timeout, callback): (f32, ActionCallback<mlua::Value, bool>)| {
if let Some(handle) = this.state.write().await.handle.take() { if let Some(handle) = this.state.write().await.handle.take() {
handle.abort(); handle.abort();
} }
@@ -50,7 +50,7 @@ impl mlua::UserData for Timeout {
}, },
); );
methods.add_async_method("cancel", |_lua, this, ()| async move { methods.add_async_method("cancel", async |_lua, this, ()| {
debug!("Canceling timeout callback"); debug!("Canceling timeout callback");
if let Some(handle) = this.state.write().await.handle.take() { if let Some(handle) = this.state.write().await.handle.take() {
@@ -60,7 +60,7 @@ impl mlua::UserData for Timeout {
Ok(()) Ok(())
}); });
methods.add_async_method("is_waiting", |_lua, this, ()| async move { methods.add_async_method("is_waiting", async |_lua, this, ()| {
debug!("Canceling timeout callback"); debug!("Canceling timeout callback");
if let Some(handle) = this.state.read().await.handle.as_ref() { if let Some(handle) = this.state.read().await.handle.as_ref() {

View File

@@ -7,13 +7,13 @@ pub trait OnOff {
where where
Self: Sized + google_home::traits::OnOff + 'static, Self: Sized + google_home::traits::OnOff + 'static,
{ {
methods.add_async_method("set_on", |_lua, this, on: bool| async move { methods.add_async_method("set_on", async |_lua, this, on: bool| {
this.deref().set_on(on).await.unwrap(); this.deref().set_on(on).await.unwrap();
Ok(()) Ok(())
}); });
methods.add_async_method("on", |_lua, this, ()| async move { methods.add_async_method("on", async |_lua, this, ()| {
Ok(this.deref().on().await.unwrap()) Ok(this.deref().on().await.unwrap())
}); });
} }
@@ -25,13 +25,13 @@ pub trait Brightness {
where where
Self: Sized + google_home::traits::Brightness + 'static, Self: Sized + google_home::traits::Brightness + 'static,
{ {
methods.add_async_method("set_brightness", |_lua, this, brightness: u8| async move { methods.add_async_method("set_brightness", async |_lua, this, brightness: u8| {
this.set_brightness(brightness).await.unwrap(); this.set_brightness(brightness).await.unwrap();
Ok(()) Ok(())
}); });
methods.add_async_method("brightness", |_lua, this, _: ()| async move { methods.add_async_method("brightness", async |_lua, this, _: ()| {
Ok(this.brightness().await.unwrap()) Ok(this.brightness().await.unwrap())
}); });
} }
@@ -45,7 +45,7 @@ pub trait ColorSetting {
{ {
methods.add_async_method( methods.add_async_method(
"set_color_temperature", "set_color_temperature",
|_lua, this, temperature: u32| async move { async |_lua, this, temperature: u32| {
this.set_color(google_home::traits::Color { temperature }) this.set_color(google_home::traits::Color { temperature })
.await .await
.unwrap(); .unwrap();
@@ -54,7 +54,7 @@ pub trait ColorSetting {
}, },
); );
methods.add_async_method("color_temperature", |_lua, this, ()| async move { methods.add_async_method("color_temperature", async |_lua, this, ()| {
Ok(this.color().await.temperature) Ok(this.color().await.temperature)
}); });
} }
@@ -66,16 +66,13 @@ pub trait OpenClose {
where where
Self: Sized + google_home::traits::OpenClose + 'static, Self: Sized + google_home::traits::OpenClose + 'static,
{ {
methods.add_async_method( methods.add_async_method("set_open_percent", async |_lua, this, open_percent: u8| {
"set_open_percent", this.set_open_percent(open_percent).await.unwrap();
|_lua, this, open_percent: u8| async move {
this.set_open_percent(open_percent).await.unwrap();
Ok(()) Ok(())
}, });
);
methods.add_async_method("open_percent", |_lua, this, _: ()| async move { methods.add_async_method("open_percent", async |_lua, this, _: ()| {
Ok(this.open_percent().await.unwrap()) Ok(this.open_percent().await.unwrap())
}); });
} }

View File

@@ -27,7 +27,7 @@ impl mlua::UserData for WrappedAsyncClient {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) { fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_method( methods.add_async_method(
"send_message", "send_message",
|_lua, this, (topic, message): (String, mlua::Value)| async move { async |_lua, this, (topic, message): (String, mlua::Value)| {
let message = serde_json::to_string(&message).unwrap(); let message = serde_json::to_string(&message).unwrap();
debug!("message = {message}"); debug!("message = {message}");

View File

@@ -45,7 +45,7 @@ impl Impl {
quote! { quote! {
impl mlua::UserData for #name #generics { impl mlua::UserData for #name #generics {
fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) { fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
methods.add_async_function("new", |_lua, config| async { methods.add_async_function("new", async |_lua, config| {
let device: Self = LuaDeviceCreate::create(config) let device: Self = LuaDeviceCreate::create(config)
.await .await
.map_err(mlua::ExternalError::into_lua_err)?; .map_err(mlua::ExternalError::into_lua_err)?;
@@ -58,7 +58,7 @@ impl Impl {
Ok(b) Ok(b)
}); });
methods.add_async_method("get_id", |_lua, this, _: ()| async move { Ok(this.get_id()) }); methods.add_async_method("get_id", async |_lua, this, _: ()| { Ok(this.get_id()) });
#( #(
#traits::add_methods(methods); #traits::add_methods(methods);

View File

@@ -40,15 +40,13 @@ impl GoogleHome {
let intent = request.inputs.into_iter().next(); let intent = request.inputs.into_iter().next();
let payload: OptionFuture<_> = intent let payload: OptionFuture<_> = intent
.map(|intent| async move { .map(async |intent| match intent {
match intent { Intent::Sync => ResponsePayload::Sync(self.sync(devices).await),
Intent::Sync => ResponsePayload::Sync(self.sync(devices).await), Intent::Query(payload) => {
Intent::Query(payload) => { ResponsePayload::Query(self.query(payload, devices).await)
ResponsePayload::Query(self.query(payload, devices).await) }
} Intent::Execute(payload) => {
Intent::Execute(payload) => { ResponsePayload::Execute(self.execute(payload, devices).await)
ResponsePayload::Execute(self.execute(payload, devices).await)
}
} }
}) })
.into(); .into();
@@ -64,7 +62,7 @@ impl GoogleHome {
devices: &HashMap<String, Box<T>>, devices: &HashMap<String, Box<T>>,
) -> sync::Payload { ) -> sync::Payload {
let mut resp_payload = sync::Payload::new(&self.user_id); let mut resp_payload = sync::Payload::new(&self.user_id);
let f = devices.values().map(|device| async move { let f = devices.values().map(async |device| {
if let Some(device) = device.as_ref().cast() { if let Some(device) = device.as_ref().cast() {
Some(Device::sync(device).await) Some(Device::sync(device).await)
} else { } else {
@@ -86,7 +84,7 @@ impl GoogleHome {
.devices .devices
.into_iter() .into_iter()
.map(|device| device.id) .map(|device| device.id)
.map(|id| async move { .map(async |id| {
// NOTE: Requires let_chains feature // NOTE: Requires let_chains feature
let device = if let Some(device) = devices.get(id.as_str()) let device = if let Some(device) = devices.get(id.as_str())
&& let Some(device) = device.as_ref().cast() && let Some(device) = device.as_ref().cast()
@@ -115,84 +113,77 @@ impl GoogleHome {
) -> execute::Payload { ) -> execute::Payload {
let resp_payload = Arc::new(Mutex::new(response::execute::Payload::new())); let resp_payload = Arc::new(Mutex::new(response::execute::Payload::new()));
let f = payload.commands.into_iter().map(|command| { let f = payload.commands.into_iter().map(async |command| {
let resp_payload = resp_payload.clone(); let mut success = response::execute::Command::new(execute::Status::Success);
async move { success.states = Some(execute::States {
let mut success = response::execute::Command::new(execute::Status::Success); online: true,
success.states = Some(execute::States { state: Default::default(),
online: true, });
state: Default::default(), let mut offline = response::execute::Command::new(execute::Status::Offline);
}); offline.states = Some(execute::States {
let mut offline = response::execute::Command::new(execute::Status::Offline); online: false,
offline.states = Some(execute::States { state: Default::default(),
online: false, });
state: Default::default(), let mut errors: HashMap<ErrorCode, response::execute::Command> = HashMap::new();
});
let mut errors: HashMap<ErrorCode, response::execute::Command> = HashMap::new();
let f = command let f = command
.devices .devices
.into_iter() .into_iter()
.map(|device| device.id) .map(|device| device.id)
.map(|id| { .map(async |id| {
let execution = command.execution.clone(); if let Some(device) = devices.get(id.as_str())
async move { && let Some(device) = device.as_ref().cast()
if let Some(device) = devices.get(id.as_str()) {
&& let Some(device) = device.as_ref().cast() if !device.is_online().await {
{ return (id, Ok(false));
if !device.is_online().await {
return (id, Ok(false));
}
// NOTE: We can not use .map here because async =(
let mut results = Vec::new();
for cmd in &execution {
results.push(Device::execute(device, cmd.clone()).await);
}
// Convert vec of results to a result with a vec and the first
// encountered error
let results =
results.into_iter().collect::<Result<Vec<_>, ErrorCode>>();
// TODO: We only get one error not all errors
if let Err(err) = results {
(id, Err(err))
} else {
(id, Ok(true))
}
} else {
(id.clone(), Err(DeviceError::DeviceNotFound.into()))
}
} }
});
let a = join_all(f).await; // NOTE: We can not use .map here because async =(
a.into_iter().for_each(|(id, state)| { let mut results = Vec::new();
match state { for cmd in &command.execution {
Ok(true) => success.add_id(&id), results.push(Device::execute(device, cmd.clone()).await);
Ok(false) => offline.add_id(&id), }
Err(err) => errors
.entry(err) // Convert vec of results to a result with a vec and the first
.or_insert_with(|| match &err { // encountered error
ErrorCode::DeviceError(_) => { let results = results.into_iter().collect::<Result<Vec<_>, ErrorCode>>();
response::execute::Command::new(execute::Status::Error)
} // TODO: We only get one error not all errors
ErrorCode::DeviceException(_) => { if let Err(err) = results {
response::execute::Command::new(execute::Status::Exceptions) (id, Err(err))
} } else {
}) (id, Ok(true))
.add_id(&id), }
}; } else {
(id.clone(), Err(DeviceError::DeviceNotFound.into()))
}
}); });
let mut resp_payload = resp_payload.lock().await; let a = join_all(f).await;
resp_payload.add_command(success); a.into_iter().for_each(|(id, state)| {
resp_payload.add_command(offline); match state {
for (error, mut cmd) in errors { Ok(true) => success.add_id(&id),
cmd.error_code = Some(error); Ok(false) => offline.add_id(&id),
resp_payload.add_command(cmd); Err(err) => errors
} .entry(err)
.or_insert_with(|| match &err {
ErrorCode::DeviceError(_) => {
response::execute::Command::new(execute::Status::Error)
}
ErrorCode::DeviceException(_) => {
response::execute::Command::new(execute::Status::Exceptions)
}
})
.add_id(&id),
};
});
let mut resp_payload = resp_payload.lock().await;
resp_payload.add_command(success);
resp_payload.add_command(offline);
for (error, mut cmd) in errors {
cmd.error_code = Some(error);
resp_payload.add_command(cmd);
} }
}); });