diff --git a/Cargo.toml b/Cargo.toml index eef1804..2b25edd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,11 @@ heapless = "0.7.10" rand_core = "0.6.0" defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } -embedded-io = { version = "0.3.0", features = ["async"]} +embedded-io = { version = "0.4.0", features = ["async"]} [dev-dependencies] tokio = { version = "1", features = ["full"] } -embedded-io = { version = "0.3.0", features = ["tokio"]} +embedded-io = { version = "0.4.0", features = ["tokio"]} tokio-test = { version = "0.4.2"} env_logger = "0.9.0" futures = { version = "0.3.21" } diff --git a/examples/pubsub.rs b/examples/pubsub.rs new file mode 100644 index 0000000..78d62e6 --- /dev/null +++ b/examples/pubsub.rs @@ -0,0 +1,60 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use embedded_io::adapters::FromTokio; +use rust_mqtt::{ + client::{client::MqttClient, client_config::ClientConfig}, + packet::v5::reason_codes::ReasonCode, + utils::rng_generator::CountingRng, +}; +use tokio::net::TcpStream; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1883); + + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError) + .unwrap(); + let connection = FromTokio::::new(connection); + let mut config = ClientConfig::new( + rust_mqtt::client::client_config::MqttVersion::MQTTv5, + CountingRng(20000), + ); + config.add_max_subscribe_qos(rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1); + config.add_client_id("client"); + // config.add_username(USERNAME); + // config.add_password(PASSWORD); + config.max_packet_size = 100; + let mut recv_buffer = [0; 80]; + let mut write_buffer = [0; 80]; + + let mut client = MqttClient::<_, 5, _>::new( + connection, + &mut write_buffer, + 80, + &mut recv_buffer, + 80, + config, + ); + + client.connect_to_broker().await.unwrap(); + + loop { + client + .send_message( + "hello", + b"hello2", + rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS0, + true, + ) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(500)).await; + } +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 6f4e0aa..fa10937 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "nightly-2022-03-10" +channel = "nightly-2022-11-22" components = [ "rust-src", "rustfmt" ] -targets = [ "thumbv7em-none-eabi", "thumbv7m-none-eabi", "thumbv6m-none-eabi", "thumbv7em-none-eabihf", "thumbv8m.main-none-eabihf", "wasm32-unknown-unknown" ] \ No newline at end of file +targets = [ "thumbv7em-none-eabi", "thumbv7m-none-eabi", "thumbv6m-none-eabi", "thumbv7em-none-eabihf", "thumbv8m.main-none-eabihf", "wasm32-unknown-unknown" ] diff --git a/src/client/client.rs b/src/client/client.rs index 338f011..3fad8d2 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -26,38 +26,17 @@ use embedded_io::asynch::{Read, Write}; use heapless::Vec; use rand_core::RngCore; -use crate::client::client_config::{ClientConfig, MqttVersion}; -use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder}; -use crate::network::NetworkConnection; -use crate::packet::v5::connack_packet::ConnackPacket; -use crate::packet::v5::connect_packet::ConnectPacket; -use crate::packet::v5::disconnect_packet::DisconnectPacket; -use crate::packet::v5::mqtt_packet::Packet; -use crate::packet::v5::pingreq_packet::PingreqPacket; -use crate::packet::v5::pingresp_packet::PingrespPacket; -use crate::packet::v5::puback_packet::PubackPacket; -use crate::packet::v5::publish_packet::QualityOfService::QoS1; -use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; +use crate::client::client_config::ClientConfig; +use crate::packet::v5::publish_packet::QualityOfService::{self, QoS1}; use crate::packet::v5::reason_codes::ReasonCode; -use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError}; -use crate::packet::v5::suback_packet::SubackPacket; -use crate::packet::v5::subscription_packet::SubscriptionPacket; -use crate::packet::v5::unsuback_packet::UnsubackPacket; -use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket; -use crate::utils::buffer_reader::BuffReader; -use crate::utils::buffer_writer::BuffWriter; -use crate::utils::types::BufferError; + +use super::raw_client::{Event, RawMqttClient}; pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> where T: Read + Write, { - connection: Option>, - buffer: &'a mut [u8], - buffer_len: usize, - recv_buffer: &'a mut [u8], - recv_buffer_len: usize, - config: ClientConfig<'a, MAX_PROPERTIES, R>, + raw: RawMqttClient<'a, T, MAX_PROPERTIES, R>, } impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R> @@ -74,80 +53,14 @@ where config: ClientConfig<'a, MAX_PROPERTIES, R>, ) -> Self { Self { - connection: Some(NetworkConnection::new(network_driver)), - buffer, - buffer_len, - recv_buffer, - recv_buffer_len, - config, - } - } - - async fn connect_to_broker_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let len = { - let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new(); - connect.keep_alive = self.config.keep_alive; - self.config.add_max_packet_size_as_prop(); - connect.property_len = connect.add_properties(&self.config.properties); - if self.config.username_flag { - connect.add_username(&self.config.username); - } - if self.config.password_flag { - connect.add_password(&self.config.password) - } - if self.config.will_flag { - connect.add_will( - &self.config.will_topic, - &self.config.will_payload, - self.config.will_retain, - ) - } - connect.add_client_id(&self.config.client_id); - connect.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - let conn = self.connection.as_mut().unwrap(); - trace!("Sending connect"); - conn.send(&self.buffer[0..len.unwrap()]).await?; - - //connack - let reason: Result = { - trace!("Waiting for connack"); - - let read = - { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - - let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - if err == BufferError::PacketTypeMismatch { - let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); - if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { - error!("Client was disconnected with reason: "); - return Err(ReasonCode::from(disc.disconnect_reason)); - } - } - Err(err) - } else { - Ok(packet.connect_reason_code) - } - }; - - if let Err(err) = reason { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - let res = reason.unwrap(); - if res != 0x00 { - return Err(ReasonCode::from(res)); - } else { - Ok(()) + raw: RawMqttClient::new( + network_driver, + buffer, + buffer_len, + recv_buffer, + recv_buffer_len, + config, + ), } } @@ -156,33 +69,14 @@ where /// If the connection to the broker fails, method returns Err variable that contains /// Reason codes returned from the broker. pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.connect_to_broker_v5().await, - } - } + self.raw.connect_to_broker().await?; - async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); + match self.raw.poll::<0>().await? { + Event::Connack => Ok(()), + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } - let conn = self.connection.as_mut().unwrap(); - trace!("Creating disconnect packet!"); - let mut disconnect = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); - let len = disconnect.encode(self.buffer, self.buffer_len); - if let Err(err) = len { - warn!("[DECODE ERR]: {}", err); - let _ = self.connection.take(); - return Err(ReasonCode::BuffError); - } - - if let Err(_e) = conn.send(&self.buffer[0..len.unwrap()]).await { - warn!("Could not send DISCONNECT packet"); - } - - // Drop connection - let _ = self.connection.take(); - Ok(()) } /// Method allows client disconnect from the server. Client disconnects from the specified broker @@ -190,72 +84,10 @@ where /// If the disconnect from the broker fails, method returns Err variable that contains /// Reason codes returned from the broker. pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.disconnect_v5().await, - } - } - - async fn send_message_v5<'b>( - &'b mut self, - topic_name: &'b str, - message: &'b [u8], - ) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - let identifier: u16 = self.config.rng.next_u32() as u16; - //self.rng.next_u32() as u16; - let len = { - let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new(); - packet.add_topic_name(topic_name); - packet.add_qos(self.config.qos); - packet.add_identifier(identifier); - packet.add_message(message); - packet.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - trace!("Sending message"); - conn.send(&self.buffer[0..len.unwrap()]).await?; - - // QoS1 - if >::into(self.config.qos) - == >::into(QoS1) - { - let reason: Result<[u16; 2], BufferError> = { - trace!("Waiting for ack"); - let read = - receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await?; - trace!("[PUBACK] Received packet with len"); - let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - Err(err) - } else { - Ok([packet.packet_identifier, packet.reason_code as u16]) - } - }; - - if let Err(err) = reason { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - - let res = reason.unwrap(); - if identifier != res[0] { - return Err(ReasonCode::PacketIdentifierNotFound); - } - - if res[1] != 0 { - return Err(ReasonCode::from(res[1] as u8)); - } - } + self.raw.disconnect().await?; Ok(()) } + /// Method allows sending message to broker specified from the ClientConfig. Client sends the /// message from the parameter `message` to the topic `topic_name` on the broker /// specified in the ClientConfig. If the send fails method returns Err with reason code @@ -264,71 +96,31 @@ where &'b mut self, topic_name: &'b str, message: &'b [u8], + qos: QualityOfService, + retain: bool, ) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.send_message_v5(topic_name, message).await, - } - } + let identifier = self + .raw + .send_message(topic_name, message, qos, retain) + .await?; - async fn subscribe_to_topics_v5<'b, const TOPICS: usize>( - &'b mut self, - topic_names: &'b Vec<&'b str, TOPICS>, - ) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - let len = { - let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); - let mut i = 0; - loop { - if i == TOPICS { - break; + // QoS1 + if qos == QoS1 { + match self.raw.poll::<0>().await? { + Event::Puback(ack_identifier) => { + if identifier == ack_identifier { + Ok(()) + } else { + Err(ReasonCode::PacketIdentifierNotFound) + } } - subs.add_new_filter(topic_names.get(i).unwrap(), self.config.qos); - i = i + 1; + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } - subs.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); + } else { + Ok(()) } - - conn.send(&self.buffer[0..len.unwrap()]).await?; - - let reason: Result, BufferError> = { - let read = - { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - - let mut packet = SubackPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - Err(err) - } else { - Ok(packet.reason_codes) - } - }; - - if let Err(err) = reason { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - let reasons = reason.unwrap(); - let mut i = 0; - loop { - if i == TOPICS { - break; - } - if *reasons.get(i).unwrap() - != (>::into(self.config.qos) >> 1) - { - return Err(ReasonCode::from(*reasons.get(i).unwrap())); - } - i = i + 1; - } - Ok(()) } /// Method allows client subscribe to multiple topics specified in the parameter @@ -339,9 +131,19 @@ where &'b mut self, topic_names: &'b Vec<&'b str, TOPICS>, ) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.subscribe_to_topics_v5(topic_names).await, + let identifier = self.raw.subscribe_to_topics(topic_names).await?; + + match self.raw.poll::().await? { + Event::Suback(ack_identifier) => { + if identifier == ack_identifier { + Ok(()) + } else { + Err(ReasonCode::PacketIdentifierNotFound) + } + } + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } } @@ -352,97 +154,19 @@ where &'b mut self, topic_name: &'b str, ) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.unsubscribe_from_topic_v5(topic_name).await, - } - } + let identifier = self.raw.unsubscribe_from_topic(topic_name).await?; - async fn unsubscribe_from_topic_v5<'b>( - &'b mut self, - topic_name: &'b str, - ) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - - let len = { - let mut unsub = UnsubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); - unsub.packet_identifier = self.config.rng.next_u32() as u16; - unsub.add_new_filter(topic_name); - unsub.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - conn.send(&self.buffer[0..len.unwrap()]).await?; - - let reason: Result = { - let read = - { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - let mut packet = UnsubackPacket::<'b, 1, MAX_PROPERTIES>::new(); - - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - Err(err) - } else { - Ok(*packet.reason_codes.get(0).unwrap()) + match self.raw.poll::<0>().await? { + Event::Unsuback(ack_identifier) => { + if identifier == ack_identifier { + Ok(()) + } else { + Err(ReasonCode::PacketIdentifierNotFound) + } } - }; - - if let Err(err) = reason { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - - Ok(()) - } - - async fn subscribe_to_topic_v5<'b>( - &'b mut self, - topic_name: &'b str, - ) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - let len = { - let mut subs = SubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); - subs.add_new_filter(topic_name, self.config.qos); - subs.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - - conn.send(&self.buffer[0..len.unwrap()]).await?; - - let reason: Result = { - let read = - { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - - let mut packet = SubackPacket::<'b, 1, MAX_PROPERTIES>::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - Err(err) - } else { - Ok(*packet.reason_codes.get(0).unwrap()) - } - }; - - if let Err(err) = reason { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - - let res = reason.unwrap(); - if res != (>::into(self.config.qos) >> 1) { - Err(ReasonCode::from(res)) - } else { - Ok(()) + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } } @@ -453,85 +177,34 @@ where &'b mut self, topic_name: &'b str, ) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.subscribe_to_topic_v5(topic_name).await, - } - } + let mut topic_names = Vec::<&'b str, 1>::new(); + topic_names.push(topic_name).unwrap(); - async fn receive_message_v5<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let identifier = self.raw.subscribe_to_topics(&topic_names).await?; - let mut packet = PublishPacket::<'b, 5>::new(); - if let Err(err) = { packet.decode(&mut BuffReader::new(self.buffer, read)) } { - if err == BufferError::PacketTypeMismatch { - let mut disc = DisconnectPacket::<'b, 5>::new(); - if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { - error!("Client was disconnected with reason: "); - return Err(ReasonCode::from(disc.disconnect_reason)); + match self.raw.poll::<1>().await? { + Event::Suback(ack_identifier) => { + if identifier == ack_identifier { + Ok(()) + } else { + Err(ReasonCode::PacketIdentifierNotFound) } } - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } - - if (packet.fixed_header & 0x06) - == >::into(QualityOfService::QoS1) - { - let mut puback = PubackPacket::<'b, MAX_PROPERTIES>::new(); - puback.packet_identifier = packet.packet_identifier; - puback.reason_code = 0x00; - { - let len = { puback.encode(self.recv_buffer, self.recv_buffer_len) }; - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - conn.send(&self.recv_buffer[0..len.unwrap()]).await?; - } - } - - return Ok((packet.topic_name.string, packet.message.unwrap())); } /// Method allows client receive a message. The work of this method strictly depends on the /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet /// from the broker. pub async fn receive_message<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.receive_message_v5().await, - } - } - - async fn send_ping_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { - if self.connection.is_none() { - return Err(ReasonCode::NetworkError); - } - let conn = self.connection.as_mut().unwrap(); - let len = { - let mut packet = PingreqPacket::new(); - packet.encode(self.buffer, self.buffer_len) - }; - - if let Err(err) = len { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } - - conn.send(&self.buffer[0..len.unwrap()]).await?; - - let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; - let mut packet = PingrespPacket::new(); - if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { - error!("[DECODE ERR]: {}", err); - return Err(ReasonCode::BuffError); - } else { - Ok(()) + match self.raw.poll::<0>().await? { + Event::Message(topic, payload) => Ok((topic, payload)), + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } } @@ -539,96 +212,13 @@ where /// If there is expectation for long running connection. Method should be executed /// regularly by the timer that counts down the session expiry interval. pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> { - match self.config.mqtt_version { - MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), - MqttVersion::MQTTv5 => self.send_ping_v5().await, + self.raw.send_ping().await?; + + match self.raw.poll::<0>().await? { + Event::Pingresp => Ok(()), + Event::Disconnect(reason) => Err(reason), + // If an application message comes at this moment, it is lost. + _ => Err(ReasonCode::ImplementationSpecificError), } } } - -#[cfg(not(feature = "tls"))] -async fn receive_packet<'c, T: Read + Write>( - buffer: &mut [u8], - buffer_len: usize, - recv_buffer: &mut [u8], - conn: &'c mut NetworkConnection, -) -> Result { - let target_len: usize; - let mut rem_len: Result; - let mut writer = BuffWriter::new(buffer, buffer_len); - let mut i = 0; - - // Get len of packet - trace!("Reading lenght of packet"); - loop { - trace!(" Reading in loop!"); - let len: usize = conn - .receive(&mut recv_buffer[writer.position..(writer.position + 1)]) - .await?; - trace!(" Received data!"); - if len == 0 { - trace!("Zero byte len packet received, dropping connection."); - return Err(NetworkError); - } - i = i + len; - if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) { - error!("Error occurred during write to buffer!"); - return Err(ReasonCode::BuffError); - } - if i > 1 { - rem_len = writer.get_rem_len(); - if rem_len.is_ok() { - break; - } - if i >= 5 { - error!("Could not read len of packet!"); - return Err(NetworkError); - } - } - } - trace!("Lenght done!"); - let rem_len_len = i; - i = 0; - if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) { - trace!("Reading packet with target len {}", l); - target_len = l as usize; - } else { - error!("Could not decode len of packet!"); - return Err(BuffError); - } - - loop { - if writer.position == target_len + rem_len_len { - trace!("Received packet with len: {}", (target_len + rem_len_len)); - return Ok(target_len + rem_len_len); - } - let len: usize = conn - .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]) - .await?; - i = i + len; - if let Err(_e) = - writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) - { - error!("Error occurred during write to buffer!"); - return Err(BuffError); - } - } -} - -#[cfg(feature = "tls")] -async fn receive_packet<'c, T: Read + Write>( - buffer: &mut [u8], - buffer_len: usize, - recv_buffer: &mut [u8], - conn: &'c mut NetworkConnection, -) -> Result { - trace!("Reading packet"); - let mut writer = BuffWriter::new(buffer, buffer_len); - let len = conn.receive(recv_buffer).await?; - if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + len)]) - { - error!("Error occurred during write to buffer!"); - return Err(BuffError); - } - Ok(len) -} diff --git a/src/client/client_config.rs b/src/client/client_config.rs index 7b77455..e604697 100644 --- a/src/client/client_config.rs +++ b/src/client/client_config.rs @@ -46,7 +46,7 @@ pub enum MqttVersion { /// Examples of the configurations can be found in the integration tests. #[derive(Clone)] pub struct ClientConfig<'a, const MAX_PROPERTIES: usize, T: RngCore> { - pub qos: QualityOfService, + pub max_subscribe_qos: QualityOfService, pub keep_alive: u16, pub username_flag: bool, pub username: EncodedString<'a>, @@ -66,7 +66,7 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize, T: RngCore> { impl<'a, const MAX_PROPERTIES: usize, T: RngCore> ClientConfig<'a, MAX_PROPERTIES, T> { pub fn new(version: MqttVersion, rng: T) -> Self { Self { - qos: QualityOfService::QoS0, + max_subscribe_qos: QualityOfService::QoS0, keep_alive: 60, username_flag: false, username: EncodedString::new(), @@ -84,8 +84,8 @@ impl<'a, const MAX_PROPERTIES: usize, T: RngCore> ClientConfig<'a, MAX_PROPERTIE } } - pub fn add_qos(&mut self, qos: QualityOfService) { - self.qos = qos; + pub fn add_max_subscribe_qos(&mut self, qos: QualityOfService) { + self.max_subscribe_qos = qos; } pub fn add_will(&mut self, topic: &'a str, payload: &'a [u8], retain: bool) { @@ -136,7 +136,7 @@ impl<'a, const MAX_PROPERTIES: usize, T: RngCore> ClientConfig<'a, MAX_PROPERTIE self.properties.push(prop); return 5; } - return 0; + 0 } pub fn add_client_id(&mut self, client_id: &'a str) { diff --git a/src/client/mod.rs b/src/client/mod.rs index 14145b3..e2ad82e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -22,6 +22,8 @@ * SOFTWARE. */ +#[allow(clippy::module_inception)] pub mod client; #[allow(unused_must_use)] pub mod client_config; +pub mod raw_client; diff --git a/src/client/raw_client.rs b/src/client/raw_client.rs new file mode 100644 index 0000000..15f3b8d --- /dev/null +++ b/src/client/raw_client.rs @@ -0,0 +1,572 @@ +use embedded_io::asynch::{Read, Write}; +use heapless::Vec; +use rand_core::RngCore; + +use crate::{ + encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder}, + network::NetworkConnection, + packet::v5::{ + connack_packet::ConnackPacket, + connect_packet::ConnectPacket, + disconnect_packet::DisconnectPacket, + mqtt_packet::Packet, + packet_type::PacketType, + pingreq_packet::PingreqPacket, + pingresp_packet::PingrespPacket, + puback_packet::PubackPacket, + publish_packet::{PublishPacket, QualityOfService}, + reason_codes::ReasonCode, + suback_packet::SubackPacket, + subscription_packet::SubscriptionPacket, + unsuback_packet::UnsubackPacket, + unsubscription_packet::UnsubscriptionPacket, + }, + utils::{buffer_reader::BuffReader, buffer_writer::BuffWriter, types::BufferError}, +}; + +use super::client_config::{ClientConfig, MqttVersion}; + +pub enum Event<'a> { + Connack, + Puback(u16), + Suback(u16), + Unsuback(u16), + Pingresp, + Message(&'a str, &'a [u8]), + Disconnect(ReasonCode), +} + +pub struct RawMqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> +where + T: Read + Write, +{ + connection: Option>, + buffer: &'a mut [u8], + buffer_len: usize, + recv_buffer: &'a mut [u8], + recv_buffer_len: usize, + config: ClientConfig<'a, MAX_PROPERTIES, R>, +} + +impl<'a, T, const MAX_PROPERTIES: usize, R> RawMqttClient<'a, T, MAX_PROPERTIES, R> +where + T: Read + Write, + R: RngCore, +{ + pub fn new( + network_driver: T, + buffer: &'a mut [u8], + buffer_len: usize, + recv_buffer: &'a mut [u8], + recv_buffer_len: usize, + config: ClientConfig<'a, MAX_PROPERTIES, R>, + ) -> Self { + Self { + connection: Some(NetworkConnection::new(network_driver)), + buffer, + buffer_len, + recv_buffer, + recv_buffer_len, + config, + } + } + + async fn connect_to_broker_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let len = { + let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new(); + connect.keep_alive = self.config.keep_alive; + self.config.add_max_packet_size_as_prop(); + connect.property_len = connect.add_properties(&self.config.properties); + if self.config.username_flag { + connect.add_username(&self.config.username); + } + if self.config.password_flag { + connect.add_password(&self.config.password) + } + if self.config.will_flag { + connect.add_will( + &self.config.will_topic, + &self.config.will_payload, + self.config.will_retain, + ) + } + connect.add_client_id(&self.config.client_id); + connect.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + let conn = self.connection.as_mut().unwrap(); + trace!("Sending connect"); + conn.send(&self.buffer[0..len.unwrap()]).await?; + + Ok(()) + } + + /// Method allows client connect to server. Client is connecting to the specified broker + /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config. + /// If the connection to the broker fails, method returns Err variable that contains + /// Reason codes returned from the broker. + pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.connect_to_broker_v5().await, + } + } + + async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + trace!("Creating disconnect packet!"); + let mut disconnect = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); + let len = disconnect.encode(self.buffer, self.buffer_len); + if let Err(err) = len { + warn!("[DECODE ERR]: {}", err); + let _ = self.connection.take(); + return Err(ReasonCode::BuffError); + } + + if let Err(_e) = conn.send(&self.buffer[0..len.unwrap()]).await { + warn!("Could not send DISCONNECT packet"); + } + + // Drop connection + let _ = self.connection.take(); + Ok(()) + } + + /// Method allows client disconnect from the server. Client disconnects from the specified broker + /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config. + /// If the disconnect from the broker fails, method returns Err variable that contains + /// Reason codes returned from the broker. + pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.disconnect_v5().await, + } + } + + async fn send_message_v5<'b>( + &'b mut self, + topic_name: &'b str, + message: &'b [u8], + qos: QualityOfService, + retain: bool, + ) -> Result { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + let identifier: u16 = self.config.rng.next_u32() as u16; + //self.rng.next_u32() as u16; + let len = { + let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new(); + packet.add_topic_name(topic_name); + packet.add_qos(qos); + packet.add_identifier(identifier); + packet.add_message(message); + packet.add_retain(retain); + packet.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + trace!("Sending message"); + conn.send(&self.buffer[0..len.unwrap()]).await?; + + Ok(identifier) + } + /// Method allows sending message to broker specified from the ClientConfig. Client sends the + /// message from the parameter `message` to the topic `topic_name` on the broker + /// specified in the ClientConfig. If the send fails method returns Err with reason code + /// received by broker. + pub async fn send_message<'b>( + &'b mut self, + topic_name: &'b str, + message: &'b [u8], + qos: QualityOfService, + retain: bool, + ) -> Result { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.send_message_v5(topic_name, message, qos, retain).await, + } + } + + async fn subscribe_to_topics_v5<'b, const TOPICS: usize>( + &'b mut self, + topic_names: &'b Vec<&'b str, TOPICS>, + ) -> Result { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + let identifier: u16 = self.config.rng.next_u32() as u16; + let len = { + let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new(); + subs.packet_identifier = identifier; + let mut i = 0; + loop { + if i == TOPICS { + break; + } + subs.add_new_filter(topic_names.get(i).unwrap(), self.config.max_subscribe_qos); + i += 1; + } + subs.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + + conn.send(&self.buffer[0..len.unwrap()]).await?; + + Ok(identifier) + } + + /// Method allows client subscribe to multiple topics specified in the parameter + /// `topic_names` on the broker specified in the `ClientConfig`. Generics `TOPICS` + /// sets the value of the `topics_names` vector. MQTT protocol implementation + /// is selected automatically. + pub async fn subscribe_to_topics<'b, const TOPICS: usize>( + &'b mut self, + topic_names: &'b Vec<&'b str, TOPICS>, + ) -> Result { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.subscribe_to_topics_v5(topic_names).await, + } + } + + /// Method allows client unsubscribe from the topic specified in the parameter + /// `topic_name` on the broker from the `ClientConfig`. MQTT protocol implementation + /// is selected automatically. + pub async fn unsubscribe_from_topic<'b>( + &'b mut self, + topic_name: &'b str, + ) -> Result { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.unsubscribe_from_topic_v5(topic_name).await, + } + } + + async fn unsubscribe_from_topic_v5<'b>( + &'b mut self, + topic_name: &'b str, + ) -> Result { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + let identifier = self.config.rng.next_u32() as u16; + + let len = { + let mut unsub = UnsubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new(); + unsub.packet_identifier = identifier; + unsub.add_new_filter(topic_name); + unsub.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + conn.send(&self.buffer[0..len.unwrap()]).await?; + + Ok(identifier) + } + + async fn send_ping_v5<'b>(&'b mut self) -> Result<(), ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + let conn = self.connection.as_mut().unwrap(); + let len = { + let mut packet = PingreqPacket::new(); + packet.encode(self.buffer, self.buffer_len) + }; + + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + + conn.send(&self.buffer[0..len.unwrap()]).await?; + + Ok(()) + } + + /// Method allows client send PING message to the broker specified in the `ClientConfig`. + /// If there is expectation for long running connection. Method should be executed + /// regularly by the timer that counts down the session expiry interval. + pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> { + match self.config.mqtt_version { + MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion), + MqttVersion::MQTTv5 => self.send_ping_v5().await, + } + } + + pub async fn poll<'b, const MAX_TOPICS: usize>(&'b mut self) -> Result, ReasonCode> { + if self.connection.is_none() { + return Err(ReasonCode::NetworkError); + } + + let conn = self.connection.as_mut().unwrap(); + + trace!("Waiting for a packet"); + + let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + + let buf_reader = BuffReader::new(self.buffer, read); + + match PacketType::from(buf_reader.peek_u8().map_err(|_| ReasonCode::BuffError)?) { + PacketType::Reserved + | PacketType::Connect + | PacketType::Subscribe + | PacketType::Unsubscribe + | PacketType::Pingreq => Err(ReasonCode::ProtocolError), + PacketType::Pubrec | PacketType::Pubrel | PacketType::Pubcomp | PacketType::Auth => { + Err(ReasonCode::ImplementationSpecificError) + } + PacketType::Connack => { + let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { + // if err == BufferError::PacketTypeMismatch { + // let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new(); + // if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { + // error!("Client was disconnected with reason: "); + // return Err(ReasonCode::from(disc.disconnect_reason)); + // } + // } + error!("[DECODE ERR]: {}", err); + Err(ReasonCode::BuffError) + } else if packet.connect_reason_code != 0x00 { + Err(ReasonCode::from(packet.connect_reason_code)) + } else { + Ok(Event::Connack) + } + } + PacketType::Puback => { + let reason: Result<[u16; 2], BufferError> = { + let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new(); + packet + .decode(&mut BuffReader::new(self.buffer, read)) + .map(|_| [packet.packet_identifier, packet.reason_code as u16]) + }; + + if let Err(err) = reason { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + + let res = reason.unwrap(); + + if res[1] != 0 { + return Err(ReasonCode::from(res[1] as u8)); + } + + Ok(Event::Puback(res[0])) + } + PacketType::Suback => { + let reason: Result<(u16, Vec), BufferError> = { + let mut packet = SubackPacket::<'b, MAX_TOPICS, MAX_PROPERTIES>::new(); + packet + .decode(&mut BuffReader::new(self.buffer, read)) + .map(|_| (packet.packet_identifier, packet.reason_codes)) + }; + + if let Err(err) = reason { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + let (packet_identifier, reasons) = reason.unwrap(); + let mut i = 0; + loop { + if i == reasons.len() { + break; + } + if *reasons.get(i).unwrap() + != (>::into(self.config.max_subscribe_qos) + >> 1) + { + return Err(ReasonCode::from(*reasons.get(i).unwrap())); + } + i += 1; + } + Ok(Event::Suback(packet_identifier)) + } + PacketType::Unsuback => { + let res: Result = { + let mut packet = UnsubackPacket::<'b, 1, MAX_PROPERTIES>::new(); + packet + .decode(&mut BuffReader::new(self.buffer, read)) + .map(|_| packet.packet_identifier) + }; + + if let Err(err) = res { + error!("[DECODE ERR]: {}", err); + Err(ReasonCode::BuffError) + } else { + Ok(Event::Unsuback(res.unwrap())) + } + } + PacketType::Pingresp => { + let mut packet = PingrespPacket::new(); + if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) { + error!("[DECODE ERR]: {}", err); + Err(ReasonCode::BuffError) + } else { + Ok(Event::Pingresp) + } + } + PacketType::Publish => { + let mut packet = PublishPacket::<'b, 5>::new(); + if let Err(err) = { packet.decode(&mut BuffReader::new(self.buffer, read)) } { + // if err == BufferError::PacketTypeMismatch { + // let mut disc = DisconnectPacket::<'b, 5>::new(); + // if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() { + // error!("Client was disconnected with reason: "); + // return Err(ReasonCode::from(disc.disconnect_reason)); + // } + // } + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + + if (packet.fixed_header & 0x06) + == >::into(QualityOfService::QoS1) + { + let mut puback = PubackPacket::<'b, MAX_PROPERTIES>::new(); + puback.packet_identifier = packet.packet_identifier; + puback.reason_code = 0x00; + { + let len = { puback.encode(self.recv_buffer, self.recv_buffer_len) }; + if let Err(err) = len { + error!("[DECODE ERR]: {}", err); + return Err(ReasonCode::BuffError); + } + conn.send(&self.recv_buffer[0..len.unwrap()]).await?; + } + } + + Ok(Event::Message( + packet.topic_name.string, + packet.message.unwrap(), + )) + } + PacketType::Disconnect => { + let mut disc = DisconnectPacket::<'b, 5>::new(); + let res = disc.decode(&mut BuffReader::new(self.buffer, read)); + + match res { + Ok(_) => Ok(Event::Disconnect(ReasonCode::from(disc.disconnect_reason))), + Err(err) => { + error!("[DECODE ERR]: {}", err); + Err(ReasonCode::BuffError) + } + } + } + } + } +} + +#[cfg(not(feature = "tls"))] +async fn receive_packet<'c, T: Read + Write>( + buffer: &mut [u8], + buffer_len: usize, + recv_buffer: &mut [u8], + conn: &'c mut NetworkConnection, +) -> Result { + use crate::utils::buffer_writer::RemLenError; + + let target_len: usize; + let mut rem_len: Result; + let mut writer = BuffWriter::new(buffer, buffer_len); + let mut i = 0; + + // Get len of packet + trace!("Reading lenght of packet"); + loop { + trace!(" Reading in loop!"); + let len: usize = conn + .receive(&mut recv_buffer[writer.position..(writer.position + 1)]) + .await?; + trace!(" Received data!"); + if len == 0 { + trace!("Zero byte len packet received, dropping connection."); + return Err(ReasonCode::NetworkError); + } + i += len; + if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) { + error!("Error occurred during write to buffer!"); + return Err(ReasonCode::BuffError); + } + if i > 1 { + rem_len = writer.get_rem_len(); + if rem_len.is_ok() { + break; + } + if i >= 5 { + error!("Could not read len of packet!"); + return Err(ReasonCode::NetworkError); + } + } + } + trace!("Lenght done!"); + let rem_len_len = i; + i = 0; + if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) { + trace!("Reading packet with target len {}", l); + target_len = l as usize; + } else { + error!("Could not decode len of packet!"); + return Err(ReasonCode::BuffError); + } + + loop { + if writer.position == target_len + rem_len_len { + trace!("Received packet with len: {}", (target_len + rem_len_len)); + return Ok(target_len + rem_len_len); + } + let len: usize = conn + .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]) + .await?; + i += len; + if let Err(_e) = + writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) + { + error!("Error occurred during write to buffer!"); + return Err(ReasonCode::BuffError); + } + } +} + +#[cfg(feature = "tls")] +async fn receive_packet<'c, T: Read + Write>( + buffer: &mut [u8], + buffer_len: usize, + recv_buffer: &mut [u8], + conn: &'c mut NetworkConnection, +) -> Result { + trace!("Reading packet"); + let mut writer = BuffWriter::new(buffer, buffer_len); + let len = conn.receive(recv_buffer).await?; + if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + len)]) + { + error!("Error occurred during write to buffer!"); + return Err(ReasonCode::BuffError); + } + Ok(len) +} diff --git a/src/encoding/variable_byte_integer.rs b/src/encoding/variable_byte_integer.rs index ad71f6c..769e728 100644 --- a/src/encoding/variable_byte_integer.rs +++ b/src/encoding/variable_byte_integer.rs @@ -58,30 +58,29 @@ impl VariableByteIntegerEncoder { loop { encoded_byte = (target % MOD) as u8; - target = target / 128; + target /= 128; if target > 0 { - encoded_byte = encoded_byte | 128; + encoded_byte |= 128; } res[i] = encoded_byte; - i = i + 1; - if target <= 0 { + i += 1; + if target == 0 { break; } } - return Ok(res); + Ok(res) } pub fn len(var_int: VariableByteInteger) -> usize { let mut i: usize = 0; loop { - let encoded_byte: u8; - encoded_byte = var_int[i]; - i = i + 1; + let encoded_byte = var_int[i]; + i += 1; if (encoded_byte & 128) == 0 { break; } } - return i; + i } } @@ -102,17 +101,17 @@ impl VariableByteIntegerDecoder { loop { encoded_byte = encoded[i]; - i = i + 1; - ret = ret + ((encoded_byte & 127) as u32 * multiplier) as u32; + i += 1; + ret += (encoded_byte & 127) as u32 * multiplier; if multiplier > 128 * 128 * 128 { return Err(BufferError::DecodingError); } - multiplier = multiplier * 128; + multiplier *= 128; if (encoded_byte & 128) == 0 { break; } } - return Ok(ret); + Ok(ret) } } diff --git a/src/lib.rs b/src/lib.rs index 0cf6008..96cf02d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,6 @@ #![cfg_attr(not(feature = "std"), no_std)] #![allow(dead_code)] #![feature(type_alias_impl_trait)] -#![feature(generic_associated_types)] pub(crate) mod fmt; pub mod client; diff --git a/src/packet/v5/auth_packet.rs b/src/packet/v5/auth_packet.rs index 7a6591b..e5cac3c 100644 --- a/src/packet/v5/auth_packet.rs +++ b/src/packet/v5/auth_packet.rs @@ -78,8 +78,8 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTI let mut rm_ln = self.property_len; let property_len_enc: [u8; 4] = VariableByteIntegerEncoder::encode(self.property_len)?; let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); - rm_ln = rm_ln + property_len_len as u32; - rm_ln = rm_ln + 1; + rm_ln += property_len_len as u32; + rm_ln += 1; buff_writer.write_u8(self.fixed_header)?; buff_writer.write_variable_byte_int(rm_ln)?; @@ -92,7 +92,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTI fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { self.decode_fixed_header(buff_reader)?; self.auth_reason = buff_reader.read_u8()?; - return self.decode_properties(buff_reader); + self.decode_properties(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -100,7 +100,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTI } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/connack_packet.rs b/src/packet/v5/connack_packet.rs index e8911c1..64bf0c4 100644 --- a/src/packet/v5/connack_packet.rs +++ b/src/packet/v5/connack_packet.rs @@ -72,7 +72,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPE } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Connack).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Connack { error!("Packet you are trying to decode is not CONNACK packet!"); return Err(BufferError::PacketTypeMismatch); } @@ -86,7 +86,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPE } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/connect_packet.rs b/src/packet/v5/connect_packet.rs index babec24..1748b2c 100644 --- a/src/packet/v5/connect_packet.rs +++ b/src/packet/v5/connect_packet.rs @@ -80,22 +80,22 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> let y = Property::ReceiveMaximum(20); x.properties.push(y); x.client_id.len = 0; - return x; + x } pub fn add_packet_type(&mut self, new_packet_type: PacketType) { - self.fixed_header = self.fixed_header & 0x0F; - self.fixed_header = self.fixed_header | >::into(new_packet_type); + self.fixed_header &= 0x0F; + self.fixed_header |= u8::from(new_packet_type); } pub fn add_username(&mut self, username: &EncodedString<'a>) { self.username = (*username).clone(); - self.connect_flags = self.connect_flags | 0x80; + self.connect_flags |= 0x80; } pub fn add_password(&mut self, password: &BinaryData<'a>) { self.password = (*password).clone(); - self.connect_flags = self.connect_flags | 0x40; + self.connect_flags |= 0x40; } pub fn add_will(&mut self, topic: &EncodedString<'a>, payload: &BinaryData<'a>, retain: bool) { @@ -152,7 +152,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' let wil_prop_len_len = VariableByteIntegerEncoder::len(wil_prop_len_enc); rm_ln = rm_ln + wil_prop_len_len as u32 - + self.will_property_len as u32 + + self.will_property_len + self.will_topic.len as u32 + 2 + self.will_payload.len as u32 @@ -206,7 +206,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/disconnect_packet.rs b/src/packet/v5/disconnect_packet.rs index cb59a71..a6f095e 100644 --- a/src/packet/v5/disconnect_packet.rs +++ b/src/packet/v5/disconnect_packet.rs @@ -78,7 +78,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Disconnect).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Disconnect { error!("Packet you are trying to decode is not DISCONNECT packet!"); return Err(BufferError::WrongPacketToDecode); } @@ -87,7 +87,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR return Ok(()); } self.disconnect_reason = buff_reader.read_u8()?; - return self.decode_properties(buff_reader); + self.decode_properties(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -95,7 +95,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/mqtt_packet.rs b/src/packet/v5/mqtt_packet.rs index c5e6fc5..3c7b121 100644 --- a/src/packet/v5/mqtt_packet.rs +++ b/src/packet/v5/mqtt_packet.rs @@ -59,14 +59,14 @@ pub trait Packet<'a> { let prop = properties.get(i).unwrap(); if self.property_allowed(prop) { self.push_to_properties((*prop).clone()); - res = res + prop.len() as u32 + 1; + res = res + prop.encoded_len() as u32 + 1; } - i = i + 1; + i += 1; if i == max { break; } } - return res; + res } /// Setter for packet fixed header @@ -84,7 +84,7 @@ pub trait Packet<'a> { loop { prop = Property::decode(buff_reader)?; //debug!("Parsed property {:?}", prop); - x = x + prop.len() as u32 + 1; + x = x + prop.encoded_len() as u32 + 1; self.push_to_properties(prop); if x == self.get_property_len() { @@ -104,6 +104,6 @@ pub trait Packet<'a> { trace!("First byte of accepted packet: {:02X}", first_byte); self.set_fixed_header(first_byte); self.set_remaining_len(buff_reader.read_variable_byte_int()?); - return Ok(PacketType::from(first_byte)); + Ok(PacketType::from(first_byte)) } } diff --git a/src/packet/v5/packet_type.rs b/src/packet/v5/packet_type.rs index eb6d40a..0108f60 100644 --- a/src/packet/v5/packet_type.rs +++ b/src/packet/v5/packet_type.rs @@ -47,7 +47,7 @@ pub enum PacketType { impl From for PacketType { fn from(orig: u8) -> Self { let packet_type: u8 = orig & 0xF0; - return match packet_type { + match packet_type { 0x10 => PacketType::Connect, 0x20 => PacketType::Connack, 0x00 => PacketType::Reserved, @@ -65,13 +65,13 @@ impl From for PacketType { 0xE0 => PacketType::Disconnect, 0xF0 => PacketType::Auth, _ => PacketType::Reserved, - }; + } } } -impl Into for PacketType { - fn into(self) -> u8 { - return match self { +impl From for u8 { + fn from(value: PacketType) -> Self { + match value { PacketType::Connect => 0x10, PacketType::Connack => 0x20, PacketType::Publish => 0x30, @@ -88,6 +88,6 @@ impl Into for PacketType { PacketType::Disconnect => 0xE0, PacketType::Auth => 0xF0, PacketType::Reserved => 0x00, - }; + } } } diff --git a/src/packet/v5/pingreq_packet.rs b/src/packet/v5/pingreq_packet.rs index 3e8e218..7e439ae 100644 --- a/src/packet/v5/pingreq_packet.rs +++ b/src/packet/v5/pingreq_packet.rs @@ -48,7 +48,7 @@ impl<'a> Packet<'a> for PingreqPacket { fn encode(&mut self, buffer: &mut [u8], buffer_len: usize) -> Result { let mut buff_writer = BuffWriter::new(buffer, buffer_len); buff_writer.write_u8(self.fixed_header)?; - buff_writer.write_variable_byte_int(0 as u32)?; + buff_writer.write_variable_byte_int(0)?; Ok(buff_writer.position) } @@ -63,7 +63,7 @@ impl<'a> Packet<'a> for PingreqPacket { fn get_property_len(&mut self) -> u32 { error!("PINGREQ packet does not contain any properties!"); - return 0; + 0 } fn push_to_properties(&mut self, _property: Property<'a>) { diff --git a/src/packet/v5/pingresp_packet.rs b/src/packet/v5/pingresp_packet.rs index 22b0827..51c2457 100644 --- a/src/packet/v5/pingresp_packet.rs +++ b/src/packet/v5/pingresp_packet.rs @@ -35,7 +35,7 @@ pub struct PingrespPacket { pub remain_len: u32, } -impl<'a> PingrespPacket {} +impl PingrespPacket {} impl<'a> Packet<'a> for PingrespPacket { fn new() -> Self { @@ -54,7 +54,7 @@ impl<'a> Packet<'a> for PingrespPacket { fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { let x = self.decode_fixed_header(buff_reader)?; - if x != (PacketType::Pingresp).into() { + if x != PacketType::Pingresp { error!("Packet you are trying to decode is not PINGRESP packet!"); return Err(BufferError::PacketTypeMismatch); } @@ -71,7 +71,7 @@ impl<'a> Packet<'a> for PingrespPacket { fn get_property_len(&mut self) -> u32 { error!("PINGRESP packet does not contain any properties!"); - return 0; + 0 } fn push_to_properties(&mut self, _property: Property<'a>) { diff --git a/src/packet/v5/property.rs b/src/packet/v5/property.rs index da773ce..4ea8253 100644 --- a/src/packet/v5/property.rs +++ b/src/packet/v5/property.rs @@ -61,7 +61,9 @@ pub enum Property<'a> { impl<'a> Property<'a> { pub fn connect_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::SessionExpiryInterval(_u) => true, Property::ReceiveMaximum(_u) => true, Property::MaximumPacketSize(_u) => true, @@ -72,11 +74,13 @@ impl<'a> Property<'a> { Property::AuthenticationMethod(_u) => true, Property::AuthenticationData(_u) => true, _ => false, - }; + } } pub fn connack_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::SessionExpiryInterval(_u) => true, Property::ReceiveMaximum(_u) => true, Property::MaximumQoS(_u) => true, @@ -94,11 +98,13 @@ impl<'a> Property<'a> { Property::AuthenticationMethod(_u) => true, Property::AuthenticationData(_u) => true, _ => false, - }; + } } pub fn publish_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::PayloadFormat(_u) => true, Property::MessageExpiryInterval(_u) => true, Property::TopicAlias(_u) => true, @@ -108,142 +114,155 @@ impl<'a> Property<'a> { Property::SubscriptionIdentifier(_u) => true, Property::ContentType(_u) => true, _ => false, - }; + } } pub fn puback_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn pubrec_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn pubrel_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn pubcomp_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn subscribe_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::SubscriptionIdentifier(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn suback_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn unsubscribe_property(&self) -> bool { - return match self { - Property::UserProperty(_u) => true, - _ => false, - }; + matches!(self, Property::UserProperty(_u)) } pub fn unsuback_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } pub fn pingreq_property(&self) -> bool { - return match self { - _ => false, - }; + warn!("pingreq property list is incomplete"); + false } pub fn pingresp_property(&self) -> bool { - return match self { - _ => false, - }; + warn!("pingresp property list is incomplete"); + false } pub fn disconnect_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::SessionExpiryInterval(_u) => true, Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, Property::ServerReference(_u) => true, _ => false, - }; + } } pub fn auth_property(&self) -> bool { - return match self { + // not possible to use with associated values with different types + #[allow(clippy::match_like_matches_macro)] + match self { Property::AuthenticationMethod(_u) => true, Property::AuthenticationData(_u) => true, Property::ReasonString(_u) => true, Property::UserProperty(_u) => true, _ => false, - }; + } } - pub fn len(&self) -> u16 { - return match self { + pub fn encoded_len(&self) -> u16 { + match self { Property::PayloadFormat(_u) => 1, Property::MessageExpiryInterval(_u) => 4, - Property::ContentType(u) => u.len(), - Property::ResponseTopic(u) => u.len(), - Property::CorrelationData(u) => u.len(), + Property::ContentType(u) => u.encoded_len(), + Property::ResponseTopic(u) => u.encoded_len(), + Property::CorrelationData(u) => u.encoded_len(), Property::SubscriptionIdentifier(u) => { VariableByteIntegerEncoder::len(VariableByteIntegerEncoder::encode(*u).unwrap()) as u16 } Property::SessionExpiryInterval(_u) => 4, - Property::AssignedClientIdentifier(u) => u.len(), + Property::AssignedClientIdentifier(u) => u.encoded_len(), Property::ServerKeepAlive(_u) => 2, - Property::AuthenticationMethod(u) => u.len(), - Property::AuthenticationData(u) => u.len(), + Property::AuthenticationMethod(u) => u.encoded_len(), + Property::AuthenticationData(u) => u.encoded_len(), Property::RequestProblemInformation(_u) => 1, Property::WillDelayInterval(_u) => 4, Property::RequestResponseInformation(_u) => 1, - Property::ResponseInformation(u) => u.len(), - Property::ServerReference(u) => u.len(), - Property::ReasonString(u) => u.len(), + Property::ResponseInformation(u) => u.encoded_len(), + Property::ServerReference(u) => u.encoded_len(), + Property::ReasonString(u) => u.encoded_len(), Property::ReceiveMaximum(_u) => 2, Property::TopicAliasMaximum(_u) => 2, Property::TopicAlias(_u) => 2, Property::MaximumQoS(_u) => 1, Property::RetainAvailable(_u) => 1, - Property::UserProperty(u) => u.len(), + Property::UserProperty(u) => u.encoded_len(), Property::MaximumPacketSize(_u) => 4, Property::WildcardSubscriptionAvailable(_u) => 1, Property::SubscriptionIdentifierAvailable(_u) => 1, Property::SharedSubscriptionAvailable(_u) => 1, _ => 0, - }; + } } pub fn encode(&self, buff_writer: &mut BuffWriter<'a>) -> Result<(), BufferError> { - return match self { + match self { Property::PayloadFormat(u) => buff_writer.write_u8(*u), Property::MessageExpiryInterval(u) => buff_writer.write_u32(*u), Property::ContentType(u) => buff_writer.write_string_ref(u), @@ -272,7 +291,7 @@ impl<'a> Property<'a> { Property::SubscriptionIdentifierAvailable(u) => buff_writer.write_u8(*u), Property::SharedSubscriptionAvailable(u) => buff_writer.write_u8(*u), _ => Err(BufferError::PropertyNotFound), - }; + } } pub fn decode(buff_reader: &mut BuffReader<'a>) -> Result, BufferError> { @@ -321,9 +340,9 @@ impl<'a> Property<'a> { } } -impl<'a> Into for &Property<'a> { - fn into(self) -> u8 { - return match &*self { +impl<'a> From<&Property<'a>> for u8 { + fn from(value: &Property<'a>) -> Self { + match value { Property::PayloadFormat(_u) => 0x01, Property::MessageExpiryInterval(_u) => 0x02, Property::ContentType(_u) => 0x03, @@ -352,14 +371,13 @@ impl<'a> Into for &Property<'a> { Property::SubscriptionIdentifierAvailable(_u) => 0x29, Property::SharedSubscriptionAvailable(_u) => 0x2A, _ => 0x00, - }; + } } } impl<'a> From for Property<'a> { fn from(_orig: u8) -> Self { - return match _orig { - _ => Property::Reserved(), - }; + warn!("Deserialization of Properties from u8 is not implemented"); + Property::Reserved() } } diff --git a/src/packet/v5/puback_packet.rs b/src/packet/v5/puback_packet.rs index bff4c77..f6e80a2 100644 --- a/src/packet/v5/puback_packet.rs +++ b/src/packet/v5/puback_packet.rs @@ -74,7 +74,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPER } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Puback).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Puback { error!("Packet you are trying to decode is not PUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } @@ -95,7 +95,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPER } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/pubcomp_packet.rs b/src/packet/v5/pubcomp_packet.rs index 737e8fc..7380d00 100644 --- a/src/packet/v5/pubcomp_packet.rs +++ b/src/packet/v5/pubcomp_packet.rs @@ -74,7 +74,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPE } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Pubcomp).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Pubcomp { error!("Packet you are trying to decode is not PUBCOMP packet!"); return Err(BufferError::PacketTypeMismatch); } @@ -89,7 +89,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPE } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/publish_packet.rs b/src/packet/v5/publish_packet.rs index 4b15d3a..b7f8e85 100644 --- a/src/packet/v5/publish_packet.rs +++ b/src/packet/v5/publish_packet.rs @@ -44,23 +44,23 @@ pub enum QualityOfService { impl From for QualityOfService { fn from(orig: u8) -> Self { - return match orig { + match orig { 0 => QoS0, 2 => QoS1, 4 => QoS2, _ => INVALID, - }; + } } } -impl Into for QualityOfService { - fn into(self) -> u8 { - return match self { +impl From for u8 { + fn from(value: QualityOfService) -> Self { + match value { QoS0 => 0, QoS1 => 2, QoS2 => 4, INVALID => 3, - }; + } } } @@ -85,7 +85,11 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { } pub fn add_qos(&mut self, qos: QualityOfService) { - self.fixed_header = self.fixed_header | >::into(qos); + self.fixed_header |= >::into(qos); + } + + pub fn add_retain(&mut self, retain: bool) { + self.fixed_header |= retain as u8 } pub fn add_identifier(&mut self, identifier: u16) { @@ -118,7 +122,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE buff_writer.write_u8(self.fixed_header)?; let qos = self.fixed_header & 0x06; if qos != 0 { - rm_ln = rm_ln + 2; + rm_ln += 2; } buff_writer.write_variable_byte_int(rm_ln)?; @@ -135,7 +139,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Publish).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Publish { error!("Packet you are trying to decode is not PUBLISH packet!"); return Err(BufferError::PacketTypeMismatch); } @@ -158,7 +162,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/pubrec_packet.rs b/src/packet/v5/pubrec_packet.rs index e1b9f29..ec1f200 100644 --- a/src/packet/v5/pubrec_packet.rs +++ b/src/packet/v5/pubrec_packet.rs @@ -74,13 +74,13 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPER } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrec).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Pubrec { error!("Packet you are trying to decode is not PUBREC packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; self.reason_code = buff_reader.read_u8()?; - return self.decode_properties(buff_reader); + self.decode_properties(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -88,7 +88,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPER } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/pubrel_packet.rs b/src/packet/v5/pubrel_packet.rs index 960a0c4..d101c8f 100644 --- a/src/packet/v5/pubrel_packet.rs +++ b/src/packet/v5/pubrel_packet.rs @@ -74,13 +74,13 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPER } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrel).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Pubrel { error!("Packet you are trying to decode is not PUBREL packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; self.reason_code = buff_reader.read_u8()?; - return self.decode_properties(buff_reader); + self.decode_properties(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -88,7 +88,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPER } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/reason_codes.rs b/src/packet/v5/reason_codes.rs index 496dabd..9104dab 100644 --- a/src/packet/v5/reason_codes.rs +++ b/src/packet/v5/reason_codes.rs @@ -75,9 +75,9 @@ pub enum ReasonCode { NetworkError, } -impl Into for ReasonCode { - fn into(self) -> u8 { - return match self { +impl From for u8 { + fn from(value: ReasonCode) -> Self { + match value { ReasonCode::Success => 0x00, ReasonCode::GrantedQoS1 => 0x01, ReasonCode::GrantedQoS2 => 0x02, @@ -124,13 +124,13 @@ impl Into for ReasonCode { ReasonCode::TimerNotSupported => 0xFD, ReasonCode::BuffError => 0xFE, ReasonCode::NetworkError => 0xFF, - }; + } } } impl From for ReasonCode { fn from(orig: u8) -> Self { - return match orig { + match orig { 0x00 => ReasonCode::Success, 0x01 => ReasonCode::GrantedQoS1, 0x02 => ReasonCode::GrantedQoS2, @@ -176,7 +176,7 @@ impl From for ReasonCode { 0xFD => ReasonCode::TimerNotSupported, 0xFE => ReasonCode::BuffError, _ => ReasonCode::NetworkError, - }; + } } } diff --git a/src/packet/v5/suback_packet.rs b/src/packet/v5/suback_packet.rs index 77671d7..0797c90 100644 --- a/src/packet/v5/suback_packet.rs +++ b/src/packet/v5/suback_packet.rs @@ -61,7 +61,7 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> break; } } - return Ok(()); + Ok(()) } } @@ -81,17 +81,17 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> fn encode(&mut self, _buffer: &mut [u8], _buffer_len: usize) -> Result { error!("SUBACK packet does not support encoding!"); - return Err(BufferError::WrongPacketToEncode); + Err(BufferError::WrongPacketToEncode) } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Suback).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Suback { error!("Packet you are trying to decode is not SUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; self.decode_properties(buff_reader)?; - return self.read_reason_codes(buff_reader); + self.read_reason_codes(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -99,7 +99,7 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/subscription_packet.rs b/src/packet/v5/subscription_packet.rs index c4d65cc..3b475b2 100644 --- a/src/packet/v5/subscription_packet.rs +++ b/src/packet/v5/subscription_packet.rs @@ -52,10 +52,9 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> let mut new_filter = TopicFilter::new(); new_filter.filter.string = topic_name; new_filter.filter.len = len as u16; - new_filter.sub_options = - new_filter.sub_options | (>::into(qos) >> 1); + new_filter.sub_options |= >::into(qos) >> 1; self.topic_filters.push(new_filter); - self.topic_filter_len = self.topic_filter_len + 1; + self.topic_filter_len += 1; } } @@ -63,7 +62,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { fn new() -> Self { - let x = Self { + Self { fixed_header: PacketType::Subscribe.into(), remain_len: 0, packet_identifier: 1, @@ -71,8 +70,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> properties: Vec::, MAX_PROPERTIES>::new(), topic_filter_len: 0, topic_filters: Vec::, MAX_FILTERS>::new(), - }; - return x; + } } fn encode(&mut self, buffer: &mut [u8], buffer_len: usize) -> Result { @@ -86,7 +84,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> let mut filters_len = 0; loop { filters_len = filters_len + self.topic_filters.get(lt).unwrap().filter.len + 3; - lt = lt + 1; + lt += 1; if lt == self.topic_filter_len as usize { break; } @@ -115,7 +113,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/unsuback_packet.rs b/src/packet/v5/unsuback_packet.rs index 81cc48c..6d41096 100644 --- a/src/packet/v5/unsuback_packet.rs +++ b/src/packet/v5/unsuback_packet.rs @@ -50,7 +50,7 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> let mut i = 0; loop { self.reason_codes.push(buff_reader.read_u8()?); - i = i + 1; + i += 1; if i == MAX_REASONS { break; } @@ -79,13 +79,13 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> { - if self.decode_fixed_header(buff_reader)? != (PacketType::Unsuback).into() { + if self.decode_fixed_header(buff_reader)? != PacketType::Unsuback { error!("Packet you are trying to decode is not UNSUBACK packet!"); return Err(BufferError::PacketTypeMismatch); } self.packet_identifier = buff_reader.read_u16()?; self.decode_properties(buff_reader)?; - return self.read_reason_codes(buff_reader); + self.read_reason_codes(buff_reader) } fn set_property_len(&mut self, value: u32) { @@ -93,7 +93,7 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/packet/v5/unsubscription_packet.rs b/src/packet/v5/unsubscription_packet.rs index 11ce39a..eb0631d 100644 --- a/src/packet/v5/unsubscription_packet.rs +++ b/src/packet/v5/unsubscription_packet.rs @@ -51,9 +51,9 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> let mut new_filter = TopicFilter::new(); new_filter.filter.string = topic_name; new_filter.filter.len = len as u16; - new_filter.sub_options = new_filter.sub_options | 0x01; + new_filter.sub_options |= 0x01; self.topic_filters.push(new_filter); - self.topic_filter_len = self.topic_filter_len + 1; + self.topic_filter_len += 1; } } @@ -83,7 +83,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> let mut filters_len = 0; loop { filters_len = filters_len + self.topic_filters.get(lt).unwrap().filter.len + 2; - lt = lt + 1; + lt += 1; if lt == self.topic_filter_len as usize { break; } @@ -113,7 +113,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> } fn get_property_len(&mut self) -> u32 { - return self.property_len; + self.property_len } fn push_to_properties(&mut self, property: Property<'a>) { diff --git a/src/utils/buffer_reader.rs b/src/utils/buffer_reader.rs index 61ac1e2..188190c 100644 --- a/src/utils/buffer_reader.rs +++ b/src/utils/buffer_reader.rs @@ -38,15 +38,15 @@ pub struct BuffReader<'a> { impl<'a> BuffReader<'a> { pub fn increment_position(&mut self, increment: usize) { - self.position = self.position + increment; + self.position += increment; } pub fn new(buffer: &'a [u8], buff_len: usize) -> Self { - return BuffReader { + Self { buffer, position: 0, len: buff_len, - }; + } } /// Variable byte integer can be 1-4 Bytes long. Buffer reader takes all 4 Bytes at first and @@ -66,14 +66,14 @@ impl<'a> BuffReader<'a> { } if self.buffer[self.position + x] & 0x80 != 0 { variable_byte_integer[x] = self.buffer[self.position + x]; - len = len + 1 + len += 1 } else { variable_byte_integer[x] = self.buffer[self.position + x]; - x = x + 1; + x += 1; if x != 4 { loop { variable_byte_integer[x] = 0; - x = x + 1; + x += 1; if x == 4 { break; } @@ -81,10 +81,10 @@ impl<'a> BuffReader<'a> { break; } } - x = x + 1; + x += 1; } self.increment_position(len); - return VariableByteIntegerDecoder::decode(variable_byte_integer); + VariableByteIntegerDecoder::decode(variable_byte_integer) } /// Reading u32 from buffer as `Big endian` @@ -95,7 +95,7 @@ impl<'a> BuffReader<'a> { let (int_bytes, _rest) = self.buffer[self.position..].split_at(mem::size_of::()); let ret: u32 = u32::from_be_bytes(int_bytes.try_into().unwrap()); self.increment_position(4); - return Ok(ret); + Ok(ret) } /// Reading u16 from buffer as `Big endinan` @@ -106,7 +106,7 @@ impl<'a> BuffReader<'a> { let (int_bytes, _rest) = self.buffer[self.position..].split_at(mem::size_of::()); let ret: u16 = u16::from_be_bytes(int_bytes.try_into().unwrap()); self.increment_position(2); - return Ok(ret); + Ok(ret) } /// Reading one byte from buffer as `Big endian` @@ -116,14 +116,14 @@ impl<'a> BuffReader<'a> { } let ret: u8 = self.buffer[self.position]; self.increment_position(1); - return Ok(ret); + Ok(ret) } /// Reading UTF-8 encoded string from buffer pub fn read_string(&mut self) -> Result, BufferError> { let len = self.read_u16()? as usize; - if self.position + len - 1 >= self.len { + if self.position + len > self.len { return Err(BufferError::InsufficientBufferSize); } @@ -133,29 +133,29 @@ impl<'a> BuffReader<'a> { return Err(BufferError::Utf8Error); } self.increment_position(len); - return Ok(EncodedString { + Ok(EncodedString { string: res_str.unwrap(), len: len as u16, - }); + }) } /// Read Binary data from buffer pub fn read_binary(&mut self) -> Result, BufferError> { let len = self.read_u16()?; - if self.position + len as usize - 1 >= self.len { + if self.position + len as usize > self.len { return Err(BufferError::InsufficientBufferSize); } let res_bin = &(self.buffer[self.position..(self.position + len as usize)]); - return Ok(BinaryData { bin: res_bin, len }); + Ok(BinaryData { bin: res_bin, len }) } /// Read string pair from buffer pub fn read_string_pair(&mut self) -> Result, BufferError> { let name = self.read_string()?; let value = self.read_string()?; - return Ok(StringPair { name, value }); + Ok(StringPair { name, value }) } /// Read payload message from buffer @@ -163,6 +163,14 @@ impl<'a> BuffReader<'a> { if total_len > self.len { return &self.buffer[self.position..self.len]; } - return &self.buffer[self.position..total_len]; + &self.buffer[self.position..total_len] + } + + /// Peeking (without incremental internal pointer) one byte from buffer as `Big endian` + pub fn peek_u8(&self) -> Result { + if self.position >= self.len { + return Err(BufferError::InsufficientBufferSize); + } + Ok(self.buffer[self.position]) } } diff --git a/src/utils/buffer_writer.rs b/src/utils/buffer_writer.rs index 316b695..7d0186d 100644 --- a/src/utils/buffer_writer.rs +++ b/src/utils/buffer_writer.rs @@ -28,6 +28,9 @@ use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIn use crate::packet::v5::property::Property; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter}; +#[derive(Debug, Clone, Copy)] +pub struct RemLenError; + /// Buff writer is writing corresponding types to buffer (Byte array) and stores current position /// (later as cursor) pub struct BuffWriter<'a> { @@ -38,27 +41,28 @@ pub struct BuffWriter<'a> { impl<'a> BuffWriter<'a> { pub fn new(buffer: &'a mut [u8], buff_len: usize) -> Self { - return BuffWriter { + Self { buffer, position: 0, len: buff_len, - }; + } } fn increment_position(&mut self, increment: usize) { - self.position = self.position + increment; + self.position += increment; } /// Returns n-th Byte from the buffer to which is currently written. pub fn get_n_byte(&mut self, n: usize) -> u8 { if self.position >= n { - return self.buffer[n]; + self.buffer[n] + } else { + 0 } - return 0; } /// Return the remaining lenght of the packet from the buffer to which is packet written. - pub fn get_rem_len(&mut self) -> Result { + pub fn get_rem_len(&mut self) -> Result { let max = if self.position >= 5 { 4 } else { @@ -72,12 +76,12 @@ impl<'a> BuffWriter<'a> { return Ok(len); } if len[i - 1] & 0x80 != 0 && i == max && i != 4 { - return Err(()); + return Err(RemLenError); } if i == max { return Ok(len); } - i = i + 1; + i += 1; } } @@ -91,36 +95,36 @@ impl<'a> BuffWriter<'a> { loop { self.buffer[self.position] = array[x]; self.increment_position(1); - x = x + 1; + x += 1; if x == len { break; } } } - return Ok(()); + Ok(()) } /// Writes a single Byte to the buffer. pub fn write_u8(&mut self, byte: u8) -> Result<(), BufferError> { - return if self.position >= self.len { + if self.position >= self.len { Err(BufferError::InsufficientBufferSize) } else { self.buffer[self.position] = byte; self.increment_position(1); Ok(()) - }; + } } /// Writes the two Byte value to the buffer. pub fn write_u16(&mut self, two_bytes: u16) -> Result<(), BufferError> { let bytes: [u8; 2] = two_bytes.to_be_bytes(); - return self.insert_ref(2, &bytes); + self.insert_ref(2, &bytes) } /// Writes the four Byte value to the buffer. pub fn write_u32(&mut self, four_bytes: u32) -> Result<(), BufferError> { let bytes: [u8; 4] = four_bytes.to_be_bytes(); - return self.insert_ref(4, &bytes); + self.insert_ref(4, &bytes) } /// Writes the UTF-8 string type to the buffer. @@ -130,32 +134,32 @@ impl<'a> BuffWriter<'a> { let bytes = str.string.as_bytes(); return self.insert_ref(str.len as usize, bytes); } - return Ok(()); + Ok(()) } /// Writes BinaryData to the buffer. pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) -> Result<(), BufferError> { self.write_u16(bin.len)?; - return self.insert_ref(bin.len as usize, bin.bin); + self.insert_ref(bin.len as usize, bin.bin) } /// Writes the string pair to the buffer. pub fn write_string_pair_ref(&mut self, str_pair: &StringPair<'a>) -> Result<(), BufferError> { self.write_string_ref(&str_pair.name)?; - return self.write_string_ref(&str_pair.value); + self.write_string_ref(&str_pair.value) } /// Encodes the u32 value into the VariableByteInteger and this value writes to the buffer. pub fn write_variable_byte_int(&mut self, int: u32) -> Result<(), BufferError> { let x: VariableByteInteger = VariableByteIntegerEncoder::encode(int)?; let len = VariableByteIntegerEncoder::len(x); - return self.insert_ref(len, &x); + self.insert_ref(len, &x) } fn write_property(&mut self, property: &Property<'a>) -> Result<(), BufferError> { let x: u8 = property.into(); self.write_u8(x)?; - return property.encode(self); + property.encode(self) } /// Writes all properties from the `properties` Vec into the buffer. @@ -169,7 +173,7 @@ impl<'a> BuffWriter<'a> { loop { let prop: &Property = properties.get(i).unwrap_or(&Property::Reserved()); self.write_property(prop)?; - i = i + 1; + i += 1; if i == len { break; } @@ -189,7 +193,7 @@ impl<'a> BuffWriter<'a> { if sub { self.write_u8(topic_filter.sub_options)?; } - return Ok(()); + Ok(()) } /// Writes the topic filter Vec to the buffer. If the `sub` option is set to `false`, it will not @@ -204,7 +208,7 @@ impl<'a> BuffWriter<'a> { loop { let topic_filter: &TopicFilter<'a> = filters.get(i).unwrap(); self.write_topic_filter_ref(sub, topic_filter)?; - i = i + 1; + i += 1; if i == len { break; } diff --git a/src/utils/rng_generator.rs b/src/utils/rng_generator.rs index f703d2e..e020b78 100644 --- a/src/utils/rng_generator.rs +++ b/src/utils/rng_generator.rs @@ -23,6 +23,7 @@ impl RngCore for CountingRng { } fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Error> { - Ok(self.fill_bytes(dest)) + self.fill_bytes(dest); + Ok(()) } } diff --git a/src/utils/types.rs b/src/utils/types.rs index cf5a1f7..6ff017a 100644 --- a/src/utils/types.rs +++ b/src/utils/types.rs @@ -56,7 +56,7 @@ impl Display for BufferError { } } /// Encoded string provides structure representing UTF-8 encoded string in MQTTv5 packets -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct EncodedString<'a> { pub string: &'a str, pub len: u16, @@ -68,13 +68,13 @@ impl EncodedString<'_> { } /// Return length of string - pub fn len(&self) -> u16 { - return self.len + 2; + pub fn encoded_len(&self) -> u16 { + self.len + 2 } } /// Binary data represents `Binary data` in MQTTv5 protocol -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct BinaryData<'a> { pub bin: &'a [u8], pub len: u16, @@ -85,13 +85,13 @@ impl BinaryData<'_> { Self { bin: &[0], len: 0 } } /// Returns length of Byte array - pub fn len(&self) -> u16 { - return self.len + 2; + pub fn encoded_len(&self) -> u16 { + self.len + 2 } } /// String pair struct represents `String pair` in MQTTv5 (2 UTF-8 encoded strings name-value) -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct StringPair<'a> { pub name: EncodedString<'a>, pub value: EncodedString<'a>, @@ -105,14 +105,13 @@ impl StringPair<'_> { } } /// Returns length which is equal to sum of the lenghts of UTF-8 encoded strings in pair - pub fn len(&self) -> u16 { - let ln = self.name.len() + self.value.len(); - return ln; + pub fn encoded_len(&self) -> u16 { + self.name.encoded_len() + self.value.encoded_len() } } /// Topic filter serves as bound for topic selection and subscription options for `SUBSCRIPTION` packet -#[derive(Debug)] +#[derive(Debug, Default)] pub struct TopicFilter<'a> { pub filter: EncodedString<'a>, pub sub_options: u8, @@ -126,7 +125,7 @@ impl TopicFilter<'_> { } } - pub fn len(&self) -> u16 { - return self.filter.len + 3; + pub fn encoded_len(&self) -> u16 { + self.filter.len + 3 } } diff --git a/tests/integration_test_single.rs b/tests/integration_test_single.rs index 3c05cc3..ee10700 100644 --- a/tests/integration_test_single.rs +++ b/tests/integration_test_single.rs @@ -63,6 +63,7 @@ fn setup() { async fn publish_core<'b>( client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>, wait: u64, + qos: QualityOfService, topic: &str, message: &str, err: bool, @@ -80,7 +81,9 @@ async fn publish_core<'b>( "[Publisher] Sending new message {} to topic {}", message, topic ); - result = client.send_message(topic, message.as_bytes()).await; + result = client + .send_message(topic, message.as_bytes(), qos, false) + .await; info!("[PUBLISHER] sent"); if err == true { assert_err!(result); @@ -107,7 +110,7 @@ async fn publish( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 100; @@ -122,7 +125,7 @@ async fn publish( 80, config, ); - publish_core(&mut client, wait, topic, MSG, false).await + publish_core(&mut client, wait, qos, topic, MSG, false).await } async fn publish_spec( @@ -139,7 +142,7 @@ async fn publish_spec( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 100; @@ -154,7 +157,7 @@ async fn publish_spec( 80, config, ); - publish_core(&mut client, wait, topic, message, err).await + publish_core(&mut client, wait, qos, topic, message, err).await } async fn receive_core<'b>( @@ -236,7 +239,7 @@ async fn receive_multiple( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 60; @@ -263,7 +266,7 @@ async fn receive(ip: Ipv4Addr, qos: QualityOfService, topic: &str) -> Result<(), .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 6000; @@ -290,7 +293,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username("xyz"); config.add_password(PASSWORD); config.max_packet_size = 60; @@ -329,7 +332,7 @@ async fn receive_multiple_second_unsub( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 60; diff --git a/tests/load_test.rs b/tests/load_test.rs index 50ee926..d19aef5 100644 --- a/tests/load_test.rs +++ b/tests/load_test.rs @@ -78,7 +78,9 @@ async fn publish_core<'b>( info!("[Publisher] Sending new message {} to topic {}", MSG, topic); let mut count = 0; loop { - result = client.send_message(topic, MSG.as_bytes()).await; + result = client + .send_message(topic, MSG.as_bytes(), QualityOfService::QoS0, false) + .await; info!("[PUBLISHER] sent {}", count); assert_ok!(result); count = count + 1; @@ -107,7 +109,7 @@ async fn publish( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 100; @@ -171,7 +173,7 @@ async fn receive( .map_err(|_| ReasonCode::NetworkError)?; let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); - config.add_qos(qos); + config.add_max_subscribe_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); config.max_packet_size = 6000;