From 99ff60a505f5786836c061a56d9262887d5c6209 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Fri, 6 Jan 2023 23:24:25 +0100 Subject: [PATCH] AudioSetup now takes two devices that implement the OnOff trait instead of being tied to KasaOutlet --- config/zeus.dev.toml | 10 +- src/config.rs | 17 ++- src/devices.rs | 12 +- src/devices/audio_setup.rs | 239 +++++-------------------------------- src/devices/kasa_outlet.rs | 200 +++++++++++++++++++++++++++++++ 5 files changed, 256 insertions(+), 222 deletions(-) create mode 100644 src/devices/kasa_outlet.rs diff --git a/config/zeus.dev.toml b/config/zeus.dev.toml index 809e44a..eed0e24 100644 --- a/config/zeus.dev.toml +++ b/config/zeus.dev.toml @@ -43,11 +43,15 @@ room = "Living Room" topic = "automation/appliance/living_room/zeus" mac_address = "30:9c:23:60:9c:13" -[devices.audio] +[devices.living_audio] type = "AudioSetup" topic = "zigbee2mqtt/living/remote" -mixer = "10.0.0.49" -speakers = "10.0.0.182" +[devices.living_audio.mixer] +type = "KasaOutlet" +ip = "10.0.0.49" +[devices.living_audio.speakers] +type = "KasaOutlet" +ip = "10.0.0.182" [devices.hallway_frontdoor] type = "ContactSensor" diff --git a/src/config.rs b/src/config.rs index 6763d14..b0082b4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,11 @@ -use std::{fs, error::Error, collections::HashMap, net::{Ipv4Addr, SocketAddr}}; +use std::{fs, error::Error, net::{Ipv4Addr, SocketAddr}, collections::HashMap}; use regex::{Regex, Captures}; use tracing::{debug, trace, error}; use rumqttc::{AsyncClient, has_wildcards}; use serde::Deserialize; -use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor}; +use crate::devices::{DeviceBox, IkeaOutlet, WakeOnLAN, AudioSetup, ContactSensor, KasaOutlet}; // @TODO Configure more defaults @@ -151,11 +151,14 @@ pub enum Device { mqtt: MqttDeviceConfig, mac_address: String, }, + KasaOutlet { + ip: Ipv4Addr, + }, AudioSetup { #[serde(flatten)] mqtt: MqttDeviceConfig, - mixer: Ipv4Addr, - speakers: Ipv4Addr, + mixer: Box::, + speakers: Box::, }, ContactSensor { #[serde(flatten)] @@ -214,8 +217,14 @@ impl Device { trace!(id = identifier, "WakeOnLan [{} in {:?}]", info.name, info.room); Box::new(WakeOnLAN::new(identifier, info, mqtt, mac_address, client)) }, + Device::KasaOutlet { ip } => { + trace!(id = identifier, "KasaOutlet [{}]", identifier); + Box::new(KasaOutlet::new(identifier, ip)) + } Device::AudioSetup { mqtt, mixer, speakers } => { trace!(id = identifier, "AudioSetup [{}]", identifier); + let mixer = (*mixer).into(identifier.clone() + ".mixer", config, client.clone()); + let speakers = (*speakers).into(identifier.clone() + ".speakers", config, client.clone()); Box::new(AudioSetup::new(identifier, mqtt, mixer, speakers, client)) }, Device::ContactSensor { mqtt, mut presence } => { diff --git a/src/devices.rs b/src/devices.rs index 4adac0a..f2ed2f0 100644 --- a/src/devices.rs +++ b/src/devices.rs @@ -1,13 +1,13 @@ mod ikea_outlet; -pub use self::ikea_outlet::IkeaOutlet; - mod wake_on_lan; -pub use self::wake_on_lan::WakeOnLAN; - +mod kasa_outlet; mod audio_setup; -pub use self::audio_setup::AudioSetup; - mod contact_sensor; + +pub use self::ikea_outlet::IkeaOutlet; +pub use self::wake_on_lan::WakeOnLAN; +pub use self::kasa_outlet::KasaOutlet; +pub use self::audio_setup::AudioSetup; pub use self::contact_sensor::ContactSensor; use std::collections::HashMap; diff --git a/src/devices/audio_setup.rs b/src/devices/audio_setup.rs index 50090c8..dd8f866 100644 --- a/src/devices/audio_setup.rs +++ b/src/devices/audio_setup.rs @@ -1,216 +1,22 @@ -use std::io::{Write, Read}; -use std::net::{TcpStream, SocketAddr, Ipv4Addr}; - -use bytes::{BufMut, Buf}; -use google_home::errors::{ErrorCode, DeviceError}; -use google_home::traits::{self, OnOff}; use rumqttc::{AsyncClient, matches}; -use serde::{Deserialize, Serialize}; use tracing::{error, warn}; use pollster::FutureExt as _; use crate::config::MqttDeviceConfig; +use crate::devices::AsOnOff; use crate::mqtt::{OnMqtt, RemoteMessage, RemoteAction}; -use super::Device; - -struct TPLinkOutlet { - addr: SocketAddr, -} - -impl TPLinkOutlet { - pub fn new(ip: Ipv4Addr) -> Self { - // @TODO Get the current state of the outlet - Self { addr: (ip, 9999).into() } - } -} - -#[derive(Debug, Serialize)] -struct RequestRelayState { - state: isize, -} - -#[derive(Debug, Serialize)] -struct RequestSysinfo; - -#[derive(Debug, Serialize)] -struct RequestSystem { - #[serde(skip_serializing_if = "Option::is_none")] - get_sysinfo: Option, - #[serde(skip_serializing_if = "Option::is_none")] - set_relay_state: Option, -} - -#[derive(Debug, Serialize)] -struct Request { - system: RequestSystem, -} - -impl Request { - fn get_sysinfo() -> Self { - Self { - system: RequestSystem { - get_sysinfo: Some(RequestSysinfo{}), - set_relay_state: None - } - } - } - - fn set_relay_state(on: bool) -> Self { - Self { - system: RequestSystem { - get_sysinfo: None, - set_relay_state: Some(RequestRelayState { - state: if on { 1 } else { 0 } - }) - } - } - } - - fn encrypt(&self) -> bytes::Bytes { - let data: bytes::Bytes = serde_json::to_string(self).unwrap().into(); - - let mut key: u8 = 171; - let mut encrypted = bytes::BytesMut::with_capacity(data.len() + 4); - - encrypted.put_u32(data.len() as u32); - - for c in data { - key = key ^ c; - encrypted.put_u8(key); - } - - return encrypted.freeze(); - } -} - -#[derive(Debug, Deserialize)] -struct ResponseSetRelayState { - err_code: isize, -} - -#[derive(Debug, Deserialize)] -struct ResponseGetSysinfo { - err_code: isize, - relay_state: isize, -} - -#[derive(Debug, Deserialize)] -struct ResponseSystem { - set_relay_state: Option, - get_sysinfo: Option, -} - -#[derive(Debug, Deserialize)] -struct Response { - system: ResponseSystem, -} - -impl Response { - fn get_current_relay_state(&self) -> Result { - if let Some(sysinfo) = &self.system.get_sysinfo { - if sysinfo.err_code != 0 { - return Err(anyhow::anyhow!("Error code: {}", sysinfo.err_code)); - } - return Ok(sysinfo.relay_state == 1); - } - - return Err(anyhow::anyhow!("No sysinfo found in response")); - } - - fn check_set_relay_success(&self) -> Result<(), anyhow::Error> { - if let Some(set_relay_state) = &self.system.set_relay_state { - if set_relay_state.err_code != 0 { - return Err(anyhow::anyhow!("Error code: {}", set_relay_state.err_code)); - } - return Ok(()); - } - - return Err(anyhow::anyhow!("No relay_state found in response")); - } - - fn decrypt(mut data: bytes::Bytes) -> Result { - let mut key: u8 = 171; - if data.len() < 4 { - return Err(anyhow::anyhow!("Expected a minimun data length of 4")); - } - - let length = data.get_u32(); - let mut decrypted = bytes::BytesMut::with_capacity(length as usize); - - for c in data { - decrypted.put_u8(key ^ c); - key = c; - } - - let decrypted = std::str::from_utf8(&decrypted)?; - Ok(serde_json::from_str(decrypted)?) - } -} - -impl traits::OnOff for TPLinkOutlet { - fn is_on(&self) -> Result { - let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline.into()))?; - - let body = Request::get_sysinfo().encrypt(); - stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError.into()))?; - - let mut received = Vec::new(); - let mut rx_bytes = [0; 1024]; - loop { - let read = stream.read(&mut rx_bytes).or::(Err(DeviceError::TransientError.into()))?; - - received.extend_from_slice(&rx_bytes[..read]); - - if read < rx_bytes.len() { - break; - } - } - - let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; - - resp.get_current_relay_state().or(Err(DeviceError::TransientError.into())) - } - - fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { - let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline.into()))?; - - let body = Request::set_relay_state(on).encrypt(); - stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError.into()))?; - - let mut received = Vec::new(); - let mut rx_bytes = [0; 1024]; - loop { - let read = match stream.read(&mut rx_bytes) { - Ok(read) => read, - Err(_) => return Err(DeviceError::TransientError.into()), - }; - - received.extend_from_slice(&rx_bytes[..read]); - - if read < rx_bytes.len() { - break; - } - } - - let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; - - resp.check_set_relay_success().or(Err(DeviceError::TransientError.into())) - } -} +use super::{Device, DeviceBox}; pub struct AudioSetup { identifier: String, mqtt: MqttDeviceConfig, - mixer: TPLinkOutlet, - speakers: TPLinkOutlet, + mixer: DeviceBox, + speakers: DeviceBox, } impl AudioSetup { - pub fn new(identifier: String, mqtt: MqttDeviceConfig, mixer_ip: Ipv4Addr, speakers_ip: Ipv4Addr, client: AsyncClient) -> Self { - let mixer = TPLinkOutlet::new(mixer_ip); - let speakers = TPLinkOutlet::new(speakers_ip); - + pub fn new(identifier: String, mqtt: MqttDeviceConfig, mixer: DeviceBox, speakers: DeviceBox, client: AsyncClient) -> Self { client.subscribe(mqtt.topic.clone(), rumqttc::QoS::AtLeastOnce).block_on().unwrap(); Self { identifier, mqtt, mixer, speakers } @@ -237,23 +43,38 @@ impl OnMqtt for AudioSetup { } }; + let mixer = match AsOnOff::cast_mut(self.mixer.as_mut()) { + Some(mixer) => mixer, + None => { + error!(id = self.identifier, "Mixer device '{}' does not implement OnOff trait", self.mixer.get_id()); + return; + }, + }; + let speakers = match AsOnOff::cast_mut(self.speakers.as_mut()) { + Some(speakers) => speakers, + None => { + error!(id = self.identifier, "Speakers device '{}' does not implement OnOff trait", self.mixer.get_id()); + return; + }, + }; + match action { RemoteAction::On => { - if self.mixer.is_on().unwrap() { - self.speakers.set_on(false).unwrap(); - self.mixer.set_on(false).unwrap(); + if mixer.is_on().unwrap() { + speakers.set_on(false).unwrap(); + mixer.set_on(false).unwrap(); } else { - self.speakers.set_on(true).unwrap(); - self.mixer.set_on(true).unwrap(); + speakers.set_on(true).unwrap(); + mixer.set_on(true).unwrap(); } }, RemoteAction::BrightnessMoveUp => { - if !self.mixer.is_on().unwrap() { - self.mixer.set_on(true).unwrap(); - } else if self.speakers.is_on().unwrap() { - self.speakers.set_on(false).unwrap(); + if !mixer.is_on().unwrap() { + mixer.set_on(true).unwrap(); + } else if speakers.is_on().unwrap() { + speakers.set_on(false).unwrap(); } else { - self.speakers.set_on(true).unwrap(); + speakers.set_on(true).unwrap(); } }, RemoteAction::BrightnessStop => { /* Ignore this action */ }, diff --git a/src/devices/kasa_outlet.rs b/src/devices/kasa_outlet.rs new file mode 100644 index 0000000..6a58649 --- /dev/null +++ b/src/devices/kasa_outlet.rs @@ -0,0 +1,200 @@ +use std::{net::{SocketAddr, Ipv4Addr, TcpStream}, io::{Write, Read}}; + +use bytes::{Buf, BufMut}; +use google_home::{traits, errors::{ErrorCode, DeviceError}}; +use serde::{Serialize, Deserialize}; + +use super::Device; + +pub struct KasaOutlet { + identifier: String, + addr: SocketAddr, +} + +impl KasaOutlet { + pub fn new(identifier: String, ip: Ipv4Addr) -> Self { + // @TODO Get the current state of the outlet + Self { identifier, addr: (ip, 9999).into() } + } +} + +impl Device for KasaOutlet { + fn get_id(&self) -> String { + self.identifier.clone() + } +} + +#[derive(Debug, Serialize)] +struct RequestRelayState { + state: isize, +} + +#[derive(Debug, Serialize)] +struct RequestSysinfo; + +#[derive(Debug, Serialize)] +struct RequestSystem { + #[serde(skip_serializing_if = "Option::is_none")] + get_sysinfo: Option, + #[serde(skip_serializing_if = "Option::is_none")] + set_relay_state: Option, +} + +#[derive(Debug, Serialize)] +struct Request { + system: RequestSystem, +} + +impl Request { + fn get_sysinfo() -> Self { + Self { + system: RequestSystem { + get_sysinfo: Some(RequestSysinfo{}), + set_relay_state: None + } + } + } + + fn set_relay_state(on: bool) -> Self { + Self { + system: RequestSystem { + get_sysinfo: None, + set_relay_state: Some(RequestRelayState { + state: if on { 1 } else { 0 } + }) + } + } + } + + fn encrypt(&self) -> bytes::Bytes { + let data: bytes::Bytes = serde_json::to_string(self).unwrap().into(); + + let mut key: u8 = 171; + let mut encrypted = bytes::BytesMut::with_capacity(data.len() + 4); + + encrypted.put_u32(data.len() as u32); + + for c in data { + key = key ^ c; + encrypted.put_u8(key); + } + + return encrypted.freeze(); + } +} + +#[derive(Debug, Deserialize)] +struct ResponseSetRelayState { + err_code: isize, +} + +#[derive(Debug, Deserialize)] +struct ResponseGetSysinfo { + err_code: isize, + relay_state: isize, +} + +#[derive(Debug, Deserialize)] +struct ResponseSystem { + set_relay_state: Option, + get_sysinfo: Option, +} + +#[derive(Debug, Deserialize)] +struct Response { + system: ResponseSystem, +} + +impl Response { + fn get_current_relay_state(&self) -> Result { + if let Some(sysinfo) = &self.system.get_sysinfo { + if sysinfo.err_code != 0 { + return Err(anyhow::anyhow!("Error code: {}", sysinfo.err_code)); + } + return Ok(sysinfo.relay_state == 1); + } + + return Err(anyhow::anyhow!("No sysinfo found in response")); + } + + fn check_set_relay_success(&self) -> Result<(), anyhow::Error> { + if let Some(set_relay_state) = &self.system.set_relay_state { + if set_relay_state.err_code != 0 { + return Err(anyhow::anyhow!("Error code: {}", set_relay_state.err_code)); + } + return Ok(()); + } + + return Err(anyhow::anyhow!("No relay_state found in response")); + } + + fn decrypt(mut data: bytes::Bytes) -> Result { + let mut key: u8 = 171; + if data.len() < 4 { + return Err(anyhow::anyhow!("Expected a minimun data length of 4")); + } + + let length = data.get_u32(); + let mut decrypted = bytes::BytesMut::with_capacity(length as usize); + + for c in data { + decrypted.put_u8(key ^ c); + key = c; + } + + let decrypted = std::str::from_utf8(&decrypted)?; + Ok(serde_json::from_str(decrypted)?) + } +} + +impl traits::OnOff for KasaOutlet { + fn is_on(&self) -> Result { + let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline.into()))?; + + let body = Request::get_sysinfo().encrypt(); + stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError.into()))?; + + let mut received = Vec::new(); + let mut rx_bytes = [0; 1024]; + loop { + let read = stream.read(&mut rx_bytes).or::(Err(DeviceError::TransientError.into()))?; + + received.extend_from_slice(&rx_bytes[..read]); + + if read < rx_bytes.len() { + break; + } + } + + let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; + + resp.get_current_relay_state().or(Err(DeviceError::TransientError.into())) + } + + fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> { + let mut stream = TcpStream::connect(self.addr).or::(Err(DeviceError::DeviceOffline.into()))?; + + let body = Request::set_relay_state(on).encrypt(); + stream.write_all(&body).and(stream.flush()).or::(Err(DeviceError::TransientError.into()))?; + + let mut received = Vec::new(); + let mut rx_bytes = [0; 1024]; + loop { + let read = match stream.read(&mut rx_bytes) { + Ok(read) => read, + Err(_) => return Err(DeviceError::TransientError.into()), + }; + + received.extend_from_slice(&rx_bytes[..read]); + + if read < rx_bytes.len() { + break; + } + } + + let resp = Response::decrypt(received.into()).or::(Err(DeviceError::TransientError.into()))?; + + resp.check_set_relay_success().or(Err(DeviceError::TransientError.into())) + } +} +