Make sure the setup is done before we start listening for events

This commit is contained in:
Dreaded_X 2023-01-10 01:02:04 +01:00
parent 9d6488183f
commit 5ee8eaf8fb
3 changed files with 57 additions and 32 deletions

View File

@ -58,10 +58,11 @@ enum Command {
Fullfillment { Fullfillment {
google_home: GoogleHome, google_home: GoogleHome,
payload: google_home::Request, payload: google_home::Request,
tx: oneshot::Sender<google_home::Response> tx: oneshot::Sender<google_home::Response>,
}, },
AddDevice { AddDevice {
device: DeviceBox, device: DeviceBox,
tx: oneshot::Sender<()>
} }
} }
@ -81,7 +82,9 @@ impl DeviceHandle {
} }
pub async fn add_device(&self, device: DeviceBox) { pub async fn add_device(&self, device: DeviceBox) {
self.tx.send(Command::AddDevice { device }).await.unwrap(); let (tx, rx) = oneshot::channel();
self.tx.send(Command::AddDevice { device, tx }).await.unwrap();
rx.await.ok();
} }
} }
@ -138,7 +141,11 @@ impl Devices {
let result = google_home.handle_request(payload, &mut self.as_google_home_devices()).unwrap(); let result = google_home.handle_request(payload, &mut self.as_google_home_devices()).unwrap();
tx.send(result).ok(); tx.send(result).ok();
}, },
Command::AddDevice { device } => self.add_device(device), Command::AddDevice { device, tx } => {
self.add_device(device);
tx.send(()).ok();
},
} }
} }

View File

@ -8,7 +8,7 @@ use automation::{
config::{Config, OpenIDConfig}, config::{Config, OpenIDConfig},
devices, devices,
hue_bridge::HueBridge, hue_bridge::HueBridge,
light_sensor, mqtt, light_sensor, mqtt::{self, Mqtt},
ntfy::Ntfy, ntfy::Ntfy,
presence, presence,
}; };
@ -41,14 +41,14 @@ async fn main() {
tracing_subscriber::fmt().with_env_filter(filter).init(); tracing_subscriber::fmt().with_env_filter(filter).init();
info!("Starting automation_rs...");
let config = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned()); let config = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned());
let config = Config::build(&config).unwrap_or_else(|err| { let config = Config::build(&config).unwrap_or_else(|err| {
error!("Failed to load config: {err}"); error!("Failed to load config: {err}");
process::exit(1); process::exit(1);
}); });
info!("Starting automation_rs...");
// Configure MQTT // Configure MQTT
let mqtt = config.mqtt.clone(); let mqtt = config.mqtt.clone();
let mut mqttoptions = MqttOptions::new("rust-test", mqtt.host, mqtt.port); let mut mqttoptions = MqttOptions::new("rust-test", mqtt.host, mqtt.port);
@ -58,11 +58,11 @@ async fn main() {
// Create a mqtt client and wrap the eventloop // Create a mqtt client and wrap the eventloop
let (client, eventloop) = AsyncClient::new(mqttoptions, 10); let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
let mqtt = mqtt::start(eventloop); let mqtt = Mqtt::new(eventloop);
let presence = presence::start(mqtt.clone(), config.presence.clone(), client.clone()).await; let presence = presence::start(mqtt.subscribe(), config.presence.clone(), client.clone()).await;
let light_sensor = light_sensor::start(mqtt.clone(), config.light_sensor.clone(), client.clone()).await; let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await;
let devices = devices::start(mqtt, presence.clone(), light_sensor.clone()); let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone());
join_all( join_all(
config config
.devices .devices
@ -86,15 +86,19 @@ async fn main() {
HueBridge::create(presence.clone(), light_sensor.clone(), hue_bridge_config); HueBridge::create(presence.clone(), light_sensor.clone(), hue_bridge_config);
} }
// Actually start listening for mqtt message,
// we wait until all the setup is done, as otherwise we might miss some messages
mqtt.start();
// Create google home fullfillment route // Create google home fullfillment route
let fullfillment = Router::new().route( let fullfillment = Router::new().route(
"/google_home", "/google_home",
post(async move |user: User, Json(payload): Json<Request>| { post(async move |user: User, Json(payload): Json<Request>| {
debug!(username = user.preferred_username, "{payload:?}"); debug!(username = user.preferred_username, "{payload:#?}");
let gc = GoogleHome::new(&user.preferred_username); let gc = GoogleHome::new(&user.preferred_username);
let result = devices.fullfillment(gc, payload).await.unwrap(); let result = devices.fullfillment(gc, payload).await.unwrap();
debug!(username = user.preferred_username, "{result:?}"); debug!(username = user.preferred_username, "{result:#?}");
return (StatusCode::OK, Json(result)); return (StatusCode::OK, Json(result));
}), }),

View File

@ -11,29 +11,43 @@ pub trait OnMqtt {
} }
pub type Receiver = watch::Receiver<Option<Publish>>; pub type Receiver = watch::Receiver<Option<Publish>>;
type Sender = watch::Sender<Option<Publish>>;
pub fn start(mut eventloop: EventLoop) -> Receiver { pub struct Mqtt {
let (tx, rx) = watch::channel(None); tx: Sender,
tokio::spawn(async move { eventloop: EventLoop,
debug!("Listening for MQTT events"); }
loop {
let notification = eventloop.poll().await; impl Mqtt {
match notification { pub fn new(eventloop: EventLoop) -> Self {
Ok(Event::Incoming(Incoming::Publish(p))) => { let (tx, _rx) = watch::channel(None);
tx.send(Some(p)).ok(); Self { tx, eventloop }
}, }
Ok(..) => continue,
Err(err) => { pub fn subscribe(&self) -> Receiver {
error!("{}", err); self.tx.subscribe()
break }
},
pub fn start(mut self) {
tokio::spawn(async move {
debug!("Listening for MQTT events");
loop {
let notification = self.eventloop.poll().await;
match notification {
Ok(Event::Incoming(Incoming::Publish(p))) => {
self.tx.send(Some(p)).ok();
},
Ok(..) => continue,
Err(err) => {
error!("{}", err);
break
},
}
} }
}
todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!"); todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!");
}); });
}
return rx;
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]