From 0f1e181b925683825671c6564d91620478be44ef Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Sat, 2 Sep 2023 06:03:33 +0200 Subject: [PATCH] Started work on OTA --- Cargo.lock | 2 ++ Cargo.toml | 3 ++- src/main.rs | 48 ++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 067d094..147b72b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,6 +958,7 @@ dependencies = [ "defmt", "hash32", "rustc_version 0.4.0", + "serde", "spin", "stable_deref_trait", ] @@ -1418,6 +1419,7 @@ dependencies = [ "panic-probe", "rand", "rust-mqtt", + "serde", "serde-json-core", "static_cell", ] diff --git a/Cargo.toml b/Cargo.toml index 57a121a..94d85ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,7 @@ cyw43-pio = { git = "https://github.com/embassy-rs/embassy", features = [ panic-probe = { version = "0.3", features = ["print-defmt"] } log = "0.4" static_cell = { version = "1.1", features = ["nightly"] } -heapless = { version = "0.7.16", features = ["defmt"] } +heapless = { version = "0.7.16", features = ["defmt", "serde"] } embedded-io-async = { version = "0.5", features = ["defmt-03"] } crc16 = "0.4" dsmr5 = "0.3" @@ -68,6 +68,7 @@ rand = { version = "0.8.5", features = [ "std_rng", ], default-features = false } serde-json-core = "0.5.1" +serde = { version = "1.0.188", default-features = false, features = ["derive"] } cfg-if = "1.0.0" [patch.crates-io] diff --git a/src/main.rs b/src/main.rs index 069ccff..b0ba646 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,11 @@ use core::str::FromStr; use cyw43::PowerManagementMode; use cyw43_pio::PioSpi; -use defmt::{debug, info, warn, Format}; +use defmt::{debug, error, info, warn, Format}; use dsmr5::Readout; use embassy_executor::Spawner; use embassy_futures::{ - select::{select, Either}, + select::{select3, Either3}, yield_now, }; use embassy_net::{tcp::TcpSocket, Config, IpEndpoint, Stack, StackResources}; @@ -43,6 +43,7 @@ use rust_mqtt::{ }, packet::v5::publish_packet::QualityOfService, }; +use serde::Deserialize; use static_cell::make_static; use {defmt_rtt as _, panic_probe as _}; @@ -52,9 +53,9 @@ bind_interrupts!(struct Irqs { PIO0_IRQ_0 => pio::InterruptHandler; }); -#[derive(Format)] -struct Test { - counter: u32, +#[derive(Format, Deserialize)] +struct UpdateMessage<'a> { + url: &'a str, } #[embassy_executor::task] @@ -281,6 +282,8 @@ async fn main(spawner: Spawner) { .await .unwrap(); + client.subscribe_to_topic("pico/test/update").await.unwrap(); + // Turn LED off when connected control.gpio_set(0, false).await; @@ -288,21 +291,46 @@ async fn main(spawner: Spawner) { let receiver = channel.receiver(); loop { - match select(keep_alive.next(), receiver.receive()).await { - Either::First(_) => client.send_ping().await.unwrap(), - Either::Second(state) => { + match select3( + keep_alive.next(), + receiver.receive(), + client.receive_message(), + ) + .await + { + Either3::First(_) => client.send_ping().await.unwrap(), + Either3::Second(state) => { // Blink the LED to show that a readout was received control.gpio_set(0, true).await; control.gpio_set(0, false).await; - let msg: Vec = serde_json_core::to_vec(&state).unwrap(); - info!("len: {}", msg.len()); + let msg: Vec = serde_json_core::to_vec(&state) + .expect("The buffer should be large enough to contain all the data"); client .send_message("pico/test", &msg, QualityOfService::QoS0, false) .await .unwrap(); } + Either3::Third(message) => { + let message = match message { + Ok(message) => message, + Err(err) => { + error!("Failed to receive MQTT message: {}", err); + continue; + } + }; + + let message = match serde_json_core::from_slice::(message.1) { + Ok((message, _)) => message, + Err(_) => { + error!("Unable to parse update message"); + continue; + } + }; + + info!("{}", message); + } } } }