Added WakeOnLAN device, some small refactoring and improved error handling

This commit is contained in:
2022-12-28 03:27:25 +01:00
parent bb18cfdcee
commit 2b4ddf82b6
8 changed files with 404 additions and 59 deletions

View File

@@ -4,17 +4,17 @@ use google_home::errors::ErrorCode;
use google_home::{GoogleHomeDevice, device, types::Type, traits};
use rumqttc::{AsyncClient, Publish};
use serde::{Deserialize, Serialize};
use log::{debug, trace};
use log::{debug, trace, warn};
use tokio::task::JoinHandle;
use crate::config::{KettleConfig, InfoConfig, ZigbeeDeviceConfig};
use crate::config::{KettleConfig, InfoConfig, MqttDeviceConfig};
use crate::devices::Device;
use crate::mqtt::Listener;
pub struct IkeaOutlet {
identifier: String,
info: InfoConfig,
zigbee: ZigbeeDeviceConfig,
mqtt: MqttDeviceConfig,
kettle: Option<KettleConfig>,
client: AsyncClient,
@@ -23,15 +23,15 @@ pub struct IkeaOutlet {
}
impl IkeaOutlet {
pub fn new(identifier: String, info: InfoConfig, zigbee: ZigbeeDeviceConfig, kettle: Option<KettleConfig>, client: AsyncClient) -> Self {
pub fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, kettle: Option<KettleConfig>, client: AsyncClient) -> Self {
let c = client.clone();
let t = zigbee.topic.clone();
let t = mqtt.topic.clone();
// @TODO Handle potential errors here
tokio::spawn(async move {
c.subscribe(t, rumqttc::QoS::AtLeastOnce).await.unwrap();
});
Self{ identifier, info, zigbee, kettle, client, last_known_state: false, handle: None }
Self{ identifier, info, mqtt, kettle, client, last_known_state: false, handle: None }
}
}
@@ -59,64 +59,77 @@ struct StateMessage {
state: String
}
impl From<&Publish> for StateMessage {
fn from(p: &Publish) -> Self {
let parsed = match serde_json::from_slice(&p.payload) {
Ok(outlet) => outlet,
Err(err) => {
panic!("{}", err);
}
};
impl TryFrom<&Publish> for StateMessage {
type Error = anyhow::Error;
parsed
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
match serde_json::from_slice(&message.payload) {
Ok(message) => Ok(message),
Err(..) => {
Err(anyhow::anyhow!("Invalid message payload received: {:?}", message.payload))
}
}
}
}
impl Listener for IkeaOutlet {
fn notify(&mut self, message: &Publish) {
// Update the internal state based on what the device has reported
if message.topic == self.zigbee.topic {
let new_state = StateMessage::from(message).state == "ON";
if message.topic != self.mqtt.topic {
return;
}
// No need to do anything if the state has not changed
if new_state == self.last_known_state {
let new_state = match StateMessage::try_from(message) {
Ok(state) => state,
Err(err) => {
warn!("Failed to parse message: {err}");
return;
}
}.state == "ON";
// Abort any timer that is currently running
if let Some(handle) = self.handle.take() {
handle.abort();
}
// No need to do anything if the state has not changed
if new_state == self.last_known_state {
return;
}
trace!("Updating state: {} => {}", self.last_known_state, new_state);
self.last_known_state = new_state;
// Abort any timer that is currently running
if let Some(handle) = self.handle.take() {
handle.abort();
}
// If this is a kettle start a timeout for turning it of again
if new_state {
if let Some(kettle) = &self.kettle {
if let Some(timeout) = kettle.timeout.clone() {
let client = self.client.clone();
let topic = self.zigbee.topic.clone();
trace!("Updating state: {} => {}", self.last_known_state, new_state);
self.last_known_state = new_state;
// Turn the kettle of after the specified timeout
// @TODO Impl Drop for IkeaOutlet that will abort the handle if the IkeaOutlet
// get dropped
self.handle = Some(
tokio::spawn(async move {
debug!("Starting timeout ({timeout}s) for kettle...");
tokio::time::sleep(Duration::from_secs(timeout)).await;
// @TODO We need to call set_on(false) in order to turn the device off
// again, how are we going to do this?
debug!("Turning kettle off!");
set_on(client, topic, false).await;
})
);
} else {
trace!("Outlet is a kettle without timeout");
}
// If this is a kettle start a timeout for turning it of again
if new_state {
let kettle = match &self.kettle {
Some(kettle) => kettle,
None => return,
};
}
}
let timeout = match kettle.timeout.clone() {
Some(timeout) => timeout,
None => {
trace!("Outlet is a kettle without timeout");
return;
},
};
// Turn the kettle of after the specified timeout
// @TODO Impl Drop for IkeaOutlet that will abort the handle if the IkeaOutlet
// get dropped
let client = self.client.clone();
let topic = self.mqtt.topic.clone();
self.handle = Some(
tokio::spawn(async move {
debug!("Starting timeout ({timeout}s) for kettle...");
tokio::time::sleep(Duration::from_secs(timeout)).await;
// @TODO We need to call set_on(false) in order to turn the device off
// again, how are we going to do this?
debug!("Turning kettle off!");
set_on(client, topic, false).await;
})
);
}
}
}
@@ -145,6 +158,11 @@ impl GoogleHomeDevice for IkeaOutlet {
fn get_room_hint(&self) -> Option<String> {
self.info.room.clone()
}
fn will_report_state(&self) -> bool {
// @TODO Implement state reporting
false
}
}
impl traits::OnOff for IkeaOutlet {
@@ -154,7 +172,7 @@ impl traits::OnOff for IkeaOutlet {
fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> {
let client = self.client.clone();
let topic = self.zigbee.topic.clone();
let topic = self.mqtt.topic.clone();
tokio::spawn(async move {
set_on(client, topic, on).await;
});

125
src/devices/wake_on_lan.rs Normal file
View File

@@ -0,0 +1,125 @@
use google_home::{GoogleHomeDevice, types::Type, device, traits::{self, Scene}, errors::{ErrorCode, DeviceError}};
use log::{debug, warn};
use rumqttc::{AsyncClient, Publish};
use serde::Deserialize;
use crate::{config::{InfoConfig, MqttDeviceConfig}, mqtt::Listener};
use super::Device;
pub struct WakeOnLAN {
identifier: String,
info: InfoConfig,
mqtt: MqttDeviceConfig,
mac_address: String,
}
impl WakeOnLAN {
pub fn new(identifier: String, info: InfoConfig, mqtt: MqttDeviceConfig, mac_address: String, client: AsyncClient) -> Self {
let c = client.clone();
let t = mqtt.topic.clone();
// @TODO Handle potential errors here
tokio::spawn(async move {
c.subscribe(t, rumqttc::QoS::AtLeastOnce).await.unwrap();
});
Self { identifier, info, mqtt, mac_address }
}
}
impl Device for WakeOnLAN {
fn get_id(&self) -> String {
self.identifier.clone()
}
}
#[derive(Debug, Deserialize)]
struct StateMessage {
activate: bool
}
impl TryFrom<&Publish> for StateMessage {
type Error = anyhow::Error;
fn try_from(message: &Publish) -> Result<Self, Self::Error> {
match serde_json::from_slice(&message.payload) {
Ok(message) => Ok(message),
Err(..) => {
Err(anyhow::anyhow!("Invalid message payload received: {:?}", message.payload))
}
}
}
}
impl Listener for WakeOnLAN {
fn notify(&mut self, message: &Publish) {
if message.topic != self.mqtt.topic {
return;
}
let payload = match StateMessage::try_from(message) {
Ok(state) => state,
Err(err) => {
warn!("Failed to parse message: {err}");
return;
}
};
self.set_active(payload.activate).ok();
}
}
impl GoogleHomeDevice for WakeOnLAN {
fn get_device_type(&self) -> Type {
Type::Scene
}
fn get_device_name(&self) -> device::Name {
let mut name = device::Name::new(&self.info.name);
name.add_default_name("Computer");
return name;
}
fn get_id(&self) -> String {
Device::get_id(self)
}
fn is_online(&self) -> bool {
true
}
fn get_room_hint(&self) -> Option<String> {
self.info.room.clone()
}
}
impl traits::Scene for WakeOnLAN {
fn set_active(&self, activate: bool) -> Result<(), ErrorCode> {
if activate {
// @TODO In the future send the wake on lan package directly, this is kind of annoying
// if we are inside of docker, so for now just call a webhook that does it for us
let mac_address = self.mac_address.clone();
tokio::spawn(async move {
debug!("Activating Computer: {}", mac_address);
let req = match reqwest::get(format!("http://10.0.0.2:9000/start-pc?mac={mac_address}")).await {
Ok(req) => req,
Err(err) => {
warn!("Failed to call webhook: {err}");
return;
}
};
if req.status() != 200 {
warn!("Failed to call webhook: {}", req.status());
}
});
Ok(())
} else {
debug!("Trying to deactive computer, this is not currently supported");
// We do not support deactivating this scene
Err(ErrorCode::DeviceError(DeviceError::ActionNotAvailable))
}
}
}