diff --git a/.ci/mosquitto.conf b/.ci/mosquitto.conf index 9f15bd6..bbd7532 100644 --- a/.ci/mosquitto.conf +++ b/.ci/mosquitto.conf @@ -1,4 +1,3 @@ allow_anonymous false -listener 1883 10.0.1.17 -#password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt -password_file /Users/obabec/development/school/rust-mqtt/.ci/mqtt_pass.txt +listener 1883 0.0.0.0 +password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt diff --git a/.ci/mqtt_pass.txt b/.ci/mqtt_pass.txt index 598acb2..35a0f7d 100644 --- a/.ci/mqtt_pass.txt +++ b/.ci/mqtt_pass.txt @@ -1 +1 @@ -test:$7$101$IY9q8LLi2gHZZRBi$dq+KePHnbDmjlxdZsqmYy6B/yYjHoK/qsCOQ/sXpkvdDoN3E0+8DkKl4XRe7mhI2YPv3Jopo1zcicobqIHbLEA== +test:testPass diff --git a/Cargo.toml b/Cargo.toml index f7e17f1..a9711cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,6 @@ [workspace] members = [ - #"examples/drogue", - "mqtt", + "mqtt" ] resolver = "2" diff --git a/examples/drogue/src/drogue_network.rs b/examples/drogue/src/drogue_network.rs index 0548ec5..7b2db25 100644 --- a/examples/drogue/src/drogue_network.rs +++ b/examples/drogue/src/drogue_network.rs @@ -22,130 +22,82 @@ * SOFTWARE. */ +use crate::network::socket::Socket; +use crate::Address; use core::future::Future; use core::ops::Range; -use drogue_device::actors::net::ConnectionFactory; -use drogue_device::actors::socket::Socket; -use drogue_device::actors::tcp::TcpActor; -use drogue_device::traits::ip::{IpAddress, IpAddressV4, IpProtocol, SocketAddress}; -use drogue_device::Address; use rust_mqtt::packet::v5::reason_codes::ReasonCode; -use drogue_device::traits::tcp; -use drogue_device::traits::tcp::TcpStack; -use rust_mqtt::network::network_trait::{NetworkConnection, NetworkConnectionFactory}; +use crate::traits::tcp; +use crate::traits::tcp::TcpStack; +use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory}; pub struct DrogueNetwork where - A: TcpActor + 'static, + A: TcpStack + Clone + 'static, { socket: Socket, } impl DrogueNetwork -where - A: TcpActor + 'static, + where + A: TcpStack + Clone + 'static, { - fn new(socket: Socket) -> Self { + pub fn new(socket: Socket) -> Self { Self { socket } } } impl NetworkConnection for DrogueNetwork -where - A: TcpActor + 'static, -{ - type WriteFuture<'m> where - Self: 'm, + A: TcpStack + Clone + 'static, +{ + type SendFuture<'m> + where + Self: 'm, = impl Future> + 'm; - type ReadFuture<'m> - where - Self: 'm, + type ReceiveFuture<'m> + where + Self: 'm, = impl Future> + 'm; type CloseFuture<'m> - where - Self: 'm, + where + Self: 'm, = impl Future> + 'm; - fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> { + fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> { async move { self.socket - .write(&buffer[0..len]) + .write(buffer) .await .map_err(|_| ReasonCode::NetworkError) .map(|_| ()) } } - fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> { + fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m> { async move { - self.socket + let r = self + .socket .read(buffer) .await - .map_err(|_| ReasonCode::NetworkError) + .map_err(|_| ReasonCode::NetworkError); + // Workaround for the fair access mutex, issue: + if let Ok(0) = r { + embassy::time::Timer::after(embassy::time::Duration::from_millis(10)).await; + } + r } } fn close<'m>(mut self) -> Self::CloseFuture<'m> { async move { - self.socket.close() + self.socket + .close() .await .map_err(|_| ReasonCode::NetworkError) } } -} - -pub struct DrogueConnectionFactory -where - A: TcpActor + 'static, -{ - network: Address, -} - -impl DrogueConnectionFactory -where - A: TcpActor + 'static, -{ - pub fn new(network: Address) -> Self { - Self { network } - } -} - -impl NetworkConnectionFactory for DrogueConnectionFactory -where - A: TcpActor + 'static, -{ - type Connection = DrogueNetwork; - - type ConnectionFuture<'m> - where - Self: 'm, - = impl Future> + 'm; - - fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> { - async move { - let mut socket = Socket::new(self.network.clone(), self.network.open().await.unwrap()); - - match socket - .connect( - IpProtocol::Tcp, - SocketAddress::new(IpAddress::new_v4(ip[0], ip[1], ip[2], ip[3]), port), - ) - .await - { - Ok(_) => { - trace!("Connection established"); - Ok(DrogueNetwork::new(socket)) - } - Err(e) => { - warn!("Error creating connection:"); - socket.close().await.map_err(|e| ReasonCode::NetworkError)?; - Err(ReasonCode::NetworkError) - } - } - } - } -} +} \ No newline at end of file diff --git a/mqtt/src/client/client.rs b/mqtt/src/client/client.rs index d4caba6..a55f17a 100644 --- a/mqtt/src/client/client.rs +++ b/mqtt/src/client/client.rs @@ -22,7 +22,13 @@ * SOFTWARE. */ +use heapless::Vec; +use rand_core::RngCore; + use crate::client::client_config::{ClientConfig, MqttVersion}; +use crate::encoding::variable_byte_integer::{ + VariableByteInteger, VariableByteIntegerDecoder, +}; use crate::network::NetworkConnection; use crate::packet::v5::connack_packet::ConnackPacket; use crate::packet::v5::connect_packet::ConnectPacket; @@ -31,36 +37,31 @@ use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::pingreq_packet::PingreqPacket; use crate::packet::v5::pingresp_packet::PingrespPacket; use crate::packet::v5::puback_packet::PubackPacket; -use crate::packet::v5::publish_packet::QualityOfService::QoS1; use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; +use crate::packet::v5::publish_packet::QualityOfService::QoS1; use crate::packet::v5::reason_codes::ReasonCode; +use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError}; use crate::packet::v5::suback_packet::SubackPacket; use crate::packet::v5::subscription_packet::SubscriptionPacket; +use crate::packet::v5::unsuback_packet::UnsubackPacket; +use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket; use crate::utils::buffer_reader::BuffReader; -use crate::utils::rng_generator::CountingRng; +use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::BufferError; -use heapless::Vec; -use rand_core::RngCore; -use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder}; -use crate::network::NetworkError::Connection; -use crate::packet::v5::property::Property; -use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError}; -use crate::utils::buffer_writer::BuffWriter; - -pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> { +pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> { connection: Option, buffer: &'a mut [u8], buffer_len: usize, recv_buffer: &'a mut [u8], recv_buffer_len: usize, - rng: CountingRng, - config: ClientConfig<'a, MAX_PROPERTIES>, + config: ClientConfig<'a, MAX_PROPERTIES, R>, } -impl<'a, T, const MAX_PROPERTIES: usize> MqttClient<'a, T, MAX_PROPERTIES> +impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R> where T: NetworkConnection, + R: RngCore, { pub fn new( network_driver: T, @@ -68,7 +69,7 @@ where buffer_len: usize, recv_buffer: &'a mut [u8], recv_buffer_len: usize, - config: ClientConfig<'a, MAX_PROPERTIES>, + config: ClientConfig<'a, MAX_PROPERTIES, R>, ) -> Self { Self { connection: Some(network_driver), @@ -76,7 +77,6 @@ where buffer_len, recv_buffer, recv_buffer_len, - rng: CountingRng(50), config, } } @@ -103,7 +103,7 @@ where error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } - let mut conn = self.connection.as_mut().unwrap(); + let conn = self.connection.as_mut().unwrap(); trace!("Sending connect"); conn.send(&self.buffer[0..len.unwrap()]).await?; @@ -111,16 +111,14 @@ where let reason: Result = { trace!("Waiting for connack"); - let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let read = + { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new(); if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { if err == BufferError::PacketTypeMismatch { let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); - if disc - .decode(&mut BuffReader::new(self.buffer, read)) - .is_ok() - { + if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { error!("Client was disconnected with reason: "); return Err(ReasonCode::from(disc.disconnect_reason)); } @@ -145,12 +143,12 @@ where pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.connect_to_broker_v5().await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.connect_to_broker_v5().await, } } - async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { + async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { if self.connection.is_none() { return Err(ReasonCode::NetworkError); } @@ -164,7 +162,7 @@ where return Err(ReasonCode::BuffError); } - if let Err(e) = conn.send(&self.buffer[0..len.unwrap()]).await { + if let Err(_e) = conn.send(&self.buffer[0..len.unwrap()]).await { warn!("Could not send DISCONNECT packet"); } @@ -179,8 +177,8 @@ where pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.disconnect_v5().await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.disconnect_v5().await, } } @@ -192,8 +190,9 @@ where if self.connection.is_none() { return Err(ReasonCode::NetworkError); } - let mut conn = self.connection.as_mut().unwrap(); - let identifier: u16 = self.rng.next_u32() as u16; + let conn = self.connection.as_mut().unwrap(); + let identifier: u16 = self.config.rng.next_u32() as u16; + //self.rng.next_u32() as u16; let len = { let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new(); packet.add_topic_name(topic_name); @@ -216,11 +215,11 @@ where { let reason: Result<[u16; 2], BufferError> = { trace!("Waiting for ack"); - let read = receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await?; + let read = + receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await?; trace!("[PUBACK] Received packet with len"); let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) - { + if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { Err(err) } else { Ok([packet.packet_identifier, packet.reason_code as u16]) @@ -250,8 +249,8 @@ where message: &'b str, ) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.send_message_v5(topic_name, message).await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.send_message_v5(topic_name, message).await, } } @@ -262,7 +261,7 @@ where if self.connection.is_none() { return Err(ReasonCode::NetworkError); } - let mut conn = self.connection.as_mut().unwrap(); + let conn = self.connection.as_mut().unwrap(); let len = { let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); let mut i = 0; @@ -284,7 +283,8 @@ where conn.send(&self.buffer[0..len.unwrap()]).await?; let reason: Result, BufferError> = { - let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let read = + { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; let mut packet = SubackPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { @@ -304,7 +304,9 @@ where if i == TOPICS { break; } - if *reasons.get(i).unwrap() != (>::into(self.config.qos) >> 1) { + if *reasons.get(i).unwrap() + != (>::into(self.config.qos) >> 1) + { return Err(ReasonCode::from(*reasons.get(i).unwrap())); } i = i + 1; @@ -317,11 +319,63 @@ where topic_names: &'b Vec<&'b str, TOPICS>, ) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.subscribe_to_topics_v5(topic_names).await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.subscribe_to_topics_v5(topic_names).await, } } + pub async fn unsubscribe_from_topic<'b>( + &'b mut self, + topic_name: &'b str, + ) -> Result<(), ReasonCode> { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.unsubscribe_from_topic_v5(topic_name).await, + } + } + + pub async fn unsubscribe_from_topic_v5<'b>( + &'b mut self, + topic_name: &'b str, + ) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + + let len = { + let mut unsub = UnsubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); + unsub.packet_identifier = self.config.rng.next_u32() as u16; + unsub.add_new_filter(topic_name); + unsub.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + conn.send(&self.buffer[0..len.unwrap()]).await?; + + let reason: Result = { + let read = + { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let mut packet = UnsubackPacket::<'b, 1, MAX_PROPERTIES>::new(); + + if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { + Err(err) + } else { + Ok(*packet.reason_codes.get(0).unwrap()) + } + }; + + if let Err(err) = reason { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + + Ok(()) + } + async fn subscribe_to_topic_v5<'b>( &'b mut self, topic_name: &'b str, @@ -329,7 +383,7 @@ where if self.connection.is_none() { return Err(ReasonCode::NetworkError); } - let mut conn = self.connection.as_mut().unwrap(); + let conn = self.connection.as_mut().unwrap(); let len = { let mut subs = SubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); subs.add_new_filter(topic_name, self.config.qos); @@ -344,9 +398,10 @@ where conn.send(&self.buffer[0..len.unwrap()]).await?; let reason: Result = { - let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let read = + { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - let mut packet = SubackPacket::<'b, 5, MAX_PROPERTIES>::new(); + let mut packet = SubackPacket::<'b, 1, MAX_PROPERTIES>::new(); if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { Err(err) } else { @@ -372,8 +427,8 @@ where topic_name: &'b str, ) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.subscribe_to_topic_v5(topic_name).await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.subscribe_to_topic_v5(topic_name).await, } } @@ -381,20 +436,14 @@ where if self.connection.is_none() { return Err(ReasonCode::NetworkError); } - let mut conn = self.connection.as_mut().unwrap(); + let conn = self.connection.as_mut().unwrap(); let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; let mut packet = PublishPacket::<'b, 5>::new(); - if let Err(err) = { - packet.decode(&mut BuffReader::new(self.buffer, read)) - } - - { + if let Err(err) = { packet.decode(&mut BuffReader::new(self.buffer, read)) } { if err == BufferError::PacketTypeMismatch { let mut disc = DisconnectPacket::<'b, 5>::new(); - if disc.decode(&mut BuffReader::new(self.buffer, read)) - .is_ok() - { + if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { error!("Client was disconnected with reason: "); return Err(ReasonCode::from(disc.disconnect_reason)); } @@ -424,8 +473,8 @@ where pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.receive_message_v5().await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.receive_message_v5().await, } } @@ -433,7 +482,7 @@ where if self.connection.is_none() { return Err(ReasonCode::NetworkError); } - let mut conn = self.connection.as_mut().unwrap(); + let conn = self.connection.as_mut().unwrap(); let len = { let mut packet = PingreqPacket::new(); packet.encode(self.buffer, self.buffer_len) @@ -458,32 +507,37 @@ where pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> { match self.config.mqtt_version { - MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)} - MqttVersion::MQTTv5 => {self.send_ping_v5().await} + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.send_ping_v5().await, } } } - -async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result { - let mut target_len = 0; +async fn receive_packet<'c, T: NetworkConnection>( + buffer: &mut [u8], + buffer_len: usize, + recv_buffer: &mut [u8], + conn: &'c mut T, +) -> Result { + let target_len: usize; let mut rem_len: Result; - let mut rem_len_len: usize = 0; let mut writer = BuffWriter::new(buffer, buffer_len); - let mut i = 0; + let mut i = 0; // Get len of packet trace!("Reading lenght of packet"); loop { trace!(" Reading in loop!"); - let len: usize = conn.receive(&mut recv_buffer[writer.position..(writer.position+1)]).await?; + let len: usize = conn + .receive(&mut recv_buffer[writer.position..(writer.position + 1)]) + .await?; trace!(" Received data!"); i = i + len; - if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) { + if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) { error!("Error occurred during write to buffer!"); return Err(ReasonCode::BuffError); } - if (i > 1) { + if i > 1 { rem_len = writer.get_rem_len(); if rem_len.is_ok() { break; @@ -495,7 +549,7 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: } } - rem_len_len = i; + let rem_len_len = i; i = 0; if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) { trace!("Reading packet with target len {}", l); @@ -506,9 +560,12 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: } loop { - let len: usize = conn.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]).await?; + let len: usize = conn + .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]) + .await?; i = i + len; - if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) { + if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) + { error!("Error occurred during write to buffer!"); return Err(BuffError); } @@ -517,4 +574,4 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: return Ok(target_len + rem_len_len); } } -} \ No newline at end of file +} diff --git a/mqtt/src/client/client_config.rs b/mqtt/src/client/client_config.rs index aebd2fc..45b5417 100644 --- a/mqtt/src/client/client_config.rs +++ b/mqtt/src/client/client_config.rs @@ -22,20 +22,21 @@ * SOFTWARE. */ +use heapless::Vec; +use rand_core::RngCore; + use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService; use crate::utils::types::{BinaryData, EncodedString}; -use heapless::Vec; - #[derive(Clone, PartialEq)] pub enum MqttVersion { MQTTv3, - MQTTv5 + MQTTv5, } #[derive(Clone)] -pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { +pub struct ClientConfig<'a, const MAX_PROPERTIES: usize, T: RngCore> { pub qos: QualityOfService, pub keep_alive: u16, pub username_flag: bool, @@ -45,10 +46,11 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { pub properties: Vec, MAX_PROPERTIES>, pub max_packet_size: u32, pub mqtt_version: MqttVersion, + pub rng: T, } -impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { - pub fn new(version: MqttVersion) -> Self { +impl<'a, const MAX_PROPERTIES: usize, T: RngCore> ClientConfig<'a, MAX_PROPERTIES, T> { + pub fn new(version: MqttVersion, rng: T) -> Self { Self { qos: QualityOfService::QoS0, keep_alive: 60, @@ -58,7 +60,8 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { password: BinaryData::new(), properties: Vec::, MAX_PROPERTIES>::new(), max_packet_size: 265_000, - mqtt_version: version + mqtt_version: version, + rng, } } diff --git a/mqtt/src/client/mod.rs b/mqtt/src/client/mod.rs index ccdda35..14145b3 100644 --- a/mqtt/src/client/mod.rs +++ b/mqtt/src/client/mod.rs @@ -22,6 +22,6 @@ * SOFTWARE. */ +pub mod client; #[allow(unused_must_use)] pub mod client_config; -pub mod client; diff --git a/mqtt/src/encoding/variable_byte_integer.rs b/mqtt/src/encoding/variable_byte_integer.rs index 69a0beb..ad71f6c 100644 --- a/mqtt/src/encoding/variable_byte_integer.rs +++ b/mqtt/src/encoding/variable_byte_integer.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use crate::utils::types::BufferError; /// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on diff --git a/mqtt/src/lib.rs b/mqtt/src/lib.rs index d85d830..4f4ff37 100644 --- a/mqtt/src/lib.rs +++ b/mqtt/src/lib.rs @@ -27,7 +27,6 @@ #![allow(dead_code)] #![feature(type_alias_impl_trait)] #![feature(generic_associated_types)] - pub(crate) mod fmt; pub mod client; pub mod encoding; @@ -36,4 +35,3 @@ pub mod packet; pub mod tests; pub mod tokio_net; pub mod utils; - diff --git a/mqtt/src/network/mod.rs b/mqtt/src/network/mod.rs index c6d552d..d92244d 100644 --- a/mqtt/src/network/mod.rs +++ b/mqtt/src/network/mod.rs @@ -40,7 +40,7 @@ pub trait NetworkConnectionFactory: Sized { type ConnectionFuture<'m>: Future> where - Self: 'm; + Self: 'm; fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m>; } @@ -48,15 +48,15 @@ pub trait NetworkConnectionFactory: Sized { pub trait NetworkConnection { type SendFuture<'m>: Future> where - Self: 'm; + Self: 'm; type ReceiveFuture<'m>: Future> where - Self: 'm; + Self: 'm; type CloseFuture<'m>: Future> where - Self: 'm; + Self: 'm; fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m>; diff --git a/mqtt/src/packet/v5/connack_packet.rs b/mqtt/src/packet/v5/connack_packet.rs index f65d12f..e8911c1 100644 --- a/mqtt/src/packet/v5/connack_packet.rs +++ b/mqtt/src/packet/v5/connack_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; diff --git a/mqtt/src/packet/v5/connect_packet.rs b/mqtt/src/packet/v5/connect_packet.rs index 51d9abc..e29182e 100644 --- a/mqtt/src/packet/v5/connect_packet.rs +++ b/mqtt/src/packet/v5/connect_packet.rs @@ -27,7 +27,6 @@ use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; - use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BinaryData, BufferError, EncodedString}; diff --git a/mqtt/src/packet/v5/disconnect_packet.rs b/mqtt/src/packet/v5/disconnect_packet.rs index 44582d5..cb59a71 100644 --- a/mqtt/src/packet/v5/disconnect_packet.rs +++ b/mqtt/src/packet/v5/disconnect_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; @@ -83,6 +82,10 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR error!("Packet you are trying to decode is not DISCONNECT packet!"); return Err(BufferError::WrongPacketToDecode); } + if self.remain_len == 0 { + self.disconnect_reason = 0x00; + return Ok(()); + } self.disconnect_reason = buff_reader.read_u8()?; return self.decode_properties(buff_reader); } diff --git a/mqtt/src/packet/v5/mqtt_packet.rs b/mqtt/src/packet/v5/mqtt_packet.rs index ea32847..c5e6fc5 100644 --- a/mqtt/src/packet/v5/mqtt_packet.rs +++ b/mqtt/src/packet/v5/mqtt_packet.rs @@ -22,10 +22,11 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::packet_type::PacketType; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::BufferError; -use heapless::Vec; use super::property::Property; diff --git a/mqtt/src/packet/v5/packet_type.rs b/mqtt/src/packet/v5/packet_type.rs index 97ac1a7..eb6d40a 100644 --- a/mqtt/src/packet/v5/packet_type.rs +++ b/mqtt/src/packet/v5/packet_type.rs @@ -77,11 +77,11 @@ impl Into for PacketType { PacketType::Publish => 0x30, PacketType::Puback => 0x40, PacketType::Pubrec => 0x50, - PacketType::Pubrel => 0x60, + PacketType::Pubrel => 0x62, PacketType::Pubcomp => 0x70, PacketType::Subscribe => 0x82, PacketType::Suback => 0x90, - PacketType::Unsubscribe => 0xA0, + PacketType::Unsubscribe => 0xA2, PacketType::Unsuback => 0xB0, PacketType::Pingreq => 0xC0, PacketType::Pingresp => 0xD0, diff --git a/mqtt/src/packet/v5/pingreq_packet.rs b/mqtt/src/packet/v5/pingreq_packet.rs index 035521d..3e8e218 100644 --- a/mqtt/src/packet/v5/pingreq_packet.rs +++ b/mqtt/src/packet/v5/pingreq_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; diff --git a/mqtt/src/packet/v5/pingresp_packet.rs b/mqtt/src/packet/v5/pingresp_packet.rs index 2c5e77f..22b0827 100644 --- a/mqtt/src/packet/v5/pingresp_packet.rs +++ b/mqtt/src/packet/v5/pingresp_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; diff --git a/mqtt/src/packet/v5/puback_packet.rs b/mqtt/src/packet/v5/puback_packet.rs index e997a1d..bff4c77 100644 --- a/mqtt/src/packet/v5/puback_packet.rs +++ b/mqtt/src/packet/v5/puback_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; diff --git a/mqtt/src/packet/v5/pubcomp_packet.rs b/mqtt/src/packet/v5/pubcomp_packet.rs index 71301c4..737e8fc 100644 --- a/mqtt/src/packet/v5/pubcomp_packet.rs +++ b/mqtt/src/packet/v5/pubcomp_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; diff --git a/mqtt/src/packet/v5/publish_packet.rs b/mqtt/src/packet/v5/publish_packet.rs index 8b5ab14..88a48b2 100644 --- a/mqtt/src/packet/v5/publish_packet.rs +++ b/mqtt/src/packet/v5/publish_packet.rs @@ -22,12 +22,11 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; -use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1, QoS2, INVALID}; +use crate::packet::v5::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2}; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BufferError, EncodedString}; diff --git a/mqtt/src/packet/v5/pubrec_packet.rs b/mqtt/src/packet/v5/pubrec_packet.rs index 69ec7b6..e1b9f29 100644 --- a/mqtt/src/packet/v5/pubrec_packet.rs +++ b/mqtt/src/packet/v5/pubrec_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; diff --git a/mqtt/src/packet/v5/pubrel_packet.rs b/mqtt/src/packet/v5/pubrel_packet.rs index fb560e3..960a0c4 100644 --- a/mqtt/src/packet/v5/pubrel_packet.rs +++ b/mqtt/src/packet/v5/pubrel_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; diff --git a/mqtt/src/packet/v5/suback_packet.rs b/mqtt/src/packet/v5/suback_packet.rs index 741f004..77671d7 100644 --- a/mqtt/src/packet/v5/suback_packet.rs +++ b/mqtt/src/packet/v5/suback_packet.rs @@ -22,10 +22,9 @@ * SOFTWARE. */ -use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; - use heapless::Vec; +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::BufferError; diff --git a/mqtt/src/packet/v5/subscription_packet.rs b/mqtt/src/packet/v5/subscription_packet.rs index 28d7af2..c4d65cc 100644 --- a/mqtt/src/packet/v5/subscription_packet.rs +++ b/mqtt/src/packet/v5/subscription_packet.rs @@ -22,11 +22,8 @@ * SOFTWARE. */ - use heapless::Vec; -use super::packet_type::PacketType; -use super::property::Property; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::publish_packet::QualityOfService; @@ -34,6 +31,9 @@ use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BufferError, TopicFilter}; +use super::packet_type::PacketType; +use super::property::Property; + pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> { pub fixed_header: u8, pub remain_len: u32, diff --git a/mqtt/src/packet/v5/unsuback_packet.rs b/mqtt/src/packet/v5/unsuback_packet.rs index 9deb88c..81cc48c 100644 --- a/mqtt/src/packet/v5/unsuback_packet.rs +++ b/mqtt/src/packet/v5/unsuback_packet.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::packet::v5::mqtt_packet::Packet; diff --git a/mqtt/src/packet/v5/unsubscription_packet.rs b/mqtt/src/packet/v5/unsubscription_packet.rs index e1cbfc5..11ce39a 100644 --- a/mqtt/src/packet/v5/unsubscription_packet.rs +++ b/mqtt/src/packet/v5/unsubscription_packet.rs @@ -22,12 +22,11 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; -use crate::packet::v5::publish_packet::QualityOfService; +use crate::packet::v5::packet_type::PacketType; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BufferError, TopicFilter}; @@ -47,13 +46,12 @@ pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTI impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { - pub fn add_new_filter(&mut self, topic_name: &'a str, qos: QualityOfService) { + pub fn add_new_filter(&mut self, topic_name: &'a str) { let len = topic_name.len(); let mut new_filter = TopicFilter::new(); new_filter.filter.string = topic_name; new_filter.filter.len = len as u16; - new_filter.sub_options = - new_filter.sub_options | (>::into(qos) >> 1); + new_filter.sub_options = new_filter.sub_options | 0x01; self.topic_filters.push(new_filter); self.topic_filter_len = self.topic_filter_len + 1; } @@ -64,7 +62,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> { fn new() -> Self { Self { - fixed_header: 0, + fixed_header: PacketType::Unsubscribe.into(), remain_len: 0, packet_identifier: 0, property_len: 0, diff --git a/mqtt/src/tests/mod.rs b/mqtt/src/tests/mod.rs index 17b1913..5c4e6f3 100644 --- a/mqtt/src/tests/mod.rs +++ b/mqtt/src/tests/mod.rs @@ -25,4 +25,3 @@ #[cfg(test)] #[allow(unused_must_use)] pub mod unit; - diff --git a/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs index 1a035a9..3213216 100644 --- a/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/disconnect_packet_unit.rs @@ -22,14 +22,14 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::disconnect_packet::DisconnectPacket; use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; use crate::utils::buffer_reader::BuffReader; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 10] = [0; 10]; diff --git a/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs index 7ea1f3f..d30c8ce 100644 --- a/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/puback_packet_unit.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; @@ -29,8 +31,6 @@ use crate::packet::v5::puback_packet::PubackPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 14] = [0; 14]; diff --git a/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs index 985a1dd..eff4758 100644 --- a/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubcomp_packet_unit.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; @@ -29,8 +31,6 @@ use crate::packet::v5::pubcomp_packet::PubcompPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 14] = [0; 14]; diff --git a/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs index 44029b9..276d14a 100644 --- a/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/publish_packet_unit.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; @@ -29,8 +31,6 @@ use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::EncodedString; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 29] = [0; 29]; diff --git a/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs index 644b55d..10217bd 100644 --- a/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubrec_packet_unit.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; @@ -29,8 +31,6 @@ use crate::packet::v5::pubrec_packet::PubrecPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::{EncodedString, StringPair}; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 20] = [0; 20]; diff --git a/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs index f4dd25f..abf62ef 100644 --- a/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/pubrel_packet_unit.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; @@ -29,8 +31,6 @@ use crate::packet::v5::pubrel_packet::PubrelPacket; use crate::utils::buffer_reader::BuffReader; use crate::utils::types::{EncodedString, StringPair}; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 21] = [0; 21]; @@ -56,7 +56,7 @@ fn test_encode() { assert_eq!( buffer, [ - 0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, + 0x62, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39 ] ) @@ -65,7 +65,7 @@ fn test_encode() { #[test] fn test_decode() { let buffer: [u8; 21] = [ - 0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06, + 0x62, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39, ]; let mut packet = PubrelPacket::<1>::new(); diff --git a/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs index 0d62826..106a83c 100644 --- a/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/subscription_packet_unit.rs @@ -22,14 +22,14 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::subscription_packet::SubscriptionPacket; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 30] = [0; 30]; diff --git a/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs b/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs index 2c757d8..259fcb5 100644 --- a/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs +++ b/mqtt/src/tests/unit/packet/v5/unsubscription_packet_unit.rs @@ -22,15 +22,14 @@ * SOFTWARE. */ +use heapless::Vec; + use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::property::Property; -use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket; use crate::utils::types::{EncodedString, StringPair}; -use heapless::Vec; - #[test] fn test_encode() { let mut buffer: [u8; 40] = [0; 40]; @@ -49,15 +48,15 @@ fn test_encode() { let mut props = Vec::::new(); props.push(Property::UserProperty(pair)); packet.property_len = packet.add_properties(&props); - packet.add_new_filter("test/topic", QoS0); - packet.add_new_filter("hehe/#", QoS1); + packet.add_new_filter("test/topic"); + packet.add_new_filter("hehe/#"); let res = packet.encode(&mut buffer, 40); assert!(res.is_ok()); assert_eq!(res.unwrap(), 40); assert_eq!( buffer, [ - 0xA0, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06, + 0xA2, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39, 0x00, 0x0A, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74, 0x6F, 0x70, 0x69, 0x63, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F, 0x23 ] diff --git a/mqtt/src/tests/unit/utils/buffer_writer_unit.rs b/mqtt/src/tests/unit/utils/buffer_writer_unit.rs index 58ac4cd..707bfc7 100644 --- a/mqtt/src/tests/unit/utils/buffer_writer_unit.rs +++ b/mqtt/src/tests/unit/utils/buffer_writer_unit.rs @@ -22,14 +22,14 @@ * SOFTWARE. */ +use heapless::Vec; +use tokio_test::{assert_err, assert_ok}; + +use crate::encoding::variable_byte_integer::VariableByteInteger; use crate::packet::v5::property::Property; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter}; -use heapless::Vec; -use tokio_test::{assert_err, assert_ok}; -use crate::encoding::variable_byte_integer::VariableByteInteger; - #[test] fn buffer_write_ref() { static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84]; @@ -411,7 +411,6 @@ fn buffer_get_rem_len_two() { assert_eq!(rm_len.unwrap(), REF); } - #[test] fn buffer_get_rem_len_three() { static BUFFER: [u8; 5] = [0x82, 0x82, 0x83, 0x05, 0x84]; @@ -489,12 +488,12 @@ fn buffer_get_rem_len_cont() { let mut res_buffer: [u8; 6] = [0; 6]; let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 6); - let mut test_write = writer.insert_ref(2, &[0x82, 0x81]); + let test_write = writer.insert_ref(2, &[0x82, 0x81]); let rm_len = writer.get_rem_len(); assert_ok!(test_write); assert_err!(rm_len); - test_write = writer.insert_ref(2, &[0x82, 0x01]); + writer.insert_ref(2, &[0x82, 0x01]); let rm_len_sec = writer.get_rem_len(); assert_ok!(rm_len_sec); assert_eq!(rm_len_sec.unwrap(), [0x81, 0x82, 0x01, 0x00]); -} \ No newline at end of file +} diff --git a/mqtt/src/tokio_net/mod.rs b/mqtt/src/tokio_net/mod.rs index c9c75da..980c06e 100644 --- a/mqtt/src/tokio_net/mod.rs +++ b/mqtt/src/tokio_net/mod.rs @@ -22,10 +22,7 @@ * SOFTWARE. */ -#![feature(in_band_lifetimes)] #![macro_use] #![allow(dead_code)] -#![feature(type_alias_impl_trait)] -#![feature(generic_associated_types)] #[cfg(feature = "tokio")] pub mod tokio_network; diff --git a/mqtt/src/tokio_net/tokio_network.rs b/mqtt/src/tokio_net/tokio_network.rs index 9ff829f..3a3e9ef 100644 --- a/mqtt/src/tokio_net/tokio_network.rs +++ b/mqtt/src/tokio_net/tokio_network.rs @@ -23,17 +23,16 @@ */ extern crate alloc; + use alloc::format; use alloc::string::String; use core::future::Future; -use core::time::Duration; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; use crate::network::{NetworkConnection, NetworkConnectionFactory}; use crate::packet::v5::reason_codes::ReasonCode; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; -use tokio::time::sleep; - pub struct TokioNetwork { stream: TcpStream, @@ -41,9 +40,7 @@ pub struct TokioNetwork { impl TokioNetwork { pub fn new(stream: TcpStream) -> Self { - Self { - stream, - } + Self { stream } } pub fn convert_ip(ip: [u8; 4], port: u16) -> String { @@ -53,24 +50,13 @@ impl TokioNetwork { impl NetworkConnection for TokioNetwork { type SendFuture<'m> - where - Self: 'm, - = impl Future> + 'm; + = impl Future> + 'm where Self: 'm; type ReceiveFuture<'m> - where - Self: 'm, - = impl Future> + 'm; + = impl Future> + 'm where Self: 'm; type CloseFuture<'m> - where - Self: 'm, - = impl Future> + 'm; - - /*type TimerFuture<'m> - where - Self: 'm, - = impl Future;*/ + = impl Future> + 'm where Self: 'm; fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> { async move { @@ -112,9 +98,7 @@ impl NetworkConnectionFactory for TokioNetworkFactory { type Connection = TokioNetwork; type ConnectionFuture<'m> - where - Self: 'm, - = impl Future> + 'm; + = impl Future> + 'm where Self: 'm; fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> { async move { diff --git a/mqtt/src/utils/buffer_reader.rs b/mqtt/src/utils/buffer_reader.rs index 62f2635..61ac1e2 100644 --- a/mqtt/src/utils/buffer_reader.rs +++ b/mqtt/src/utils/buffer_reader.rs @@ -25,7 +25,6 @@ use core::mem; use core::str; - use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair}; diff --git a/mqtt/src/utils/buffer_writer.rs b/mqtt/src/utils/buffer_writer.rs index 5dab43d..5738ccc 100644 --- a/mqtt/src/utils/buffer_writer.rs +++ b/mqtt/src/utils/buffer_writer.rs @@ -22,7 +22,6 @@ * SOFTWARE. */ - use heapless::Vec; use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder}; @@ -48,15 +47,19 @@ impl<'a> BuffWriter<'a> { self.position = self.position + increment; } - pub fn get_n_byte(& mut self, n: usize) -> u8 { + pub fn get_n_byte(&mut self, n: usize) -> u8 { if self.position >= n { - return self.buffer[n] + return self.buffer[n]; } - return 0 + return 0; } - pub fn get_rem_len(& mut self) -> Result { - let mut max = if self.position >= 5 {4} else {self.position - 1}; + pub fn get_rem_len(&mut self) -> Result { + let max = if self.position >= 5 { + 4 + } else { + self.position - 1 + }; let mut i = 1; let mut len: VariableByteInteger = [0; 4]; loop { diff --git a/mqtt/src/utils/rng_generator.rs b/mqtt/src/utils/rng_generator.rs index f703d2e..f4e91ea 100644 --- a/mqtt/src/utils/rng_generator.rs +++ b/mqtt/src/utils/rng_generator.rs @@ -1,7 +1,7 @@ // This code is handed from Embedded Rust documentation and // is accessible from https://docs.rust-embedded.org/cortex-m-rt/0.6.0/rand/trait.RngCore.html -use rand_core::{impls, Error, RngCore}; +use rand_core::{Error, impls, RngCore}; pub struct CountingRng(pub u64); diff --git a/mqtt/tests/integration_test_single.rs b/mqtt/tests/integration_test_single.rs index c51fa82..6953b66 100644 --- a/mqtt/tests/integration_test_single.rs +++ b/mqtt/tests/integration_test_single.rs @@ -22,29 +22,30 @@ * SOFTWARE. */ extern crate alloc; + use alloc::string::String; use core::time::Duration; -use std::future::Future; -use log::{info, LevelFilter}; -use tokio::time::sleep; -use tokio::task; -use tokio_test::{assert_err, assert_ok}; +use std::sync::Once; + +use futures::future::{join, join3}; use heapless::Vec; -use rust_mqtt::client::client_config::ClientConfig; +use log::{info}; +use tokio::task; +use tokio::time::sleep; +use tokio_test::{assert_err, assert_ok}; + use rust_mqtt::client::client::MqttClient; -use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory}; +use rust_mqtt::client::client_config::ClientConfig; +use rust_mqtt::client::client_config::MqttVersion::MQTTv5; +use rust_mqtt::network::{NetworkConnectionFactory}; use rust_mqtt::packet::v5::property::Property; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized; use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; -use rust_mqtt::utils::types::BufferError; -use std::sync::Once; -use futures::future::{join, join3}; -use rust_mqtt::client::client_config::MqttVersion::MQTTv5; +use rust_mqtt::utils::rng_generator::CountingRng; static IP: [u8; 4] = [127, 0, 0, 1]; -static WRONG_IP: [u8; 4] = [192, 168, 1, 1]; static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; @@ -59,35 +60,49 @@ fn setup() { } async fn publish_core<'b>( - client: &mut MqttClient<'b, TokioNetwork, 5>, + client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, wait: u64, topic: &str, + message: &str, + err: bool, ) -> Result<(), ReasonCode> { info!( "[Publisher] Connection to broker with username {} and password {}", - USERNAME, - PASSWORD + USERNAME, PASSWORD ); let mut result = { client.connect_to_broker().await }; assert_ok!(result); info!("[Publisher] Waiting {} seconds before sending", wait); sleep(Duration::from_secs(wait)).await; - info!("[Publisher] Sending new message {} to topic {}", MSG, topic); - result = { client.send_message(topic, MSG).await }; + info!( + "[Publisher] Sending new message {} to topic {}", + message, topic + ); + result = client.send_message(topic, message).await; info!("[PUBLISHER] sent"); - assert_ok!(result); + if err == true { + assert_err!(result); + } else { + assert_ok!(result); + } info!("[Publisher] Disconnecting!"); - result = { client.disconnect().await }; + result = client.disconnect().await; + assert_ok!(result); Ok(()) } -async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { +async fn publish( + ip: [u8; 4], + wait: u64, + qos: QualityOfService, + topic: &str, +) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); @@ -95,7 +110,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> let mut recv_buffer = [0; 80]; let mut write_buffer = [0; 80]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 80, @@ -103,88 +118,123 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> 80, config, ); - publish_core(&mut client, wait, topic).await + publish_core(&mut client, wait, topic, MSG, false).await +} + +async fn publish_spec( + ip: [u8; 4], + wait: u64, + qos: QualityOfService, + topic: &str, + message: &str, + err: bool, +) -> Result<(), ReasonCode> { + let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); + let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 100; + let mut recv_buffer = [0; 80]; + let mut write_buffer = [0; 80]; + + let mut client = MqttClient::::new( + tokio_network, + &mut write_buffer, + 80, + &mut recv_buffer, + 80, + config, + ); + publish_core(&mut client, wait, topic, message, err).await } async fn receive_core<'b>( - client: &mut MqttClient<'b, TokioNetwork, 5>, + client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, topic: &str, ) -> Result<(), ReasonCode> { info!( "[Receiver] Connection to broker with username {} and password {}", - USERNAME, - PASSWORD + USERNAME, PASSWORD ); - let mut result = { client.connect_to_broker().await }; + let mut result = client.connect_to_broker().await; assert_ok!(result); info!("[Receiver] Subscribing to topic {}", topic); - result = { client.subscribe_to_topic(topic).await }; + result = client.subscribe_to_topic(topic).await; assert_ok!(result); info!("[Receiver] Waiting for new message!"); - let msg = { client.receive_message().await }; + let msg = client.receive_message().await; assert_ok!(msg); let act_message = String::from_utf8_lossy(msg?); info!("[Receiver] Got new message: {}", act_message); assert_eq!(act_message, MSG); info!("[Receiver] Disconnecting"); - result = { client.disconnect().await }; + result = client.disconnect().await; + assert_ok!(result); Ok(()) } - - async fn receive_core_multiple<'b, const TOPICS: usize>( - client: &mut MqttClient<'b, TokioNetwork, 5>, + client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, topic_names: &'b Vec<&'b str, TOPICS>, ) -> Result<(), ReasonCode> { info!( "[Receiver] Connection to broker with username {} and password {}", - USERNAME, - PASSWORD + USERNAME, PASSWORD ); - let mut result = { client.connect_to_broker().await }; + let mut result = client.connect_to_broker().await; assert_ok!(result); - info!("[Receiver] Subscribing to topics {}, {}", topic_names.get(0).unwrap(), topic_names.get(1).unwrap()); - result = { client.subscribe_to_topics(topic_names).await }; + info!( + "[Receiver] Subscribing to topics {}, {}", + topic_names.get(0).unwrap(), + topic_names.get(1).unwrap() + ); + result = client.subscribe_to_topics(topic_names).await; + assert_ok!(result); info!("[Receiver] Waiting for new message!"); { - let msg = { client.receive_message().await }; + let msg = client.receive_message().await; assert_ok!(msg); let act_message = String::from_utf8_lossy(msg?); info!("[Receiver] Got new message: {}", act_message); assert_eq!(act_message, MSG); } { - let msg_sec = { client.receive_message().await }; + let msg_sec = client.receive_message().await; assert_ok!(msg_sec); let act_message_second = String::from_utf8_lossy(msg_sec?); info!("[Receiver] Got new message: {}", act_message_second); assert_eq!(act_message_second, MSG); } info!("[Receiver] Disconnecting"); - result = { client.disconnect().await }; + result = client.disconnect().await; + assert_ok!(result); Ok(()) } -async fn receive_multiple(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> { +async fn receive_multiple( + qos: QualityOfService, + topic_names: &Vec<&str, TOPICS>, +) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 60; - config.properties.push(Property::ReceiveMaximum(20)); + assert_ok!(config.properties.push(Property::ReceiveMaximum(20))); let mut recv_buffer = [0; 100]; let mut write_buffer = [0; 100]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 100, @@ -198,17 +248,17 @@ async fn receive_multiple(qos: QualityOfService, topic_name async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 6000; - config.properties.push(Property::ReceiveMaximum(20)); + assert_ok!(config.properties.push(Property::ReceiveMaximum(20))); let mut recv_buffer = [0; 100]; let mut write_buffer = [0; 100]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 100, @@ -222,17 +272,17 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username("xyz"); config.add_password(PASSWORD); config.max_packet_size = 60; - config.properties.push(Property::ReceiveMaximum(20)); + assert_ok!(config.properties.push(Property::ReceiveMaximum(20))); let mut recv_buffer = [0; 100]; let mut write_buffer = [0; 100]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 100, @@ -243,17 +293,98 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode info!( "[Receiver] Connection to broker with username {} and password {}", - "xyz", - PASSWORD + "xyz", PASSWORD ); - let result = { client.connect_to_broker().await }; + let result = client.connect_to_broker().await; assert!(result.is_err()); assert_eq!(result.unwrap_err(), NotAuthorized); Ok(()) } +async fn receive_multiple_second_unsub( + qos: QualityOfService, + topic_names: &Vec<&str, TOPICS>, + msg_t1: &str, + msg_t2: &str, +) -> Result<(), ReasonCode> { + let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); + let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 60; + assert_ok!(config.properties.push(Property::ReceiveMaximum(20))); + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClient::::new( + tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + + info!( + "[Receiver] Connection to broker with username {} and password {}", + USERNAME, PASSWORD + ); + let mut result = { client.connect_to_broker().await }; + assert_ok!(result); + + info!( + "[Receiver] Subscribing to topics {}, {}", + topic_names.get(0).unwrap(), + topic_names.get(1).unwrap() + ); + result = client.subscribe_to_topics(topic_names).await; + + assert_ok!(result); + info!("[Receiver] Waiting for new message!"); + { + let msg = { client.receive_message().await }; + assert_ok!(msg); + let act_message = String::from_utf8_lossy(msg?); + info!("[Receiver] Got new message: {}", act_message); + assert_eq!(act_message, msg_t1); + } + { + let msg_sec = { client.receive_message().await }; + assert_ok!(msg_sec); + let act_message_second = String::from_utf8_lossy(msg_sec?); + info!("[Receiver] Got new message: {}", act_message_second); + assert_eq!(act_message_second, msg_t2); + } + + { + let res = client + .unsubscribe_from_topic(topic_names.get(1).unwrap()) + .await; + assert_ok!(res); + } + { + let msg = { client.receive_message().await }; + assert_ok!(msg); + let act_message = String::from_utf8_lossy(msg?); + info!("[Receiver] Got new message: {}", act_message); + assert_eq!(act_message, msg_t1); + } + + let res = + tokio::time::timeout(std::time::Duration::from_secs(10), client.receive_message()).await; + assert_err!(res); + + info!("[Receiver] Disconnecting"); + result = client.disconnect().await; + + assert_ok!(result); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn integration_simple_publish_recv() { +async fn integration_publish_recv() { setup(); info!("Running simple tests test"); @@ -261,7 +392,9 @@ async fn integration_simple_publish_recv() { task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/simple").await }); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -269,17 +402,17 @@ async fn integration_simple_publish_recv() { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn integration_simple_publish_recv_multiple() { +async fn integration_publish_recv_multiple() { setup(); info!("Running simple tests test"); let mut topic_names = Vec::<&str, 2>::new(); - topic_names.push("test/topic1"); - topic_names.push("test/topic2"); + assert_ok!(topic_names.push("test/topic1")); + assert_ok!(topic_names.push("test/topic2")); let recv = task::spawn(async move { receive_multiple(QualityOfService::QoS0, &topic_names).await }); let publ = - task::spawn(async move { publish(IP, 5,QualityOfService::QoS0, "test/topic1").await }); + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/topic1").await }); let publ2 = task::spawn(async move { publish(IP, 10, QualityOfService::QoS0, "test/topic2").await }); @@ -291,14 +424,14 @@ async fn integration_simple_publish_recv_multiple() { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn integration_simple_publish_recv_multiple_qos() { +async fn integration_publish_recv_multiple_qos() { setup(); info!("Running simple tests test"); let mut topic_names = Vec::<&str, 2>::new(); - topic_names.push("test/topic3"); - topic_names.push("test/topic4"); + assert_ok!(topic_names.push("test/topic3")); + assert_ok!(topic_names.push("test/topic4")); let recv = - task::spawn(async move { receive_multiple(QualityOfService::QoS1, &topic_names).await }); + task::spawn(async move { receive_multiple(QualityOfService::QoS1, &topic_names).await }); let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/topic3").await }); @@ -306,38 +439,72 @@ async fn integration_simple_publish_recv_multiple_qos() { let publ2 = task::spawn(async move { publish(IP, 10, QualityOfService::QoS1, "test/topic4").await }); - let ( r, p, p2) = join3(recv, publ, publ2).await; + let (r, p, p2) = join3(recv, publ, publ2).await; assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); assert_ok!(p2.unwrap()); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn integration_simple_publish_recv_qos() { +async fn integration_publish_recv_qos() { setup(); - info!("Running simple tests test with Quality of Service 1"); + info!("Running tests test with Quality of Service 1"); - let recv = task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await }); + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await }); - let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await }); + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn integration_simple_publish_recv_wrong_cred() { +async fn integration_publish_recv_wrong_cred() { setup(); - info!("Running simple tests test wrong credentials"); + info!("Running tests test wrong credentials"); let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await }); let recv_right = task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/wrong").await }); - let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await }); + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await }); let (r, rv, p) = join3(recv, recv_right, publ).await; assert_ok!(r.unwrap()); assert_ok!(rv.unwrap()); assert_ok!(p.unwrap()); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn integration_sub_unsub() { + setup(); + info!("Running tests with sub and unsub"); + let mut topic_names = Vec::<&str, 2>::new(); + assert_ok!(topic_names.push("unsub/topic1")); + assert_ok!(topic_names.push("unsub/topic2")); + let msg_t1 = "First topic message"; + let msg_t2 = "Second topic message"; + + let recv = task::spawn(async move { + receive_multiple_second_unsub(QualityOfService::QoS1, &topic_names, msg_t1, msg_t2).await + }); + + let publ = task::spawn(async move { + assert_ok!(publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await); + + publish_spec(IP, 2, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await + }); + + let publ2 = task::spawn(async move { + assert_ok!(publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await); + + publish_spec(IP, 3, QualityOfService::QoS1, "unsub/topic2", msg_t2, true).await + }); + let (r, p1, p2) = join3(recv, publ, publ2).await; + assert_ok!(r.unwrap()); + assert_ok!(p1.unwrap()); + assert_ok!(p2.unwrap()); +} diff --git a/mqtt/tests/load_test.rs b/mqtt/tests/load_test.rs index cd78980..87135c0 100644 --- a/mqtt/tests/load_test.rs +++ b/mqtt/tests/load_test.rs @@ -22,30 +22,28 @@ * SOFTWARE. */ extern crate alloc; + use alloc::string::String; use core::time::Duration; -use std::future::Future; -use log::{info, LevelFilter}; -use tokio::time::sleep; +use std::sync::Once; + +use futures::future::{join}; +use log::{info}; use serial_test::serial; use tokio::task; -use tokio_test::{assert_err, assert_ok}; -use heapless::Vec; -use rust_mqtt::client::client_config::ClientConfig; +use tokio::time::sleep; +use tokio_test::{assert_ok}; + use rust_mqtt::client::client::MqttClient; -use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory}; -use rust_mqtt::packet::v5::property::Property; +use rust_mqtt::client::client_config::ClientConfig; +use rust_mqtt::client::client_config::MqttVersion::MQTTv5; +use rust_mqtt::network::{NetworkConnectionFactory}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; -use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized; use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; -use rust_mqtt::utils::types::BufferError; -use std::sync::Once; -use futures::future::{join, join3}; -use rust_mqtt::client::client_config::MqttVersion::MQTTv5; +use rust_mqtt::utils::rng_generator::CountingRng; static IP: [u8; 4] = [127, 0, 0, 1]; -static WRONG_IP: [u8; 4] = [192, 168, 1, 1]; static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; @@ -60,17 +58,16 @@ fn setup() { } async fn publish_core<'b>( - client: &mut MqttClient<'b, TokioNetwork, 5>, + client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, wait: u64, topic: &str, amount: u16, ) -> Result<(), ReasonCode> { info!( "[Publisher] Connection to broker with username {} and password {}", - USERNAME, - PASSWORD + USERNAME, PASSWORD ); - let mut result = { client.connect_to_broker().await }; + let mut result = client.connect_to_broker().await; assert_ok!(result); info!("[Publisher] Waiting {} seconds before sending", wait); sleep(Duration::from_secs(wait)).await; @@ -78,7 +75,7 @@ async fn publish_core<'b>( info!("[Publisher] Sending new message {} to topic {}", MSG, topic); let mut count = 0; loop { - result = { client.send_message(topic, MSG).await }; + result = client.send_message(topic, MSG).await; info!("[PUBLISHER] sent {}", count); assert_ok!(result); count = count + 1; @@ -88,18 +85,22 @@ async fn publish_core<'b>( //sleep(Duration::from_millis(5)).await; } - - info!("[Publisher] Disconnecting!"); - result = { client.disconnect().await }; + result = client.disconnect().await; assert_ok!(result); Ok(()) } -async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> { +async fn publish( + ip: [u8; 4], + wait: u64, + qos: QualityOfService, + topic: &str, + amount: u16, +) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); @@ -107,7 +108,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amo let mut recv_buffer = [0; 80]; let mut write_buffer = [0; 80]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 80, @@ -119,25 +120,24 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amo } async fn receive_core<'b>( - client: &mut MqttClient<'b, TokioNetwork, 5>, + client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, topic: &str, amount: u16, ) -> Result<(), ReasonCode> { info!( "[Receiver] Connection to broker with username {} and password {}", - USERNAME, - PASSWORD + USERNAME, PASSWORD ); - let mut result = { client.connect_to_broker().await }; + let mut result = client.connect_to_broker().await; assert_ok!(result); info!("[Receiver] Subscribing to topic {}", topic); - result = { client.subscribe_to_topic(topic).await }; + result = client.subscribe_to_topic(topic).await; assert_ok!(result); info!("[Receiver] Waiting for new message!"); let mut count = 0; loop { - let msg = { client.receive_message().await }; + let msg = client.receive_message().await; assert_ok!(msg); let act_message = String::from_utf8_lossy(msg?); info!("[Receiver] Got new {}. message: {}", count, act_message); @@ -148,15 +148,20 @@ async fn receive_core<'b>( } } info!("[Receiver] Disconnecting"); - result = { client.disconnect().await }; + result = client.disconnect().await; assert_ok!(result); Ok(()) } -async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> { +async fn receive( + ip: [u8; 4], + qos: QualityOfService, + topic: &str, + amount: u16, +) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; - let mut config = ClientConfig::new(MQTTv5); + let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); @@ -166,7 +171,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) - let mut recv_buffer = [0; 500]; let mut write_buffer = [0; 500]; - let mut client = MqttClient::::new( + let mut client = MqttClient::::new( tokio_network, &mut write_buffer, 500, @@ -188,7 +193,9 @@ async fn load_test_ten() { task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/ten", 10).await }); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -201,10 +208,13 @@ async fn load_test_ten_qos() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -218,10 +228,14 @@ async fn load_test_fifty() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await }, + ); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -235,10 +249,13 @@ async fn load_test_fifty_qos() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -252,10 +269,13 @@ async fn load_test_hundred() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -272,7 +292,9 @@ async fn load_test_hundred_qos() { task::spawn(async move { receive(IP, QualityOfService::QoS1, "hundred/qos", 100).await }); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -289,7 +311,9 @@ async fn load_test_five_hundred() { task::spawn(async move { receive(IP, QualityOfService::QoS0, "five/hundred", 500).await }); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -303,10 +327,13 @@ async fn load_test_five_hundred_qos() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -340,7 +367,9 @@ async fn load_test_thousand_qos() { task::spawn(async move { receive(IP, QualityOfService::QoS1, "thousand/qos", 1000).await }); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -355,10 +384,13 @@ async fn load_test_ten_thousand_qos() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -372,10 +404,14 @@ async fn load_test_ten_thousand() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await }, + ); let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await }); + task::spawn( + async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await }, + ); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -389,11 +425,13 @@ async fn load_test_twenty_thousand_qos() { setup(); info!("Running simple tests test"); - let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await }); + let recv = task::spawn(async move { + receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await + }); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); @@ -407,12 +445,15 @@ async fn load_test_twenty_thousand() { info!("Running simple tests test"); let recv = - task::spawn(async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await }); + task::spawn( + async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await }, + ); - let publ = - task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await }); + let publ = task::spawn(async move { + publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await + }); let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); -} \ No newline at end of file +}