diff --git a/.ci/mosquitto.conf b/.ci/mosquitto.conf index 4479cc6..8e901e7 100644 --- a/.ci/mosquitto.conf +++ b/.ci/mosquitto.conf @@ -1,2 +1,3 @@ allow_anonymous false password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt + diff --git a/.ci/mqtt_pass.txt b/.ci/mqtt_pass.txt index 8c1809f..2182348 100644 --- a/.ci/mqtt_pass.txt +++ b/.ci/mqtt_pass.txt @@ -1 +1 @@ -test:$7$101$XGspXBoC6refncib$u5t0Adz5h8Xn9XfYtKfa5kWrPNMGd+H7u2vbl0S8qmr/HCREZjjEyqU88QybSV0SsgmyFrXMIkCozEmnPeTm+g== \ No newline at end of file +test:$7$101$XGspXBoC6refncib$u5t0Adz5h8Xn9XfYtKfa5kWrPNMGd+H7u2vbl0S8qmr/HCREZjjEyqU88QybSV0SsgmyFrXMIkCozEmnPeTm+g== diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml index 365ae5c..f336014 100644 --- a/.github/workflows/integration_tests.yaml +++ b/.github/workflows/integration_tests.yaml @@ -3,18 +3,19 @@ on: [pull_request] name: IntegrationTests jobs: - unit_tests: + integration_tests: name: Integration tests runs-on: ubuntu-latest steps: - name: Git checkout uses: actions/checkout@v2 - - name: Build - uses: actions-rs/toolchain@v1 + - uses: actions-rs/toolchain@v1 with: toolchain: stable - - run: cargo build + + - name: Build embedded + run: cargo build --target thumbv7em-none-eabihf --features "no_std" --no-default-features - name: Start Mosquitto run: | @@ -22,4 +23,4 @@ jobs: mosquitto -c .ci/mosquitto.conf -d - name: Run integration tests - run: cargo test integration \ No newline at end of file + run: RUST_LOG=trace cargo test integration --features "testing" \ No newline at end of file diff --git a/.github/workflows/unit_tests.yaml b/.github/workflows/unit_tests.yaml index 8d01707..babda5a 100644 --- a/.github/workflows/unit_tests.yaml +++ b/.github/workflows/unit_tests.yaml @@ -8,7 +8,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 with: toolchain: stable - - run: cargo test unit \ No newline at end of file + + - name: Run Unit tests + run: RUST_LOG=trace cargo test unit --features "testing" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f48f40c..f7e17f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "examples/drogue", + #"examples/drogue", "mqtt", ] resolver = "2" @@ -9,4 +9,5 @@ resolver = "2" embassy = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" } embassy-traits = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" } embassy-net = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" } -drogue-device = { git = "https://github.com/drogue-iot/drogue-device.git", rev = "62ff20e278a6a705056173171714b1bbdc078df5" } \ No newline at end of file +drogue-device = { git = "https://github.com/drogue-iot/drogue-device.git", rev = "62ff20e278a6a705056173171714b1bbdc078df5" } +embassy-nrf = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" } diff --git a/examples/drogue/Cargo.toml b/examples/drogue/Cargo.toml index 61155ca..b035a74 100644 --- a/examples/drogue/Cargo.toml +++ b/examples/drogue/Cargo.toml @@ -1,20 +1,11 @@ [package] -name = "embassy-network" +name = "drogue-network" version = "0.0.1" authors = ["Ondrej Babec "] edition = "2021" resolver = "2" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -env_logger = "0.9.0" -rust-mqtt = { path = "../../mqtt"} -embassy = { version = "0.1.0", features = ["std"] } -log = "0.4.14" -embedded-hal = { version = "0.2", features = ["unproven"] } -embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.6", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy" } -embedded-hal-async = { version = "0.0.1", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy"} - - -drogue-device = { version = "0.1.0", features = ["std"] } +defmt = { version = "0.3" } +rust-mqtt = { path = "../../mqtt", features = ["no_std"], default-features = false } +drogue-device = { path = "/drogue-device/device", features = ["time"], default-features = false } \ No newline at end of file diff --git a/examples/drogue/src/drogue_network.rs b/examples/drogue/src/drogue_network.rs index 2e30f4f..0548ec5 100644 --- a/examples/drogue/src/drogue_network.rs +++ b/examples/drogue/src/drogue_network.rs @@ -33,14 +33,11 @@ use rust_mqtt::packet::v5::reason_codes::ReasonCode; use drogue_device::traits::tcp; use drogue_device::traits::tcp::TcpStack; -use embassy::io::{AsyncBufReadExt, AsyncWriteExt}; -use embassy::time::Delay; -use embedded_hal_async::delay::DelayUs; use rust_mqtt::network::network_trait::{NetworkConnection, NetworkConnectionFactory}; pub struct DrogueNetwork -where - A: TcpActor + 'static, + where + A: TcpActor + 'static, { socket: Socket, } @@ -92,10 +89,9 @@ where } } - fn close(mut self) -> Self::CloseFuture<'m> { + fn close<'m>(mut self) -> Self::CloseFuture<'m> { async move { - self.socket - .close() + self.socket.close() .await .map_err(|_| ReasonCode::NetworkError) } @@ -141,11 +137,11 @@ where .await { Ok(_) => { - log::trace!("Connection established"); + trace!("Connection established"); Ok(DrogueNetwork::new(socket)) } Err(e) => { - log::warn!("Error creating connection: {:?}", e); + warn!("Error creating connection:"); socket.close().await.map_err(|e| ReasonCode::NetworkError)?; Err(ReasonCode::NetworkError) } diff --git a/examples/drogue/src/fmt.rs b/examples/drogue/src/fmt.rs new file mode 100644 index 0000000..f8bb0a0 --- /dev/null +++ b/examples/drogue/src/fmt.rs @@ -0,0 +1,228 @@ +#![macro_use] +#![allow(unused_macros)] + +#[cfg(all(feature = "defmt", feature = "log"))] +compile_error!("You may not enable both `defmt` and `log` features."); + +macro_rules! assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert!($($x)*); + } + }; +} + +macro_rules! assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_eq!($($x)*); + } + }; +} + +macro_rules! assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_ne!($($x)*); + } + }; +} + +macro_rules! debug_assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert!($($x)*); + } + }; +} + +macro_rules! debug_assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_eq!($($x)*); + } + }; +} + +macro_rules! debug_assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_ne!($($x)*); + } + }; +} + +macro_rules! todo { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::todo!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::todo!($($x)*); + } + }; +} + +macro_rules! unreachable { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::unreachable!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::unreachable!($($x)*); + } + }; +} + +macro_rules! panic { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::panic!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::panic!($($x)*); + } + }; +} + +macro_rules! trace { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::trace!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::trace!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! debug { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::debug!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::debug!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! info { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::info!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::info!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! warn { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::warn!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::warn!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! error { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::error!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::error!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +#[cfg(feature = "defmt")] +macro_rules! unwrap { + ($($x:tt)*) => { + ::defmt::unwrap!($($x)*) + }; +} + +#[cfg(not(feature = "defmt"))] +macro_rules! unwrap { + ($arg:expr) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); + } + } + }; + ($arg:expr, $($msg:expr),+ $(,)? ) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); + } + } + } +} + +#[cfg(feature = "defmt-timestamp-uptime")] +defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct NoneError; + +pub trait Try { + type Ok; + type Error; + fn into_result(self) -> Result; +} + +impl Try for Option { + type Ok = T; + type Error = NoneError; + + #[inline] + fn into_result(self) -> Result { + self.ok_or(NoneError) + } +} + +impl Try for Result { + type Ok = T; + type Error = E; + + #[inline] + fn into_result(self) -> Self { + self + } +} diff --git a/examples/drogue/src/lib.rs b/examples/drogue/src/lib.rs index 5cd6f18..6bd3829 100644 --- a/examples/drogue/src/lib.rs +++ b/examples/drogue/src/lib.rs @@ -28,4 +28,6 @@ #![allow(dead_code)] #![feature(type_alias_impl_trait)] #![feature(generic_associated_types)] +pub mod fmt; pub mod drogue_network; + diff --git a/mqtt/Cargo.toml b/mqtt/Cargo.toml index fc4b1cc..e109b18 100644 --- a/mqtt/Cargo.toml +++ b/mqtt/Cargo.toml @@ -8,19 +8,21 @@ resolver = "2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -env_logger = "0.9.0" -log = "0.4.14" heapless = "0.7.10" rand_core = "0.6.0" +defmt = { version = "0.3", optional = true } -tokio = { version = "1", features = ["full"], optional = true } -tokio-test = { version = "0.4.2", optional = true} +log = { version = "0.4.14", optional = true } +tokio = { version = "1", features = ["full"], optional = true, default-features = false } [dev-dependencies] tokio = { version = "1", features = ["full"] } -tokio-test = "0.4.2" +tokio-test = { version = "0.4.2"} +env_logger = "0.9.0" +log = { version = "0.4.14"} [features] -default = ["tokio", "std", "tokio-test"] +default = ["testing"] +testing = ["tokio", "std", "log"] std = [] -no_std = [] \ No newline at end of file +no_std = ["defmt"] \ No newline at end of file diff --git a/mqtt/src/client/client_config.rs b/mqtt/src/client/client_config.rs index 7a7703b..17d60e1 100644 --- a/mqtt/src/client/client_config.rs +++ b/mqtt/src/client/client_config.rs @@ -25,8 +25,11 @@ use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService; use crate::utils::types::{BinaryData, EncodedString}; + use heapless::Vec; + +#[derive(Clone)] pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { pub qos: QualityOfService, pub keep_alive: u16, diff --git a/mqtt/src/client/client_v5.rs b/mqtt/src/client/client_v5.rs index 3d051dc..e6bb010 100644 --- a/mqtt/src/client/client_v5.rs +++ b/mqtt/src/client/client_v5.rs @@ -39,11 +39,13 @@ use crate::packet::v5::subscription_packet::SubscriptionPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::rng_generator::CountingRng; use crate::utils::types::BufferError; + use heapless::Vec; use rand_core::RngCore; +use crate::network::network_trait::NetworkError::Connection; pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { - network_driver: &'a mut T, + connection: Option, buffer: &'a mut [u8], buffer_len: usize, recv_buffer: &'a mut [u8], @@ -57,7 +59,7 @@ where T: NetworkConnection, { pub fn new( - network_driver: &'a mut T, + network_driver: T, buffer: &'a mut [u8], buffer_len: usize, recv_buffer: &'a mut [u8], @@ -65,7 +67,7 @@ where config: ClientConfig<'a, MAX_PROPERTIES>, ) -> Self { Self { - network_driver, + connection: Some(network_driver), buffer, buffer_len, recv_buffer, @@ -75,8 +77,10 @@ where } } - // Muze prijit disconnect kvuli male velikosti packetu pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } let len = { let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new(); connect.keep_alive = self.config.keep_alive; @@ -92,23 +96,26 @@ where }; if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + let mut conn = self.connection.as_mut().unwrap(); + trace!("Sending connect"); + conn.send(self.buffer, len.unwrap()).await?; //connack let reason: Result = { - self.network_driver.receive(self.buffer).await?; - let mut packet = ConnackPacket::<'b, 5>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { + trace!("Waiting for connack"); + conn.receive(self.recv_buffer).await?; + let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { if err == BufferError::PacketTypeMismatch { - let mut disc = DisconnectPacket::<'b, 5>::new(); + let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); if disc - .decode(&mut BuffReader::new(self.buffer, self.buffer_len)) + .decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) .is_ok() { - log::error!("Client was disconnected with reason: "); + error!("Client was disconnected with reason: "); return Err(ReasonCode::from(disc.disconnect_reason)); } } @@ -119,7 +126,7 @@ where }; if let Err(err) = reason { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } let res = reason.unwrap(); @@ -131,13 +138,29 @@ where } pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> { - let mut disconnect = DisconnectPacket::<'b, 5>::new(); + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + trace!("Creating disconnect packet!"); + let mut disconnect = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); let len = disconnect.encode(self.buffer, self.buffer_len); if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + warn!("[DECODE ERR]: {}", err); + self.connection.take().unwrap().close().await?; return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + + if let Err(e) = conn.send(self.buffer, len.unwrap()).await { + warn!("Could not send DISCONNECT packet"); + } + + if let Err(e) = self.connection.take().unwrap().close().await { + warn!("Could not close the TCP handle"); + return Err(e); + } else { + trace!("Closed TCP handle"); + } Ok(()) } @@ -146,9 +169,13 @@ where topic_name: &'b str, message: &'b str, ) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let mut conn = self.connection.as_mut().unwrap(); let identifier: u16 = self.rng.next_u32() as u16; let len = { - let mut packet = PublishPacket::<'b, 5>::new(); + let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new(); packet.add_topic_name(topic_name); packet.add_qos(self.config.qos); packet.add_identifier(identifier); @@ -157,20 +184,21 @@ where }; if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } + trace!("Sending message"); + conn.send(self.buffer, len.unwrap()).await?; - self.network_driver.send(self.buffer, len.unwrap()).await?; - - //QoS1 + // QoS1 if >::into(self.config.qos) == >::into(QoS1) { let reason: Result<[u16; 2], BufferError> = { - self.network_driver.receive(self.buffer).await?; - let mut packet = PubackPacket::<'b, 5>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) + trace!("Waiting for ack"); + conn.receive(self.recv_buffer).await?; + let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { Err(err) } else { @@ -179,7 +207,7 @@ where }; if let Err(err) = reason { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } @@ -199,8 +227,12 @@ where &'b mut self, topic_names: &'b Vec<&'b str, TOPICS>, ) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let mut conn = self.connection.as_mut().unwrap(); let len = { - let mut subs = SubscriptionPacket::<'b, TOPICS, 1>::new(); + let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); let mut i = 0; loop { if i == TOPICS { @@ -213,17 +245,17 @@ where }; if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + conn.send(self.buffer, len.unwrap()).await?; let reason: Result, BufferError> = { - self.network_driver.receive(self.buffer).await?; + conn.receive(self.recv_buffer).await?; - let mut packet = SubackPacket::<'b, TOPICS, 5>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { + let mut packet = SubackPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { Err(err) } else { Ok(packet.reason_codes) @@ -231,7 +263,7 @@ where }; if let Err(err) = reason { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } let reasons = reason.unwrap(); @@ -252,24 +284,28 @@ where &'b mut self, topic_name: &'b str, ) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let mut conn = self.connection.as_mut().unwrap(); let len = { - let mut subs = SubscriptionPacket::<'b, 1, 1>::new(); + let mut subs = SubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); subs.add_new_filter(topic_name, self.config.qos); subs.encode(self.buffer, self.buffer_len) }; if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + conn.send(self.buffer, len.unwrap()).await?; let reason: Result = { - self.network_driver.receive(self.buffer).await?; + conn.receive(self.recv_buffer).await?; - let mut packet = SubackPacket::<'b, 5, 5>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { + let mut packet = SubackPacket::<'b, 5, MAX_PROPERTIES>::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { Err(err) } else { Ok(*packet.reason_codes.get(0).unwrap()) @@ -277,7 +313,7 @@ where }; if let Err(err) = reason { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } @@ -290,7 +326,11 @@ where } pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> { - self.network_driver.receive(self.recv_buffer).await?; + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let mut conn = self.connection.as_mut().unwrap(); + conn.receive(self.recv_buffer).await?; let mut packet = PublishPacket::<'b, 5>::new(); if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) @@ -298,30 +338,30 @@ where if err == BufferError::PacketTypeMismatch { let mut disc = DisconnectPacket::<'b, 5>::new(); if disc - .decode(&mut BuffReader::new(self.buffer, self.buffer_len)) + .decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) .is_ok() { - log::error!("Client was disconnected with reason: "); + error!("Client was disconnected with reason: "); return Err(ReasonCode::from(disc.disconnect_reason)); } } - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } if (packet.fixed_header & 0x06) == >::into(QualityOfService::QoS1) { - let mut puback = PubackPacket::<'b, 5>::new(); + let mut puback = PubackPacket::<'b, MAX_PROPERTIES>::new(); puback.packet_identifier = packet.packet_identifier; puback.reason_code = 0x00; { let len = puback.encode(self.buffer, self.buffer_len); if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + conn.send(self.buffer, len.unwrap()).await?; } } @@ -329,22 +369,26 @@ where } pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let mut conn = self.connection.as_mut().unwrap(); let len = { let mut packet = PingreqPacket::new(); packet.encode(self.buffer, self.buffer_len) }; if let Err(err) = len { - log::error!("[DECODE ERR]: {}", err); + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - self.network_driver.send(self.buffer, len.unwrap()).await?; + conn.send(self.buffer, len.unwrap()).await?; - self.network_driver.receive(self.buffer).await?; + conn.receive(self.recv_buffer).await?; let mut packet = PingrespPacket::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { - log::error!("[DECODE ERR]: {}", err); + if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { + error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } else { Ok(()) diff --git a/mqtt/src/encoding/variable_byte_integer.rs b/mqtt/src/encoding/variable_byte_integer.rs index ccfa82b..69a0beb 100644 --- a/mqtt/src/encoding/variable_byte_integer.rs +++ b/mqtt/src/encoding/variable_byte_integer.rs @@ -1,27 +1,28 @@ /* -MIT License - -Copyright (c) [2022] [Ondrej Babec ] - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + * MIT License + * + * Copyright (c) [2022] [Ondrej Babec ] + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. */ + use crate::utils::types::BufferError; /// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on @@ -48,7 +49,7 @@ impl VariableByteIntegerEncoder { const MAX_ENCODABLE: u32 = 268435455; const MOD: u32 = 128; if target > MAX_ENCODABLE { - log::error!("Maximal value of integer for encoding was exceeded"); + error!("Maximal value of integer for encoding was exceeded"); return Err(BufferError::EncodingError); } diff --git a/mqtt/src/fmt.rs b/mqtt/src/fmt.rs new file mode 100644 index 0000000..f8bb0a0 --- /dev/null +++ b/mqtt/src/fmt.rs @@ -0,0 +1,228 @@ +#![macro_use] +#![allow(unused_macros)] + +#[cfg(all(feature = "defmt", feature = "log"))] +compile_error!("You may not enable both `defmt` and `log` features."); + +macro_rules! assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert!($($x)*); + } + }; +} + +macro_rules! assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_eq!($($x)*); + } + }; +} + +macro_rules! assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::assert_ne!($($x)*); + } + }; +} + +macro_rules! debug_assert { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert!($($x)*); + } + }; +} + +macro_rules! debug_assert_eq { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_eq!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_eq!($($x)*); + } + }; +} + +macro_rules! debug_assert_ne { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::debug_assert_ne!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::debug_assert_ne!($($x)*); + } + }; +} + +macro_rules! todo { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::todo!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::todo!($($x)*); + } + }; +} + +macro_rules! unreachable { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::unreachable!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::unreachable!($($x)*); + } + }; +} + +macro_rules! panic { + ($($x:tt)*) => { + { + #[cfg(not(feature = "defmt"))] + ::core::panic!($($x)*); + #[cfg(feature = "defmt")] + ::defmt::panic!($($x)*); + } + }; +} + +macro_rules! trace { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::trace!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::trace!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! debug { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::debug!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::debug!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! info { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::info!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::info!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! warn { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::warn!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::warn!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +macro_rules! error { + ($s:literal $(, $x:expr)* $(,)?) => { + { + #[cfg(feature = "log")] + ::log::error!($s $(, $x)*); + #[cfg(feature = "defmt")] + ::defmt::error!($s $(, $x)*); + #[cfg(not(any(feature = "log", feature="defmt")))] + let _ = ($( & $x ),*); + } + }; +} + +#[cfg(feature = "defmt")] +macro_rules! unwrap { + ($($x:tt)*) => { + ::defmt::unwrap!($($x)*) + }; +} + +#[cfg(not(feature = "defmt"))] +macro_rules! unwrap { + ($arg:expr) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); + } + } + }; + ($arg:expr, $($msg:expr),+ $(,)? ) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); + } + } + } +} + +#[cfg(feature = "defmt-timestamp-uptime")] +defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() } + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct NoneError; + +pub trait Try { + type Ok; + type Error; + fn into_result(self) -> Result; +} + +impl Try for Option { + type Ok = T; + type Error = NoneError; + + #[inline] + fn into_result(self) -> Result { + self.ok_or(NoneError) + } +} + +impl Try for Result { + type Ok = T; + type Error = E; + + #[inline] + fn into_result(self) -> Self { + self + } +} diff --git a/mqtt/src/lib.rs b/mqtt/src/lib.rs index bb592ae..6c86ad1 100644 --- a/mqtt/src/lib.rs +++ b/mqtt/src/lib.rs @@ -29,6 +29,7 @@ #![feature(type_alias_impl_trait)] #![feature(generic_associated_types)] +pub(crate) mod fmt; pub mod client; pub mod encoding; pub mod network; @@ -36,3 +37,4 @@ pub mod packet; pub mod tests; pub mod tokio_net; pub mod utils; + diff --git a/mqtt/src/packet/v5/auth_packet.rs b/mqtt/src/packet/v5/auth_packet.rs index d4077e6..7a6591b 100644 --- a/mqtt/src/packet/v5/auth_packet.rs +++ b/mqtt/src/packet/v5/auth_packet.rs @@ -46,7 +46,7 @@ pub struct AuthPacket<'a, const MAX_PROPERTIES: usize> { impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> { pub fn add_reason_code(&mut self, code: u8) { if code != 0 && code != 24 && code != 25 { - log::error!("Provided reason code is not supported!"); + error!("Provided reason code is not supported!"); return; } self.auth_reason = code; @@ -56,7 +56,7 @@ impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> { if p.auth_property() { self.push_to_properties(p); } else { - log::error!("Provided property is not correct AUTH packet property!"); + error!("Provided property is not correct AUTH packet property!"); } } } diff --git a/mqtt/src/packet/v5/connack_packet.rs b/mqtt/src/packet/v5/connack_packet.rs index c42c1fd..f65d12f 100644 --- a/mqtt/src/packet/v5/connack_packet.rs +++ b/mqtt/src/packet/v5/connack_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -73,7 +74,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPE fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Connack).into() { - log::error!("Packet you are trying to decode is not CONNACK packet!"); + error!("Packet you are trying to decode is not CONNACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.ack_flags = buff_reader.read_u8()?; diff --git a/mqtt/src/packet/v5/connect_packet.rs b/mqtt/src/packet/v5/connect_packet.rs index 5135262..51d9abc 100644 --- a/mqtt/src/packet/v5/connect_packet.rs +++ b/mqtt/src/packet/v5/connect_packet.rs @@ -187,7 +187,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' } fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - log::error!("Decode function is not available for control packet!"); + error!("Decode function is not available for control packet!"); Err(BufferError::WrongPacketToDecode) } diff --git a/mqtt/src/packet/v5/disconnect_packet.rs b/mqtt/src/packet/v5/disconnect_packet.rs index ebaf8b7..44582d5 100644 --- a/mqtt/src/packet/v5/disconnect_packet.rs +++ b/mqtt/src/packet/v5/disconnect_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -79,7 +80,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Disconnect).into() { - log::error!("Packet you are trying to decode is not DISCONNECT packet!"); + error!("Packet you are trying to decode is not DISCONNECT packet!"); return Err(BufferError::WrongPacketToDecode); } self.disconnect_reason = buff_reader.read_u8()?; diff --git a/mqtt/src/packet/v5/mqtt_packet.rs b/mqtt/src/packet/v5/mqtt_packet.rs index 003afcb..ea32847 100644 --- a/mqtt/src/packet/v5/mqtt_packet.rs +++ b/mqtt/src/packet/v5/mqtt_packet.rs @@ -82,7 +82,7 @@ pub trait Packet<'a> { if self.get_property_len() != 0 { loop { prop = Property::decode(buff_reader)?; - log::debug!("Parsed property {:?}", prop); + //debug!("Parsed property {:?}", prop); x = x + prop.len() as u32 + 1; self.push_to_properties(prop); @@ -100,6 +100,7 @@ pub trait Packet<'a> { buff_reader: &mut BuffReader, ) -> Result { let first_byte: u8 = buff_reader.read_u8()?; + trace!("First byte of accepted packet: {:02X}", first_byte); self.set_fixed_header(first_byte); self.set_remaining_len(buff_reader.read_variable_byte_int()?); return Ok(PacketType::from(first_byte)); diff --git a/mqtt/src/packet/v5/pingreq_packet.rs b/mqtt/src/packet/v5/pingreq_packet.rs index 0702305..035521d 100644 --- a/mqtt/src/packet/v5/pingreq_packet.rs +++ b/mqtt/src/packet/v5/pingreq_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; @@ -53,21 +54,21 @@ impl<'a> Packet<'a> for PingreqPacket { } fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - log::error!("Pingreq Packet packet does not support decode funtion on client!"); + error!("Pingreq Packet packet does not support decode funtion on client!"); Err(BufferError::WrongPacketToDecode) } fn set_property_len(&mut self, _value: u32) { - log::error!("PINGREQ packet does not contain any properties!"); + error!("PINGREQ packet does not contain any properties!"); } fn get_property_len(&mut self) -> u32 { - log::error!("PINGREQ packet does not contain any properties!"); + error!("PINGREQ packet does not contain any properties!"); return 0; } fn push_to_properties(&mut self, _property: Property<'a>) { - log::error!("PINGREQ packet does not contain any properties!"); + error!("PINGREQ packet does not contain any properties!"); } fn property_allowed(&mut self, property: &Property<'a>) -> bool { diff --git a/mqtt/src/packet/v5/pingresp_packet.rs b/mqtt/src/packet/v5/pingresp_packet.rs index e03aa28..2c5e77f 100644 --- a/mqtt/src/packet/v5/pingresp_packet.rs +++ b/mqtt/src/packet/v5/pingresp_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; @@ -55,27 +56,27 @@ impl<'a> Packet<'a> for PingrespPacket { fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { let x = self.decode_fixed_header(buff_reader)?; if x != (PacketType::Pingresp).into() { - log::error!("Packet you are trying to decode is not PINGRESP packet!"); + error!("Packet you are trying to decode is not PINGRESP packet!"); return Err(BufferError::PacketTypeMismatch); } if self.remain_len != 0 { - log::error!("PINGRESP packet does not have 0 lenght!"); + error!("PINGRESP packet does not have 0 lenght!"); return Err(BufferError::PacketTypeMismatch); } Ok(()) } fn set_property_len(&mut self, _value: u32) { - log::error!("PINGRESP packet does not contain any properties!"); + error!("PINGRESP packet does not contain any properties!"); } fn get_property_len(&mut self) -> u32 { - log::error!("PINGRESP packet does not contain any properties!"); + error!("PINGRESP packet does not contain any properties!"); return 0; } fn push_to_properties(&mut self, _property: Property<'a>) { - log::error!("PINGRESP packet does not contain any properties!"); + error!("PINGRESP packet does not contain any properties!"); } fn property_allowed(&mut self, property: &Property<'a>) -> bool { diff --git a/mqtt/src/packet/v5/puback_packet.rs b/mqtt/src/packet/v5/puback_packet.rs index bc379ac..e997a1d 100644 --- a/mqtt/src/packet/v5/puback_packet.rs +++ b/mqtt/src/packet/v5/puback_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPER fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Puback).into() { - log::error!("Packet you are trying to decode is not PUBACK packet!"); + error!("Packet you are trying to decode is not PUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/pubcomp_packet.rs b/mqtt/src/packet/v5/pubcomp_packet.rs index 6eb8a81..71301c4 100644 --- a/mqtt/src/packet/v5/pubcomp_packet.rs +++ b/mqtt/src/packet/v5/pubcomp_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPE fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Pubcomp).into() { - log::error!("Packet you are trying to decode is not PUBCOMP packet!"); + error!("Packet you are trying to decode is not PUBCOMP packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/publish_packet.rs b/mqtt/src/packet/v5/publish_packet.rs index 90baf9a..8b5ab14 100644 --- a/mqtt/src/packet/v5/publish_packet.rs +++ b/mqtt/src/packet/v5/publish_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -136,7 +137,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Publish).into() { - log::error!("Packet you are trying to decode is not PUBLISH packet!"); + error!("Packet you are trying to decode is not PUBLISH packet!"); return Err(BufferError::PacketTypeMismatch); } self.topic_name = buff_reader.read_string()?; diff --git a/mqtt/src/packet/v5/pubrec_packet.rs b/mqtt/src/packet/v5/pubrec_packet.rs index 49060b5..69ec7b6 100644 --- a/mqtt/src/packet/v5/pubrec_packet.rs +++ b/mqtt/src/packet/v5/pubrec_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPER fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrec).into() { - log::error!("Packet you are trying to decode is not PUBREC packet!"); + error!("Packet you are trying to decode is not PUBREC packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/pubrel_packet.rs b/mqtt/src/packet/v5/pubrel_packet.rs index 675e701..fb560e3 100644 --- a/mqtt/src/packet/v5/pubrel_packet.rs +++ b/mqtt/src/packet/v5/pubrel_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPER fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrel).into() { - log::error!("Packet you are trying to decode is not PUBREL packet!"); + error!("Packet you are trying to decode is not PUBREL packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/reason_codes.rs b/mqtt/src/packet/v5/reason_codes.rs index 57f62bf..496dabd 100644 --- a/mqtt/src/packet/v5/reason_codes.rs +++ b/mqtt/src/packet/v5/reason_codes.rs @@ -25,6 +25,7 @@ use core::fmt::{Display, Formatter}; #[derive(Debug, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum ReasonCode { Success, GrantedQoS1, diff --git a/mqtt/src/packet/v5/suback_packet.rs b/mqtt/src/packet/v5/suback_packet.rs index a8eff84..741f004 100644 --- a/mqtt/src/packet/v5/suback_packet.rs +++ b/mqtt/src/packet/v5/suback_packet.rs @@ -23,6 +23,7 @@ */ use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; + use heapless::Vec; use crate::packet::v5::mqtt_packet::Packet; @@ -80,13 +81,13 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn encode(&mut self, _buffer: &mut [u8], _buffer_len: usize) -> Result { - log::error!("SUBACK packet does not support encoding!"); + error!("SUBACK packet does not support encoding!"); return Err(BufferError::WrongPacketToEncode); } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Suback).into() { - log::error!("Packet you are trying to decode is not SUBACK packet!"); + error!("Packet you are trying to decode is not SUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/subscription_packet.rs b/mqtt/src/packet/v5/subscription_packet.rs index 86834bb..28d7af2 100644 --- a/mqtt/src/packet/v5/subscription_packet.rs +++ b/mqtt/src/packet/v5/subscription_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use super::packet_type::PacketType; @@ -106,7 +107,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - log::error!("Subscribe packet does not support decode funtion on client!"); + error!("Subscribe packet does not support decode funtion on client!"); Err(BufferError::WrongPacketToDecode) } fn set_property_len(&mut self, value: u32) { diff --git a/mqtt/src/packet/v5/unsuback_packet.rs b/mqtt/src/packet/v5/unsuback_packet.rs index 98f03b7..9deb88c 100644 --- a/mqtt/src/packet/v5/unsuback_packet.rs +++ b/mqtt/src/packet/v5/unsuback_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::packet::v5::mqtt_packet::Packet; @@ -74,13 +75,13 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn encode(&mut self, _buffer: &mut [u8], _buffer_len: usize) -> Result { - log::error!("UNSUBACK packet does not support encoding!"); + error!("UNSUBACK packet does not support encoding!"); Err(BufferError::WrongPacketToEncode) } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { if self.decode_fixed_header(buff_reader)? != (PacketType::Unsuback).into() { - log::error!("Packet you are trying to decode is not UNSUBACK packet!"); + error!("Packet you are trying to decode is not UNSUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; diff --git a/mqtt/src/packet/v5/unsubscription_packet.rs b/mqtt/src/packet/v5/unsubscription_packet.rs index a1932c3..e1cbfc5 100644 --- a/mqtt/src/packet/v5/unsubscription_packet.rs +++ b/mqtt/src/packet/v5/unsubscription_packet.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -105,7 +106,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - log::error!("Unsubscribe packet does not support decode funtion on client!"); + error!("Unsubscribe packet does not support decode funtion on client!"); Err(BufferError::WrongPacketToDecode) } diff --git a/mqtt/src/tests/integration/integration_test_single.rs b/mqtt/src/tests/integration/integration_test_single.rs index b621c76..3027392 100644 --- a/mqtt/src/tests/integration/integration_test_single.rs +++ b/mqtt/src/tests/integration/integration_test_single.rs @@ -25,6 +25,7 @@ extern crate alloc; use alloc::string::String; use core::time::Duration; use std::future::Future; +use log::LevelFilter; use tokio::time::sleep; use tokio::{join, task}; use tokio_test::assert_ok; @@ -38,6 +39,7 @@ use crate::packet::v5::reason_codes::ReasonCode; use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized; use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; use crate::utils::types::BufferError; +use std::sync::Once; static IP: [u8; 4] = [127, 0, 0, 1]; static PORT: u16 = 1883; @@ -45,32 +47,33 @@ static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; static MSG: &str = "testMessage"; -fn init() { - let _ = env_logger::builder() - .filter_level(log::LevelFilter::Info) - .format_timestamp_nanos() - .try_init(); +static INIT: Once = Once::new(); + +fn setup() { + INIT.call_once(|| { + env_logger::init(); + }); } async fn publish_core<'b>( client: &mut MqttClientV5<'b, TokioNetwork, 5>, topic: &str, ) -> Result<(), ReasonCode> { - log::info!( + info!( "[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD ); let mut result = { client.connect_to_broker().await }; assert_ok!(result); - log::info!("[Publisher] Waiting {} seconds before sending", 5); + info!("[Publisher] Waiting {} seconds before sending", 5); sleep(Duration::from_secs(5)).await; - log::info!("[Publisher] Sending new message {} to topic {}", MSG, topic); + info!("[Publisher] Sending new message {} to topic {}", MSG, topic); result = { client.send_message(topic, MSG).await }; assert_ok!(result); - log::info!("[Publisher] Disconnecting!"); + info!("[Publisher] Disconnecting!"); result = { client.disconnect().await }; assert_ok!(result); Ok(()) @@ -88,7 +91,7 @@ async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { let mut write_buffer = [0; 80]; let mut client = MqttClientV5::::new( - &mut tokio_network, + tokio_network, &mut write_buffer, 80, &mut recv_buffer, @@ -102,7 +105,7 @@ async fn receive_core<'b>( client: &mut MqttClientV5<'b, TokioNetwork, 5>, topic: &str, ) -> Result<(), ReasonCode> { - log::info!( + info!( "[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD @@ -110,17 +113,17 @@ async fn receive_core<'b>( let mut result = { client.connect_to_broker().await }; assert_ok!(result); - log::info!("[Receiver] Subscribing to topic {}", topic); + info!("[Receiver] Subscribing to topic {}", topic); result = { client.subscribe_to_topic(topic).await }; assert_ok!(result); - log::info!("[Receiver] Waiting for new message!"); + info!("[Receiver] Waiting for new message!"); let msg = { client.receive_message().await }; assert_ok!(msg); let act_message = String::from_utf8_lossy(msg?); - log::info!("[Receiver] Got new message: {}", act_message); + info!("[Receiver] Got new message: {}", act_message); assert_eq!(act_message, MSG); - log::info!("[Receiver] Disconnecting"); + info!("[Receiver] Disconnecting"); result = { client.disconnect().await }; assert_ok!(result); Ok(()) @@ -139,7 +142,7 @@ async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { let mut write_buffer = [0; 100]; let mut client = MqttClientV5::::new( - &mut tokio_network, + tokio_network, &mut write_buffer, 100, &mut recv_buffer, @@ -163,7 +166,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode let mut write_buffer = [0; 100]; let mut client = MqttClientV5::::new( - &mut tokio_network, + tokio_network, &mut write_buffer, 100, &mut recv_buffer, @@ -171,7 +174,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode config, ); - log::info!( + info!( "[Receiver] Connection to broker with username {} and password {}", "xyz", PASSWORD @@ -184,8 +187,8 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode #[tokio::test] async fn simple_publish_recv() { - init(); - log::info!("Running simple integration test"); + setup(); + info!("Running simple integration test"); let recv = task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/simple").await }); @@ -200,8 +203,8 @@ async fn simple_publish_recv() { #[tokio::test] async fn simple_publish_recv_qos() { - init(); - log::info!("Running simple integration test with Quality of Service 1"); + setup(); + info!("Running simple integration test with Quality of Service 1"); let recv = task::spawn(async move { receive(QualityOfService::QoS1, "test/recv/qos").await }); @@ -213,8 +216,8 @@ async fn simple_publish_recv_qos() { #[tokio::test] async fn simple_publish_recv_wrong_cred() { - init(); - log::info!("Running simple integration test wrong credentials"); + setup(); + info!("Running simple integration test wrong credentials"); let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await }); diff --git a/mqtt/src/tests/mod.rs b/mqtt/src/tests/mod.rs index f80e190..635cbf5 100644 --- a/mqtt/src/tests/mod.rs +++ b/mqtt/src/tests/mod.rs @@ -29,5 +29,5 @@ pub mod unit; #[allow(dead_code)] #[allow(unused_must_use)] #[allow(unused_imports)] -#[cfg(feature = "tokio")] +#[cfg(feature = "testing")] pub mod integration; diff --git a/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs index 282b487..1a035a9 100644 --- a/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs @@ -27,6 +27,7 @@ use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; use crate::utils::buffer_reader::BuffReader; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs index fd80208..7ea1f3f 100644 --- a/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::puback_packet::PubackPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs index b3a6bd0..985a1dd 100644 --- a/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::pubcomp_packet::PubcompPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs index 93b8f95..44029b9 100644 --- a/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs index bdf98af..644b55d 100644 --- a/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::pubrec_packet::PubrecPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::{EncodedString, StringPair}; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs index 5ed5dac..f4dd25f 100644 --- a/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::pubrel_packet::PubrelPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::{EncodedString, StringPair}; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs index 52dc24e..0d62826 100644 --- a/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs @@ -27,6 +27,7 @@ use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::subscription_packet::SubscriptionPacket; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs index 6430f26..2c757d8 100644 --- a/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs @@ -28,6 +28,7 @@ use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket; use crate::utils::types::{EncodedString, StringPair}; + use heapless::Vec; #[test] diff --git a/mqtt/src/tests/unit/utils/buffer_writer_unit.rs b/mqtt/src/tests/unit/utils/buffer_writer_unit.rs index 0bcb675..e60c7d3 100644 --- a/mqtt/src/tests/unit/utils/buffer_writer_unit.rs +++ b/mqtt/src/tests/unit/utils/buffer_writer_unit.rs @@ -25,6 +25,7 @@ use crate::packet::v5::property::Property; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter}; + use heapless::Vec; #[test] diff --git a/mqtt/src/tokio_net/tokio_network.rs b/mqtt/src/tokio_net/tokio_network.rs index 6395bad..94f04ab 100644 --- a/mqtt/src/tokio_net/tokio_network.rs +++ b/mqtt/src/tokio_net/tokio_network.rs @@ -34,14 +34,15 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::sleep; + pub struct TokioNetwork { - stream: Option, + stream: TcpStream, } impl TokioNetwork { pub fn new(stream: TcpStream) -> Self { Self { - stream: Some(stream), + stream, } } @@ -73,49 +74,30 @@ impl NetworkConnection for TokioNetwork { fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> { async move { - return if let Some(ref mut stream) = self.stream { - stream - .write_all(&buffer[0..len]) - .await - .map_err(|_| ReasonCode::NetworkError) - } else { - Err(ReasonCode::NetworkError) - }; + self.stream + .write_all(&buffer[0..len]) + .await + .map_err(|_| ReasonCode::NetworkError) } } fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> { async move { - return if let Some(ref mut stream) = self.stream { - stream - .read(buffer) - .await - .map_err(|_| ReasonCode::NetworkError) - } else { - Err(ReasonCode::NetworkError) - }; + self.stream + .read(buffer) + .await + .map_err(|_| ReasonCode::NetworkError) } } fn close<'m>(mut self) -> Self::CloseFuture<'m> { async move { - return if let Some(ref mut stream) = self.stream { - stream - .shutdown() - .await - .map_err(|_| ReasonCode::NetworkError) - } else { - Err(ReasonCode::NetworkError) - }; + self.stream + .shutdown() + .await + .map_err(|_| ReasonCode::NetworkError) } } - - /*fn count_down(&'m mut self, time_in_secs: u64) -> Self::TimerFuture<'m> { - async move { - return sleep(Duration::from_secs(time_in_secs)) - .await - } - }*/ } pub struct TokioNetworkFactory {} diff --git a/mqtt/src/utils/buffer_reader.rs b/mqtt/src/utils/buffer_reader.rs index 89cf852..62f2635 100644 --- a/mqtt/src/utils/buffer_reader.rs +++ b/mqtt/src/utils/buffer_reader.rs @@ -25,6 +25,7 @@ use core::mem; use core::str; + use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair}; @@ -129,7 +130,7 @@ impl<'a> BuffReader<'a> { let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len)])); if res_str.is_err() { - log::error!("Could not parse utf-8 string"); + error!("Could not parse utf-8 string"); return Err(BufferError::Utf8Error); } self.increment_position(len); diff --git a/mqtt/src/utils/buffer_writer.rs b/mqtt/src/utils/buffer_writer.rs index 7903065..6b74faf 100644 --- a/mqtt/src/utils/buffer_writer.rs +++ b/mqtt/src/utils/buffer_writer.rs @@ -22,6 +22,7 @@ * SOFTWARE. */ + use heapless::Vec; use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder}; diff --git a/mqtt/src/utils/types.rs b/mqtt/src/utils/types.rs index 4c141c2..cf5a1f7 100644 --- a/mqtt/src/utils/types.rs +++ b/mqtt/src/utils/types.rs @@ -25,6 +25,7 @@ use core::fmt::{Display, Formatter}; #[derive(core::fmt::Debug, Clone, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum BufferError { Utf8Error, InsufficientBufferSize,