Adjusted how we requre Sync + Send, added logger, cleanup dependencies, and added web server using warp and tokio
This commit is contained in:
@@ -6,20 +6,22 @@ pub use self::test_outlet::TestOutlet;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use google_home::{Fullfillment, traits::OnOff};
|
||||
use google_home::{GoogleHomeDevice, traits::OnOff};
|
||||
|
||||
use crate::mqtt::Listener;
|
||||
|
||||
impl_cast::impl_cast!(Device, Listener);
|
||||
impl_cast::impl_cast!(Device, Fullfillment);
|
||||
impl_cast::impl_cast!(Device, GoogleHomeDevice);
|
||||
impl_cast::impl_cast!(Device, OnOff);
|
||||
|
||||
pub trait Device: Sync + Send + AsFullfillment + AsListener + AsOnOff {
|
||||
pub trait Device: AsGoogleHomeDevice + AsListener + AsOnOff {
|
||||
fn get_id(&self) -> String;
|
||||
}
|
||||
|
||||
// @TODO Add an inner type that we can wrap with Arc<RwLock<>> to make this type a little bit nicer
|
||||
// to work with
|
||||
pub struct Devices {
|
||||
devices: HashMap<String, Box<dyn Device>>,
|
||||
devices: HashMap<String, Box<dyn Device + Sync + Send>>,
|
||||
}
|
||||
|
||||
macro_rules! get_cast {
|
||||
@@ -44,12 +46,12 @@ impl Devices {
|
||||
Self { devices: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn add_device<T: Device + 'static>(&mut self, device: T) {
|
||||
pub fn add_device<T: Device + Sync + Send + 'static>(&mut self, device: T) {
|
||||
self.devices.insert(device.get_id(), Box::new(device));
|
||||
}
|
||||
|
||||
get_cast!(Listener);
|
||||
get_cast!(Fullfillment);
|
||||
get_cast!(GoogleHomeDevice);
|
||||
get_cast!(OnOff);
|
||||
|
||||
pub fn get_device(&mut self, name: &str) -> Option<&mut dyn Device> {
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use pollster::FutureExt as _;
|
||||
|
||||
use google_home::errors::ErrorCode;
|
||||
use google_home::{GoogleHomeDevice, device, types::Type, traits};
|
||||
use rumqttc::{Client, Publish};
|
||||
use rumqttc::{AsyncClient, Publish};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use log::debug;
|
||||
|
||||
use crate::devices::Device;
|
||||
use crate::mqtt::Listener;
|
||||
@@ -10,13 +13,13 @@ use crate::zigbee::Zigbee;
|
||||
pub struct IkeaOutlet {
|
||||
name: String,
|
||||
zigbee: Zigbee,
|
||||
client: Client,
|
||||
client: AsyncClient,
|
||||
last_known_state: bool,
|
||||
}
|
||||
|
||||
impl IkeaOutlet {
|
||||
pub fn new(name: String, zigbee: Zigbee, mut client: Client) -> Self {
|
||||
client.subscribe(zigbee.get_topic(), rumqttc::QoS::AtLeastOnce).unwrap();
|
||||
pub fn new(name: String, zigbee: Zigbee, client: AsyncClient) -> Self {
|
||||
client.subscribe(zigbee.get_topic(), rumqttc::QoS::AtLeastOnce).block_on().unwrap();
|
||||
Self{ name, zigbee, client, last_known_state: false }
|
||||
}
|
||||
}
|
||||
@@ -51,16 +54,16 @@ impl Listener for IkeaOutlet {
|
||||
if message.topic == self.zigbee.get_topic() {
|
||||
let state = StateMessage::from(message);
|
||||
|
||||
print!("Updating state: {} => ", self.last_known_state);
|
||||
self.last_known_state = state.state == "ON";
|
||||
println!("{}", self.last_known_state);
|
||||
let new_state = state.state == "ON";
|
||||
debug!("Updating state: {} => {}", self.last_known_state, new_state);
|
||||
self.last_known_state = new_state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GoogleHomeDevice for IkeaOutlet {
|
||||
fn get_device_type(&self) -> Type {
|
||||
Type::Outlet
|
||||
Type::Kettle
|
||||
}
|
||||
|
||||
fn get_device_name(&self) -> device::Name {
|
||||
@@ -91,8 +94,10 @@ impl traits::OnOff for IkeaOutlet {
|
||||
}
|
||||
};
|
||||
|
||||
// @TODO Handle potential error here
|
||||
self.client.publish(topic + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).unwrap();
|
||||
// @TODO Handle potential errors here
|
||||
// @NOTE We are blocking here, ideally this function would just be async, however that is
|
||||
// currently not really possible
|
||||
self.client.publish(topic + "/set", rumqttc::QoS::AtLeastOnce, false, serde_json::to_string(&message).unwrap()).block_on().unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use log::debug;
|
||||
|
||||
use google_home::{errors::ErrorCode, traits};
|
||||
|
||||
use super::Device;
|
||||
@@ -24,7 +26,7 @@ impl traits::OnOff for TestOutlet {
|
||||
}
|
||||
|
||||
fn set_on(&mut self, on: bool) -> Result<(), ErrorCode> {
|
||||
println!("Setting on: {on}");
|
||||
debug!("Setting on: {on}");
|
||||
self.on = on;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
112
src/main.rs
112
src/main.rs
@@ -1,90 +1,86 @@
|
||||
use std::{time::Duration, sync::{Arc, RwLock}, process::exit, thread};
|
||||
use std::{time::Duration, sync::{Arc, RwLock}, process::exit, net::SocketAddr};
|
||||
|
||||
use warp::Filter;
|
||||
use rumqttc::{MqttOptions, Transport, AsyncClient};
|
||||
use dotenv::dotenv;
|
||||
use env_logger::Builder;
|
||||
use log::{error, info, LevelFilter};
|
||||
|
||||
use automation::{devices::{Devices, IkeaOutlet, TestOutlet}, zigbee::Zigbee, mqtt::Notifier};
|
||||
use google_home::GoogleHome;
|
||||
use rumqttc::{MqttOptions, Transport, Client};
|
||||
use google_home::{GoogleHome, Request};
|
||||
|
||||
fn get_required_env(name: &str) -> String {
|
||||
match std::env::var(name) {
|
||||
Ok(value) => value,
|
||||
_ => {
|
||||
eprintln!("Environment variable ${name} is not set!");
|
||||
error!("Environment variable ${name} is not set!");
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Setup logger
|
||||
Builder::new()
|
||||
.filter_module("automation", LevelFilter::Info)
|
||||
.parse_default_env()
|
||||
.init();
|
||||
|
||||
// Load dotfiles
|
||||
dotenv().ok();
|
||||
|
||||
info!("Starting automation_rs...");
|
||||
|
||||
// Create device holder
|
||||
// @TODO Make this nices to work with, we devices.rs
|
||||
let devices = Arc::new(RwLock::new(Devices::new()));
|
||||
|
||||
// Setup MQTT
|
||||
let mut mqttoptions = MqttOptions::new("rust-test", get_required_env("MQTT_HOST"), 8883);
|
||||
mqttoptions.set_credentials(get_required_env("MQTT_USERNAME"), get_required_env("MQTT_PASSWORD"));
|
||||
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
||||
mqttoptions.set_transport(Transport::tls_with_default_config());
|
||||
|
||||
let (client, connection) = Client::new(mqttoptions, 10);
|
||||
|
||||
// Create device holder
|
||||
let devices = Arc::new(RwLock::new(Devices::new()));
|
||||
// Create a notifier and move it to a new thread
|
||||
// @TODO Maybe rename this to make it clear it has to do with mqtt
|
||||
let mut notifier = Notifier::new();
|
||||
let (client, eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||
notifier.add_listener(Arc::downgrade(&devices));
|
||||
tokio::spawn(async move {
|
||||
info!("Connecting to MQTT broker");
|
||||
notifier.start(eventloop).await;
|
||||
todo!("Error in MQTT (most likely lost connection to mqtt server), we need to handle these errors!");
|
||||
});
|
||||
|
||||
// @TODO Load these from a config
|
||||
// Create a new device and add it to the holder
|
||||
devices.write().unwrap().add_device(IkeaOutlet::new("Kettle".into(), Zigbee::new("kitchen/kettle", "zigbee2mqtt/kitchen/kettle"), client.clone()));
|
||||
|
||||
devices.write().unwrap().add_device(TestOutlet::new());
|
||||
|
||||
{
|
||||
for (_, d) in devices.write().unwrap().as_on_offs().iter_mut() {
|
||||
d.set_on(false).unwrap();
|
||||
}
|
||||
}
|
||||
// Google Home fullfillments
|
||||
let fullfillment_google_home = warp::path("google_home")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.map(move |request: Request| {
|
||||
// @TODO Verify that we are actually logged in
|
||||
// Might also be smart to get the username from here
|
||||
let gc = GoogleHome::new("Dreaded_X");
|
||||
let result = gc.handle_request(request, &mut devices.write().unwrap().as_google_home_devices()).unwrap();
|
||||
|
||||
let ptr = Arc::downgrade(&devices);
|
||||
{
|
||||
let mut notifier = Notifier::new();
|
||||
notifier.add_listener(ptr);
|
||||
notifier.start(connection);
|
||||
}
|
||||
warp::reply::json(&result)
|
||||
});
|
||||
|
||||
// Google Home test
|
||||
let gc = GoogleHome::new("Dreaded_X");
|
||||
let json = r#"{
|
||||
"requestId": "ff36a3cc-ec34-11e6-b1a0-64510650abcf",
|
||||
"inputs": [
|
||||
{
|
||||
"intent": "action.devices.EXECUTE",
|
||||
"payload": {
|
||||
"commands": [
|
||||
{
|
||||
"devices": [
|
||||
{
|
||||
"id": "kitchen/kettle"
|
||||
},
|
||||
{
|
||||
"id": "test_device"
|
||||
}
|
||||
],
|
||||
"execution": [
|
||||
{
|
||||
"command": "action.devices.commands.OnOff",
|
||||
"params": {
|
||||
"on": false
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
let request = serde_json::from_str(json).unwrap();
|
||||
let mut binding = devices.write().unwrap();
|
||||
let mut ghd = binding.as_fullfillments();
|
||||
// Combine all fullfillments together
|
||||
let fullfillment = warp::path("fullfillment").and(fullfillment_google_home);
|
||||
|
||||
let response = gc.handle_request(request, &mut ghd).unwrap();
|
||||
// Combine all routes together
|
||||
let routes = fullfillment;
|
||||
|
||||
println!("{response:?}");
|
||||
// Start the web server
|
||||
let addr: SocketAddr = ([127, 0, 0, 1], 7878).into();
|
||||
info!("Server started on http://{addr}");
|
||||
warp::serve(routes)
|
||||
.run(addr)
|
||||
.await;
|
||||
}
|
||||
|
||||
19
src/mqtt.rs
19
src/mqtt.rs
@@ -1,13 +1,15 @@
|
||||
use std::sync::{Weak, RwLock};
|
||||
use log::error;
|
||||
|
||||
use rumqttc::{Publish, Connection, Event, Incoming};
|
||||
use rumqttc::{Publish, Event, Incoming, EventLoop};
|
||||
use log::trace;
|
||||
|
||||
pub trait Listener: Sync + Send {
|
||||
pub trait Listener {
|
||||
fn notify(&mut self, message: &Publish);
|
||||
}
|
||||
|
||||
pub struct Notifier {
|
||||
listeners: Vec<Weak<RwLock<dyn Listener>>>,
|
||||
listeners: Vec<Weak<RwLock<dyn Listener + Sync + Send>>>,
|
||||
}
|
||||
|
||||
impl Notifier {
|
||||
@@ -26,20 +28,21 @@ impl Notifier {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_listener<T: Listener + 'static>(&mut self, listener: Weak<RwLock<T>>) {
|
||||
pub fn add_listener<T: Listener + Sync + Send + 'static>(&mut self, listener: Weak<RwLock<T>>) {
|
||||
self.listeners.push(listener);
|
||||
}
|
||||
|
||||
pub fn start(&mut self, mut connection: Connection) {
|
||||
for notification in connection.iter() {
|
||||
pub async fn start(&mut self, mut eventloop: EventLoop) {
|
||||
loop {
|
||||
let notification = eventloop.poll().await;
|
||||
match notification {
|
||||
Ok(Event::Incoming(Incoming::Publish(p))) => {
|
||||
println!("{:?}", p);
|
||||
trace!("{:?}", p);
|
||||
self.notify(p);
|
||||
},
|
||||
Ok(..) => continue,
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
error!("{}", err);
|
||||
break
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user