Improved error handling
This commit is contained in:
parent
e9d1cf554d
commit
13f5c87c03
18
Cargo.lock
generated
18
Cargo.lock
generated
|
@ -55,6 +55,7 @@ dependencies = [
|
|||
"axum",
|
||||
"bytes",
|
||||
"dotenvy",
|
||||
"eui48",
|
||||
"futures",
|
||||
"google-home",
|
||||
"impl_cast",
|
||||
|
@ -189,6 +190,17 @@ dependencies = [
|
|||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "eui48"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "887418ac5e8d57c2e66e04bdc2fe15f9a5407be20b54a82c86bd0e368b709701"
|
||||
dependencies = [
|
||||
"regex",
|
||||
"rustc-serialize",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flume"
|
||||
version = "0.10.14"
|
||||
|
@ -772,6 +784,12 @@ dependencies = [
|
|||
"tokio-rustls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-serialize"
|
||||
version = "0.3.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda"
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.7"
|
||||
|
|
|
@ -31,6 +31,7 @@ regex = "1.7.0"
|
|||
async-trait = "0.1.61"
|
||||
async-recursion = "1.0.0"
|
||||
futures = "0.3.25"
|
||||
eui48 = { version = "1.1.0", features = ["disp_hexstring", "serde"] }
|
||||
|
||||
[profile.release]
|
||||
lto=true
|
||||
|
|
|
@ -27,7 +27,7 @@ impl GoogleHome {
|
|||
|
||||
match payload {
|
||||
Some(payload) => Ok(Response::new(request.request_id, payload)),
|
||||
_ => Err(anyhow::anyhow!("Something went wrong, expected at least ResponsePayload")),
|
||||
_ => Err(anyhow::anyhow!("Expected at least one ResponsePayload")),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
20
src/auth.rs
20
src/auth.rs
|
@ -2,11 +2,10 @@ use axum::{
|
|||
async_trait,
|
||||
extract::{FromRequestParts, FromRef},
|
||||
http::{StatusCode, request::Parts},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::config::OpenIDConfig;
|
||||
use crate::{config::OpenIDConfig, error::{ApiError, ApiErrorJson}};
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct User {
|
||||
|
@ -19,7 +18,7 @@ where
|
|||
OpenIDConfig: FromRef<S>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = Response;
|
||||
type Rejection = ApiError;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||
// Get the state
|
||||
|
@ -38,23 +37,26 @@ where
|
|||
// Send the request
|
||||
let res = req.send()
|
||||
.await
|
||||
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
|
||||
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;
|
||||
|
||||
// If the request is success full the auth token is valid and we are given userinfo
|
||||
let status = res.status();
|
||||
if status.is_success() {
|
||||
let user = res.json()
|
||||
.await
|
||||
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
|
||||
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;
|
||||
|
||||
return Ok(user);
|
||||
} else {
|
||||
let err = res
|
||||
.text()
|
||||
let err: ApiErrorJson = res
|
||||
.json()
|
||||
.await
|
||||
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
|
||||
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;
|
||||
|
||||
return Err((status, err).into_response());
|
||||
let err = ApiError::try_from(err)
|
||||
.map_err(|err| ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()))?;
|
||||
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
use std::{fs, error::Error, net::{Ipv4Addr, SocketAddr}, collections::HashMap};
|
||||
use std::{fs, net::{Ipv4Addr, SocketAddr}, collections::HashMap};
|
||||
|
||||
use async_recursion::async_recursion;
|
||||
use regex::{Regex, Captures};
|
||||
use tracing::{debug, trace, error};
|
||||
use tracing::{debug, trace};
|
||||
use rumqttc::{AsyncClient, has_wildcards};
|
||||
use serde::Deserialize;
|
||||
use eui48::MacAddress;
|
||||
|
||||
use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet, AsOnOff};
|
||||
|
||||
// @TODO Configure more defaults
|
||||
use crate::{devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet}, error::{FailedToParseConfig, MissingEnv, MissingWildcard, Error, FailedToCreateDevice}};
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
|
@ -124,14 +123,20 @@ pub struct PresenceDeviceConfig {
|
|||
|
||||
impl PresenceDeviceConfig {
|
||||
/// Set the mqtt topic to an appropriate value if it is not already set
|
||||
fn generate_topic(&mut self, class: &str, identifier: &str, config: &Config) {
|
||||
fn generate_topic(&mut self, class: &str, identifier: &str, config: &Config) -> Result<(), MissingWildcard> {
|
||||
if self.mqtt.is_none() {
|
||||
if !has_wildcards(&config.presence.topic) {
|
||||
return Err(MissingWildcard::new(&config.presence.topic).into());
|
||||
}
|
||||
|
||||
// @TODO This is not perfect, if the topic is some/+/thing/# this will fail
|
||||
let offset = config.presence.topic.find('+').or(config.presence.topic.find('#')).unwrap();
|
||||
let topic = config.presence.topic[..offset].to_owned() + class + "/" + identifier;
|
||||
trace!("Setting presence mqtt topic: {topic}");
|
||||
self.mqtt = Some(MqttDeviceConfig { topic });
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,7 +155,7 @@ pub enum Device {
|
|||
info: InfoConfig,
|
||||
#[serde(flatten)]
|
||||
mqtt: MqttDeviceConfig,
|
||||
mac_address: String,
|
||||
mac_address: MacAddress,
|
||||
},
|
||||
KasaOutlet {
|
||||
ip: Ipv4Addr,
|
||||
|
@ -169,39 +174,31 @@ pub enum Device {
|
|||
}
|
||||
|
||||
impl Config {
|
||||
pub fn build(filename: &str) -> Result<Self, Box<dyn Error>> {
|
||||
pub fn parse_file(filename: &str) -> Result<Self, FailedToParseConfig> {
|
||||
debug!("Loading config: {filename}");
|
||||
let file = fs::read_to_string(filename)?;
|
||||
let file = fs::read_to_string(filename)
|
||||
.map_err(|err| FailedToParseConfig::new(filename, err.into()))?;
|
||||
|
||||
// Substitute in environment variables
|
||||
let re = Regex::new(r"\$\{(.*)\}").unwrap();
|
||||
let mut failure = false;
|
||||
let mut missing = MissingEnv::new();
|
||||
let file = re.replace_all(&file, |caps: &Captures| {
|
||||
let key = caps.get(1).unwrap().as_str();
|
||||
debug!("Substituting '{key}' in config");
|
||||
match std::env::var(key) {
|
||||
Ok(value) => value,
|
||||
Err(_) => {
|
||||
failure = true;
|
||||
error!("Environment variable '{key}' is not set");
|
||||
missing.add_missing(key);
|
||||
"".to_string()
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if failure {
|
||||
return Err("Missing environment variables".into());
|
||||
}
|
||||
missing.has_missing()
|
||||
.map_err(|err| FailedToParseConfig::new(filename, err.into()))?;
|
||||
|
||||
let config: Config = toml::from_str(&file)?;
|
||||
|
||||
// Some extra config validation
|
||||
if !has_wildcards(&config.presence.topic) {
|
||||
return Err(format!("Invalid presence topic '{}', needs to contain a wildcard (+/#) in order to listen to presence devices", config.presence.topic).into());
|
||||
}
|
||||
|
||||
// @TODO It would be nice it was possible to add validation to serde,
|
||||
// that way we can check that the provided mqtt topics are actually valid
|
||||
let config: Config = toml::from_str(&file)
|
||||
.map_err(|err| FailedToParseConfig::new(filename, err.into()))?;
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
@ -209,48 +206,51 @@ impl Config {
|
|||
|
||||
impl Device {
|
||||
#[async_recursion]
|
||||
pub async fn into(self, identifier: String, config: &Config, client: AsyncClient) -> DeviceBox {
|
||||
let device: DeviceBox = match self {
|
||||
pub async fn create(self, identifier: String, config: &Config, client: AsyncClient) -> Result<DeviceBox, FailedToCreateDevice> {
|
||||
let device: Result<DeviceBox, Error> = match self {
|
||||
Device::IkeaOutlet { info, mqtt, kettle } => {
|
||||
trace!(id = identifier, "IkeaOutlet [{} in {:?}]", info.name, info.room);
|
||||
Box::new(IkeaOutlet::new(identifier, info, mqtt, kettle, client).await)
|
||||
match IkeaOutlet::build(&identifier, info, mqtt, kettle, client).await {
|
||||
Ok(device) => Ok(Box::new(device)),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
},
|
||||
Device::WakeOnLAN { info, mqtt, mac_address } => {
|
||||
trace!(id = identifier, "WakeOnLan [{} in {:?}]", info.name, info.room);
|
||||
Box::new(WakeOnLAN::new(identifier, info, mqtt, mac_address, client).await)
|
||||
match WakeOnLAN::build(&identifier, info, mqtt, mac_address, client).await {
|
||||
Ok(device) => Ok(Box::new(device)),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
},
|
||||
Device::KasaOutlet { ip } => {
|
||||
trace!(id = identifier, "KasaOutlet [{}]", identifier);
|
||||
Box::new(KasaOutlet::new(identifier, ip))
|
||||
Ok(Box::new(KasaOutlet::new(&identifier, ip)))
|
||||
}
|
||||
Device::AudioSetup { mqtt, mixer, speakers } => {
|
||||
trace!(id = identifier, "AudioSetup [{}]", identifier);
|
||||
// Create the child devices
|
||||
let mixer = (*mixer).into(identifier.clone() + ".mixer", config, client.clone()).await;
|
||||
let speakers = (*speakers).into(identifier.clone() + ".speakers", config, client.clone()).await;
|
||||
let mixer = (*mixer).create(identifier.clone() + ".mixer", config, client.clone()).await?;
|
||||
let speakers = (*speakers).create(identifier.clone() + ".speakers", config, client.clone()).await?;
|
||||
|
||||
// The AudioSetup expects the children to be something that implements the OnOff trait
|
||||
// So let's convert the children and make sure OnOff is implemented
|
||||
let mixer = match AsOnOff::consume(mixer) {
|
||||
Some(mixer) => mixer,
|
||||
None => todo!("Handle this properly"),
|
||||
};
|
||||
let speakers = match AsOnOff::consume(speakers) {
|
||||
Some(speakers) => speakers,
|
||||
None => todo!("Handle this properly"),
|
||||
};
|
||||
|
||||
Box::new(AudioSetup::new(identifier, mqtt, mixer, speakers, client).await)
|
||||
match AudioSetup::build(&identifier, mqtt, mixer, speakers, client).await {
|
||||
Ok(device) => Ok(Box::new(device)),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
},
|
||||
Device::ContactSensor { mqtt, mut presence } => {
|
||||
trace!(id = identifier, "ContactSensor [{}]", identifier);
|
||||
if let Some(presence) = &mut presence {
|
||||
presence.generate_topic("contact", &identifier, &config);
|
||||
presence.generate_topic("contact", &identifier, &config)
|
||||
.map_err(|err| FailedToCreateDevice::new(&identifier, err.into()))?;
|
||||
}
|
||||
|
||||
match ContactSensor::build(&identifier, mqtt, presence, client).await {
|
||||
Ok(device) => Ok(Box::new(device)),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
Box::new(ContactSensor::new(identifier, mqtt, presence, client).await)
|
||||
},
|
||||
};
|
||||
|
||||
return device;
|
||||
return device.map_err(|err| FailedToCreateDevice::new(&identifier, err));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ use pollster::FutureExt;
|
|||
use tokio::sync::{oneshot, mpsc};
|
||||
use tracing::{trace, debug, span, Level};
|
||||
|
||||
use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}};
|
||||
use crate::{mqtt::{OnMqtt, self}, presence::{OnPresence, self}, light_sensor::{OnDarkness, self}, error};
|
||||
|
||||
impl_cast::impl_cast!(Device, OnMqtt);
|
||||
impl_cast::impl_cast!(Device, OnPresence);
|
||||
|
@ -58,7 +58,7 @@ enum Command {
|
|||
Fullfillment {
|
||||
google_home: GoogleHome,
|
||||
payload: google_home::Request,
|
||||
tx: oneshot::Sender<google_home::Response>,
|
||||
tx: oneshot::Sender<anyhow::Result<google_home::Response>>,
|
||||
},
|
||||
AddDevice {
|
||||
device: DeviceBox,
|
||||
|
@ -75,16 +75,16 @@ pub struct DeviceHandle {
|
|||
|
||||
impl DeviceHandle {
|
||||
// @TODO Improve error type
|
||||
pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> Result<google_home::Response, oneshot::error::RecvError> {
|
||||
pub async fn fullfillment(&self, google_home: GoogleHome, payload: google_home::Request) -> anyhow::Result<google_home::Response> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send(Command::Fullfillment { google_home, payload, tx }).await.unwrap();
|
||||
rx.await
|
||||
self.tx.send(Command::Fullfillment { google_home, payload, tx }).await?;
|
||||
rx.await?
|
||||
}
|
||||
|
||||
pub async fn add_device(&self, device: DeviceBox) {
|
||||
pub async fn add_device(&self, device: DeviceBox) -> error::Result<()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send(Command::AddDevice { device, tx }).await.unwrap();
|
||||
rx.await.ok();
|
||||
self.tx.send(Command::AddDevice { device, tx }).await?;
|
||||
Ok(rx.await?)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ impl Devices {
|
|||
fn handle_cmd(&mut self, cmd: Command) {
|
||||
match cmd {
|
||||
Command::Fullfillment { google_home, payload, tx } => {
|
||||
let result = google_home.handle_request(payload, &mut self.as_google_home_devices()).unwrap();
|
||||
let result = google_home.handle_request(payload, &mut self.as_google_home_devices());
|
||||
tx.send(result).ok();
|
||||
},
|
||||
Command::AddDevice { device, tx } => {
|
||||
|
|
|
@ -4,10 +4,11 @@ use rumqttc::{AsyncClient, matches};
|
|||
use tracing::{error, warn, debug};
|
||||
|
||||
use crate::config::MqttDeviceConfig;
|
||||
use crate::error;
|
||||
use crate::mqtt::{OnMqtt, RemoteMessage, RemoteAction};
|
||||
use crate::presence::OnPresence;
|
||||
|
||||
use super::Device;
|
||||
use super::{Device, DeviceBox, AsOnOff};
|
||||
|
||||
// @TODO Ideally we store am Arc to the childern devices,
|
||||
// that way they hook into everything just like all other devices
|
||||
|
@ -20,10 +21,20 @@ pub struct AudioSetup {
|
|||
}
|
||||
|
||||
impl AudioSetup {
|
||||
pub async fn new(identifier: String, mqtt: MqttDeviceConfig, mixer: Box<dyn traits::OnOff + Sync + Send>, speakers: Box<dyn traits::OnOff + Sync + Send>, client: AsyncClient) -> Self {
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, mixer: DeviceBox, speakers: DeviceBox, client: AsyncClient) -> error::Result<Self> {
|
||||
// We expect the children devices to implement the OnOff trait
|
||||
let mixer = match AsOnOff::consume(mixer) {
|
||||
Some(mixer) => mixer,
|
||||
None => Err(error::ExpectedOnOff::new(&(identifier.to_owned() + ".mixer")))?,
|
||||
};
|
||||
let speakers = match AsOnOff::consume(speakers) {
|
||||
Some(speakers) => speakers,
|
||||
None => Err(error::ExpectedOnOff::new(&(identifier.to_owned() + ".speakers")))?,
|
||||
};
|
||||
|
||||
Self { identifier, mqtt, mixer, speakers }
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
Ok(Self { identifier: identifier.to_owned(), mqtt, mixer, speakers })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ use rumqttc::{AsyncClient, matches};
|
|||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, debug, warn};
|
||||
|
||||
use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence};
|
||||
use crate::{config::{MqttDeviceConfig, PresenceDeviceConfig}, mqtt::{OnMqtt, ContactMessage, PresenceMessage}, presence::OnPresence, error};
|
||||
|
||||
use super::Device;
|
||||
|
||||
|
@ -22,18 +22,18 @@ pub struct ContactSensor {
|
|||
}
|
||||
|
||||
impl ContactSensor {
|
||||
pub async fn new(identifier: String, mqtt: MqttDeviceConfig, presence: Option<PresenceDeviceConfig>, client: AsyncClient) -> Self {
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
pub async fn build(identifier: &str, mqtt: MqttDeviceConfig, presence: Option<PresenceDeviceConfig>, client: AsyncClient) -> error::Result<Self> {
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
Self {
|
||||
identifier,
|
||||
Ok(Self {
|
||||
identifier: identifier.to_owned(),
|
||||
mqtt,
|
||||
presence,
|
||||
client,
|
||||
overall_presence: false,
|
||||
is_closed: true,
|
||||
handle: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ impl OnMqtt for ContactSensor {
|
|||
// This is to prevent the house from being marked as present for however long the
|
||||
// timeout is set when leaving the house
|
||||
if !self.overall_presence {
|
||||
self.client.publish(topic, rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()).await.unwrap();
|
||||
self.client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&PresenceMessage::new(true)).unwrap()).await.map_err(|err| warn!("Failed to publish presence on {topic}: {err}")).ok();
|
||||
}
|
||||
} else {
|
||||
// Once the door is closed again we start a timeout for removing the presence
|
||||
|
@ -109,7 +109,7 @@ impl OnMqtt for ContactSensor {
|
|||
debug!(id, "Starting timeout ({timeout:?}) for contact sensor...");
|
||||
tokio::time::sleep(timeout).await;
|
||||
debug!(id, "Removing door device!");
|
||||
client.publish(topic, rumqttc::QoS::AtLeastOnce, false, "").await.unwrap();
|
||||
client.publish(topic.clone(), rumqttc::QoS::AtLeastOnce, false, "").await.map_err(|err| warn!("Failed to publish presence on {topic}: {err}")).ok();
|
||||
})
|
||||
);
|
||||
}
|
||||
|
|
|
@ -4,12 +4,13 @@ use async_trait::async_trait;
|
|||
use google_home::errors::ErrorCode;
|
||||
use google_home::{GoogleHomeDevice, device, types::Type, traits::{self, OnOff}};
|
||||
use rumqttc::{AsyncClient, Publish, matches};
|
||||
use tracing::{debug, trace, error};
|
||||
use tracing::{debug, trace, error, warn};
|
||||
use tokio::task::JoinHandle;
|
||||
use pollster::FutureExt as _;
|
||||
|
||||
use crate::config::{KettleConfig, InfoConfig, MqttDeviceConfig};
|
||||
use crate::devices::Device;
|
||||
use crate::error;
|
||||
use crate::mqtt::{OnMqtt, OnOffMessage};
|
||||
use crate::presence::OnPresence;
|
||||
|
||||
|
@ -26,11 +27,11 @@ pub struct IkeaOutlet {
|
|||
}
|
||||
|
||||
impl IkeaOutlet {
|
||||
pub async fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, kettle: Option<KettleConfig>, client: AsyncClient) -> Self {
|
||||
pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, kettle: Option<KettleConfig>, client: AsyncClient) -> error::Result<Self> {
|
||||
// @TODO Handle potential errors here
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
Self{ identifier, info, mqtt, kettle, client, last_known_state: false, handle: None }
|
||||
Ok(Self{ identifier: identifier.to_owned(), info, mqtt, kettle, client, last_known_state: false, handle: None })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,7 +39,7 @@ async fn set_on(client: AsyncClient, topic: String, on: bool) {
|
|||
let message = OnOffMessage::new(on);
|
||||
|
||||
// @TODO Handle potential errors here
|
||||
client.publish(topic + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).await.unwrap();
|
||||
client.publish(topic.clone() + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).await.map_err(|err| warn!("Failed to update state on {topic}: {err}")).ok();
|
||||
}
|
||||
|
||||
impl Device for IkeaOutlet {
|
||||
|
|
|
@ -13,9 +13,8 @@ pub struct KasaOutlet {
|
|||
}
|
||||
|
||||
impl KasaOutlet {
|
||||
pub fn new(identifier: String, ip: Ipv4Addr) -> Self {
|
||||
// @TODO Get the current state of the outlet
|
||||
Self { identifier, addr: (ip, 9999).into() }
|
||||
pub fn new(identifier: &str, ip: Ipv4Addr) -> Self {
|
||||
Self { identifier: identifier.to_owned(), addr: (ip, 9999).into() }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,9 @@ use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene},
|
|||
use tracing::{debug, error};
|
||||
use rumqttc::{AsyncClient, Publish, matches};
|
||||
use pollster::FutureExt as _;
|
||||
use eui48::MacAddress;
|
||||
|
||||
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}};
|
||||
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::{OnMqtt, ActivateMessage}, error};
|
||||
|
||||
use super::Device;
|
||||
|
||||
|
@ -13,15 +14,15 @@ pub struct WakeOnLAN {
|
|||
identifier: String,
|
||||
info: InfoConfig,
|
||||
mqtt: MqttDeviceConfig,
|
||||
mac_address: String,
|
||||
mac_address: MacAddress,
|
||||
}
|
||||
|
||||
impl WakeOnLAN {
|
||||
pub async fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: String, client: AsyncClient) -> Self {
|
||||
pub async fn build(identifier: &str, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: MacAddress, client: AsyncClient) -> error::Result<Self> {
|
||||
// @TODO Handle potential errors here
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
Self { identifier, info, mqtt, mac_address }
|
||||
Ok(Self { identifier: identifier.to_owned(), info, mqtt, mac_address })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
203
src/error.rs
Normal file
203
src/error.rs
Normal file
|
@ -0,0 +1,203 @@
|
|||
use std::{fmt, error, result};
|
||||
|
||||
use axum::{response::IntoResponse, http::status::InvalidStatusCode};
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
pub type Error = Box<dyn error::Error>;
|
||||
pub type Result<T> = result::Result<T, Error>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MissingEnv {
|
||||
keys: Vec<String>
|
||||
}
|
||||
|
||||
// @TODO Would be nice to somehow get the line number of the missing keys
|
||||
impl MissingEnv {
|
||||
pub fn new() -> Self {
|
||||
Self { keys: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn add_missing(&mut self, key: &str) {
|
||||
self.keys.push(key.to_owned());
|
||||
}
|
||||
|
||||
pub fn has_missing(self) -> result::Result<(), Self> {
|
||||
if self.keys.len() > 0 {
|
||||
Err(self)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MissingEnv {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Missing environment variable")?;
|
||||
if self.keys.len() == 0 {
|
||||
unreachable!("This error should only be returned if there are actually missing environment variables");
|
||||
}
|
||||
if self.keys.len() == 1 {
|
||||
write!(f, " '{}'", self.keys[0])?;
|
||||
} else {
|
||||
write!(f, "s '{}'", self.keys[0])?;
|
||||
self.keys.iter().skip(1).map(|key| {
|
||||
write!(f, ", '{key}'")
|
||||
}).collect::<fmt::Result>()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for MissingEnv {}
|
||||
|
||||
|
||||
// @TODO Would be nice to somehow get the line number of the expected wildcard topic
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MissingWildcard {
|
||||
topic: String
|
||||
}
|
||||
|
||||
impl MissingWildcard {
|
||||
pub fn new(topic: &str) -> Self {
|
||||
Self { topic: topic.to_owned() }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MissingWildcard {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Topic '{}' is exptected to be a wildcard topic", self.topic)
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for MissingWildcard {}
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FailedToParseConfig {
|
||||
config: String,
|
||||
cause: Error,
|
||||
}
|
||||
|
||||
impl FailedToParseConfig {
|
||||
pub fn new(config: &str, cause: Error) -> Self {
|
||||
Self { config: config.to_owned(), cause }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FailedToParseConfig {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Failed to parse config '{}'", self.config)
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for FailedToParseConfig {
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(self.cause.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FailedToCreateDevice {
|
||||
device: String,
|
||||
cause: Error,
|
||||
}
|
||||
|
||||
impl FailedToCreateDevice {
|
||||
pub fn new(device: &str, cause: Error) -> Self {
|
||||
Self { device: device.to_owned(), cause }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FailedToCreateDevice {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Failed to create device '{}'", self.device)
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for FailedToCreateDevice {
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
Some(self.cause.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ExpectedOnOff {
|
||||
device: String
|
||||
}
|
||||
|
||||
impl ExpectedOnOff {
|
||||
pub fn new(device: &str) -> Self {
|
||||
Self { device: device.to_owned() }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ExpectedOnOff {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "Expected device '{}' to implement OnOff trait", self.device)
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for ExpectedOnOff {}
|
||||
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ApiError {
|
||||
status_code: axum::http::StatusCode,
|
||||
error: Error,
|
||||
}
|
||||
|
||||
impl ApiError {
|
||||
pub fn new(status_code: axum::http::StatusCode, error: Error) -> Self {
|
||||
Self { status_code, error }
|
||||
}
|
||||
|
||||
pub fn prepare_for_json(&self) -> ApiErrorJson {
|
||||
let error = ApiErrorJsonError {
|
||||
code: self.status_code.as_u16(),
|
||||
status: self.status_code.to_string(),
|
||||
reason: self.error.to_string(),
|
||||
};
|
||||
|
||||
ApiErrorJson { error }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ApiError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.error.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for ApiError {}
|
||||
|
||||
impl IntoResponse for ApiError {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(self.status_code, serde_json::to_string(&self.prepare_for_json()).unwrap()).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct ApiErrorJsonError {
|
||||
code: u16,
|
||||
status: String,
|
||||
reason: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ApiErrorJson {
|
||||
error: ApiErrorJsonError,
|
||||
}
|
||||
|
||||
impl TryFrom<ApiErrorJson> for ApiError {
|
||||
type Error = InvalidStatusCode;
|
||||
|
||||
fn try_from(value: ApiErrorJson) -> result::Result<Self, Self::Error> {
|
||||
let status_code = axum::http::StatusCode::from_u16(value.error.code)?;
|
||||
let error = value.error.reason.into();
|
||||
|
||||
Ok(Self { status_code, error })
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ pub enum Flag {
|
|||
Darkness,
|
||||
}
|
||||
|
||||
pub struct HueBridge {
|
||||
struct HueBridge {
|
||||
addr: SocketAddr,
|
||||
login: String,
|
||||
flags: Flags,
|
||||
|
@ -23,37 +23,8 @@ struct FlagMessage {
|
|||
}
|
||||
|
||||
impl HueBridge {
|
||||
pub fn create(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) {
|
||||
let mut hue_bridge = Self {
|
||||
addr: (config.ip, 80).into(),
|
||||
login: config.login,
|
||||
flags: config.flags,
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = presence_rx.changed() => {
|
||||
if !res.is_ok() {
|
||||
break;
|
||||
}
|
||||
|
||||
let presence = *presence_rx.borrow();
|
||||
hue_bridge.on_presence(presence).await;
|
||||
}
|
||||
res = light_sensor_rx.changed() => {
|
||||
if !res.is_ok() {
|
||||
break;
|
||||
}
|
||||
|
||||
let darkness = *light_sensor_rx.borrow();
|
||||
hue_bridge.on_darkness(darkness).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!("Did not expect this");
|
||||
});
|
||||
pub fn new(addr: SocketAddr, login: &str, flags: Flags) -> Self {
|
||||
Self { addr, login: login.to_owned(), flags }
|
||||
}
|
||||
|
||||
pub async fn set_flag(&self, flag: Flag, value: bool) {
|
||||
|
@ -83,6 +54,35 @@ impl HueBridge {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn start(mut presence_rx: presence::Receiver, mut light_sensor_rx: light_sensor::Receiver, config: HueBridgeConfig) {
|
||||
let mut hue_bridge = HueBridge::new((config.ip, 80).into(), &config.login, config.flags);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = presence_rx.changed() => {
|
||||
if !res.is_ok() {
|
||||
break;
|
||||
}
|
||||
|
||||
let presence = *presence_rx.borrow();
|
||||
hue_bridge.on_presence(presence).await;
|
||||
}
|
||||
res = light_sensor_rx.changed() => {
|
||||
if !res.is_ok() {
|
||||
break;
|
||||
}
|
||||
|
||||
let darkness = *light_sensor_rx.borrow();
|
||||
hue_bridge.on_darkness(darkness).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!("Did not expect this");
|
||||
});
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OnPresence for HueBridge {
|
||||
async fn on_presence(&mut self, presence: bool) {
|
||||
|
|
|
@ -7,3 +7,4 @@ pub mod ntfy;
|
|||
pub mod light_sensor;
|
||||
pub mod hue_bridge;
|
||||
pub mod auth;
|
||||
pub mod error;
|
||||
|
|
|
@ -3,7 +3,7 @@ use rumqttc::{matches, AsyncClient};
|
|||
use tokio::sync::watch;
|
||||
use tracing::{error, trace, debug};
|
||||
|
||||
use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}};
|
||||
use crate::{config::{MqttDeviceConfig, LightSensorConfig}, mqtt::{self, OnMqtt, BrightnessMessage}, error};
|
||||
|
||||
#[async_trait]
|
||||
pub trait OnDarkness {
|
||||
|
@ -14,18 +14,25 @@ pub type Receiver = watch::Receiver<bool>;
|
|||
type Sender = watch::Sender<bool>;
|
||||
|
||||
struct LightSensor {
|
||||
is_dark: Receiver,
|
||||
mqtt: MqttDeviceConfig,
|
||||
min: isize,
|
||||
max: isize,
|
||||
tx: Sender,
|
||||
is_dark: Receiver,
|
||||
}
|
||||
|
||||
pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> Receiver {
|
||||
client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
impl LightSensor {
|
||||
fn new(mqtt: MqttDeviceConfig, min: isize, max: isize) -> Self {
|
||||
let (tx, is_dark) = watch::channel(false);
|
||||
Self { is_dark: is_dark.clone(), mqtt, min, max, tx }
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, is_dark) = watch::channel(false);
|
||||
let mut light_sensor = LightSensor { is_dark: is_dark.clone(), mqtt: config.mqtt, min: config.min, max: config.max, tx };
|
||||
pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, client: AsyncClient) -> error::Result<Receiver> {
|
||||
client.subscribe(config.mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
let mut light_sensor = LightSensor::new(config.mqtt, config.min, config.max);
|
||||
let is_dark = light_sensor.is_dark.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
@ -36,7 +43,7 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, config: LightSensorConfig, clien
|
|||
}
|
||||
});
|
||||
|
||||
return is_dark;
|
||||
Ok(is_dark)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
57
src/main.rs
57
src/main.rs
|
@ -1,16 +1,16 @@
|
|||
#![feature(async_closure)]
|
||||
use std::{process, time::Duration};
|
||||
|
||||
use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router};
|
||||
use axum::{extract::FromRef, http::StatusCode, routing::post, Json, Router, response::IntoResponse};
|
||||
|
||||
use automation::{
|
||||
auth::User,
|
||||
config::{Config, OpenIDConfig},
|
||||
devices,
|
||||
hue_bridge::HueBridge,
|
||||
hue_bridge,
|
||||
light_sensor, mqtt::Mqtt,
|
||||
ntfy::Ntfy,
|
||||
presence,
|
||||
ntfy,
|
||||
presence, error::ApiError,
|
||||
};
|
||||
use dotenvy::dotenv;
|
||||
use rumqttc::{AsyncClient, MqttOptions, Transport};
|
||||
|
@ -33,6 +33,19 @@ impl FromRef<AppState> for automation::config::OpenIDConfig {
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
if let Err(err) = app().await {
|
||||
error!("Error: {err}");
|
||||
let mut cause = err.source();
|
||||
while let Some(c) = cause {
|
||||
error!("Cause: {c}");
|
||||
cause = c.source();
|
||||
}
|
||||
process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async fn app() -> Result<(), Box<dyn std::error::Error>> {
|
||||
dotenv().ok();
|
||||
|
||||
let filter = EnvFilter::builder()
|
||||
|
@ -44,10 +57,7 @@ async fn main() {
|
|||
info!("Starting automation_rs...");
|
||||
|
||||
let config = std::env::var("AUTOMATION_CONFIG").unwrap_or("./config/config.toml".to_owned());
|
||||
let config = Config::build(&config).unwrap_or_else(|err| {
|
||||
error!("Failed to load config: {err}");
|
||||
process::exit(1);
|
||||
});
|
||||
let config = Config::parse_file(&config)?;
|
||||
|
||||
// Configure MQTT
|
||||
let mqtt = config.mqtt.clone();
|
||||
|
@ -59,8 +69,8 @@ async fn main() {
|
|||
// Create a mqtt client and wrap the eventloop
|
||||
let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||
let mqtt = Mqtt::new(eventloop);
|
||||
let presence = presence::start(mqtt.subscribe(), config.presence.clone(), client.clone()).await;
|
||||
let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await;
|
||||
let presence = presence::start(config.presence.clone(), mqtt.subscribe(), client.clone()).await?;
|
||||
let light_sensor = light_sensor::start(mqtt.subscribe(), config.light_sensor.clone(), client.clone()).await?;
|
||||
|
||||
let devices = devices::start(mqtt.subscribe(), presence.clone(), light_sensor.clone());
|
||||
join_all(
|
||||
|
@ -69,21 +79,20 @@ async fn main() {
|
|||
.clone()
|
||||
.into_iter()
|
||||
.map(|(identifier, device_config)| async {
|
||||
// This can technically block, but this only happens during start-up, so should not be
|
||||
// a problem
|
||||
let device = device_config.into(identifier, &config, client.clone()).await;
|
||||
devices.add_device(device).await;
|
||||
let device = device_config.create(identifier, &config, client.clone()).await?;
|
||||
devices.add_device(device).await?;
|
||||
Ok::<(), Box<dyn std::error::Error>>(())
|
||||
})
|
||||
).await;
|
||||
).await.into_iter().collect::<Result<_, _>>()?;
|
||||
|
||||
// Start the ntfy service if it is configured
|
||||
if let Some(ntfy_config) = config.ntfy {
|
||||
Ntfy::create(presence.clone(), ntfy_config);
|
||||
ntfy::start(presence.clone(), &ntfy_config);
|
||||
}
|
||||
|
||||
// Start he hue bridge if it is configured
|
||||
if let Some(hue_bridge_config) = config.hue_bridge {
|
||||
HueBridge::create(presence.clone(), light_sensor.clone(), hue_bridge_config);
|
||||
hue_bridge::start(presence.clone(), light_sensor.clone(), hue_bridge_config);
|
||||
}
|
||||
|
||||
// Actually start listening for mqtt message,
|
||||
|
@ -96,11 +105,14 @@ async fn main() {
|
|||
post(async move |user: User, Json(payload): Json<Request>| {
|
||||
debug!(username = user.preferred_username, "{payload:#?}");
|
||||
let gc = GoogleHome::new(&user.preferred_username);
|
||||
let result = devices.fullfillment(gc, payload).await.unwrap();
|
||||
let result = match devices.fullfillment(gc, payload).await {
|
||||
Ok(result) => result,
|
||||
Err(err) => return ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, err.into()).into_response(),
|
||||
};
|
||||
|
||||
debug!(username = user.preferred_username, "{result:#?}");
|
||||
|
||||
return (StatusCode::OK, Json(result));
|
||||
return (StatusCode::OK, Json(result)).into_response();
|
||||
}),
|
||||
);
|
||||
|
||||
|
@ -114,8 +126,9 @@ async fn main() {
|
|||
// Start the web server
|
||||
let addr = config.fullfillment.into();
|
||||
info!("Server started on http://{addr}");
|
||||
axum::Server::bind(&addr)
|
||||
axum::Server::try_bind(&addr)?
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
27
src/ntfy.rs
27
src/ntfy.rs
|
@ -7,7 +7,7 @@ use serde_repr::*;
|
|||
|
||||
use crate::{presence::{self, OnPresence}, config::NtfyConfig};
|
||||
|
||||
pub struct Ntfy {
|
||||
struct Ntfy {
|
||||
base_url: String,
|
||||
topic: String
|
||||
}
|
||||
|
@ -88,19 +88,24 @@ impl Notification {
|
|||
}
|
||||
|
||||
impl Ntfy {
|
||||
pub fn create(mut rx: presence::Receiver, config: NtfyConfig) {
|
||||
let mut ntfy = Self { base_url: config.url, topic: config.topic };
|
||||
tokio::spawn(async move {
|
||||
while rx.changed().await.is_ok() {
|
||||
let presence = *rx.borrow();
|
||||
ntfy.on_presence(presence).await;
|
||||
}
|
||||
|
||||
unreachable!("Did not expect this");
|
||||
});
|
||||
fn new(base_url: &str, topic: &str) -> Self {
|
||||
Self { base_url: base_url.to_owned(), topic: topic.to_owned() }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(mut rx: presence::Receiver, config: &NtfyConfig) {
|
||||
let mut ntfy = Ntfy::new(&config.url, &config.topic);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while rx.changed().await.is_ok() {
|
||||
let presence = *rx.borrow();
|
||||
ntfy.on_presence(presence).await;
|
||||
}
|
||||
|
||||
unreachable!("Did not expect this");
|
||||
});
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OnPresence for Ntfy {
|
||||
async fn on_presence(&mut self, presence: bool) {
|
||||
|
|
|
@ -3,9 +3,9 @@ use std::collections::HashMap;
|
|||
use async_trait::async_trait;
|
||||
use tokio::sync::watch;
|
||||
use tracing::{debug, error};
|
||||
use rumqttc::{AsyncClient, matches};
|
||||
use rumqttc::{AsyncClient, matches, has_wildcards};
|
||||
|
||||
use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig};
|
||||
use crate::{mqtt::{OnMqtt, PresenceMessage, self}, config::MqttDeviceConfig, error::{self, MissingWildcard}};
|
||||
|
||||
#[async_trait]
|
||||
pub trait OnPresence {
|
||||
|
@ -17,17 +17,28 @@ type Sender = watch::Sender<bool>;
|
|||
|
||||
struct Presence {
|
||||
devices: HashMap<String, bool>,
|
||||
overall_presence: Receiver,
|
||||
mqtt: MqttDeviceConfig,
|
||||
tx: Sender,
|
||||
overall_presence: Receiver,
|
||||
}
|
||||
|
||||
pub async fn start(mut mqtt_rx: mqtt::Receiver, mqtt: MqttDeviceConfig, client: AsyncClient) -> Receiver {
|
||||
// Subscribe to the relevant topics on mqtt
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await.unwrap();
|
||||
impl Presence {
|
||||
fn build(mqtt: MqttDeviceConfig) -> Result<Self, MissingWildcard> {
|
||||
if !has_wildcards(&mqtt.topic) {
|
||||
return Err(MissingWildcard::new(&mqtt.topic).into());
|
||||
}
|
||||
|
||||
let (tx, overall_presence) = watch::channel(false);
|
||||
let mut presence = Presence { devices: HashMap::new(), overall_presence: overall_presence.clone(), mqtt, tx };
|
||||
let (tx, overall_presence) = watch::channel(false);
|
||||
Ok(Self { devices: HashMap::new(), overall_presence: overall_presence.clone(), mqtt, tx })
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(mqtt: MqttDeviceConfig, mut mqtt_rx: mqtt::Receiver, client: AsyncClient) -> error::Result<Receiver> {
|
||||
// Subscribe to the relevant topics on mqtt
|
||||
client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).await?;
|
||||
|
||||
let mut presence = Presence::build(mqtt)?;
|
||||
let overall_presence = presence.overall_presence.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
@ -38,7 +49,7 @@ pub async fn start(mut mqtt_rx: mqtt::Receiver, mqtt: MqttDeviceConfig, client:
|
|||
}
|
||||
});
|
||||
|
||||
return overall_presence;
|
||||
Ok(overall_presence)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -48,7 +59,7 @@ impl OnMqtt for Presence {
|
|||
return;
|
||||
}
|
||||
|
||||
let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).unwrap();
|
||||
let offset = self.mqtt.topic.find('+').or(self.mqtt.topic.find('#')).expect("Presence::new fails if it does not contain wildcards");
|
||||
let device_name = &message.topic[offset..];
|
||||
|
||||
if message.payload.len() == 0 {
|
||||
|
|
Loading…
Reference in New Issue
Block a user