From 4eaa83bf1cf16a3ac4f0d4adc47df34670f31df4 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Sun, 27 Feb 2022 15:48:09 +0100 Subject: [PATCH] Error handling --- src/client/client_config.rs | 42 +++++++ src/client/client_v5.rs | 72 +++++++++-- src/client/mod.rs | 1 + src/main.rs | 38 ++++-- src/network/network_trait.rs | 3 + src/packet/connect_packet.rs | 28 +++-- src/packet/mod.rs | 1 + src/packet/puback_packet.rs | 20 ++- src/packet/publish_packet.rs | 19 ++- src/packet/reason_codes.rs | 202 ++++++++++++++++++++++++++++++ src/packet/subscription_packet.rs | 8 +- src/utils/buffer_reader.rs | 2 + src/utils/mod.rs | 1 + src/utils/rng_generator.rs | 25 ++++ 14 files changed, 421 insertions(+), 41 deletions(-) create mode 100644 src/client/client_config.rs create mode 100644 src/packet/reason_codes.rs create mode 100644 src/utils/rng_generator.rs diff --git a/src/client/client_config.rs b/src/client/client_config.rs new file mode 100644 index 0000000..21b21dc --- /dev/null +++ b/src/client/client_config.rs @@ -0,0 +1,42 @@ +use crate::packet::publish_packet::QualityOfService; +use crate::utils::buffer_reader::{BinaryData, EncodedString}; + +pub struct ClientConfig<'a> { + pub qos: QualityOfService, + pub username_flag: bool, + pub username: EncodedString<'a>, + pub password_flag: bool, + pub password: BinaryData<'a> +} + +impl ClientConfig<'a> { + pub fn new() -> Self { + Self { + qos: QualityOfService::QoS0, + username_flag: false, + username: EncodedString::new(), + password_flag: false, + password: BinaryData::new(), + } + } + + pub fn add_qos(& mut self, qos: QualityOfService) { + self.qos = qos; + } + + pub fn add_username(& mut self, username: &'a str) { + let mut username_s: EncodedString = EncodedString::new(); + username_s.string = username; + username_s.len = username.len() as u16; + self.username_flag = true; + self.username = username_s; + } + + pub fn add_password(& mut self, password: &'a str) { + let mut password_s: BinaryData = BinaryData::new(); + password_s.bin = password.as_bytes(); + password_s.len = password_s.bin.len() as u16; + self.password = password_s; + self.password_flag = true; + } +} \ No newline at end of file diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index eae67f9..937492c 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,35 +1,52 @@ use core::future::Future; +use embassy::traits::rng; +use rand_core::RngCore; +use crate::client::client_config::ClientConfig; use crate::network::network_trait::{Network, NetworkError}; use crate::packet::connack_packet::ConnackPacket; use crate::packet::connect_packet::ConnectPacket; use crate::packet::disconnect_packet::DisconnectPacket; use crate::packet::mqtt_packet::Packet; +use crate::packet::puback_packet::PubackPacket; use crate::packet::publish_packet::QualityOfService::QoS1; use crate::packet::publish_packet::{PublishPacket, QualityOfService}; use crate::packet::suback_packet::SubackPacket; use crate::packet::subscription_packet::SubscriptionPacket; use crate::utils::buffer_reader::BuffReader; +use crate::utils::rng_generator::CountingRng; pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { network_driver: &'a mut T, buffer: &'a mut [u8], + recv_buffer: &'a mut [u8], + rng: CountingRng, + config: ClientConfig<'a>, } impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES> where T: Network, { - pub fn new(network_driver: &'a mut T, buffer: &'a mut [u8]) -> Self { + pub fn new(network_driver: &'a mut T, buffer: &'a mut [u8], recv_buffer: &'a mut [u8], config: ClientConfig<'a>) -> Self { Self { network_driver, buffer, + recv_buffer, + rng: CountingRng(50), + config } } pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), NetworkError> { let mut len = { let mut connect = ConnectPacket::<'b, 3, 0>::clean(); + if self.config.username_flag { + connect.add_username(& self.config.username); + } + if self.config.password_flag { + connect.add_password(& self.config.password) + } connect.encode(self.buffer) }; @@ -58,35 +75,55 @@ where Ok(()) } - - // connect -> connack -> publish -> QoS ? -> disconn pub async fn send_message<'b>( &'b mut self, topic_name: &'b str, message: &'b str, - qos: QualityOfService, ) -> Result<(), NetworkError> { - // publish + + let identifier: u16 = self.rng.next_u32() as u16; let len = { let mut packet = PublishPacket::<'b, 5>::new(); packet.add_topic_name(topic_name); + packet.add_qos(self.config.qos); + packet.add_identifier(identifier); packet.add_message(message.as_bytes()); packet.encode(self.buffer) }; - self.network_driver.send(self.buffer, len).await?; + let x = self.network_driver.send(self.buffer, len).await; + + if let Err(e) = x { + log::error!("Chyba pri prenosu!"); + return Err(e); + } //QoS1 - if >::into(qos) == >::into(QoS1) { - todo!(); + if >::into(self.config.qos ) == >::into(QoS1) { + let reason = { + self.network_driver.receive(self.buffer).await?; + let mut packet = PubackPacket::<'b, 5>::new(); + packet.decode(&mut BuffReader::new(self.buffer)); + [packet.packet_identifier, packet.reason_code as u16] + }; + + if identifier != reason[0] { + return Err(NetworkError::IDNotMatchedOnAck); + } + + if reason[1] != 0 { + return Err(NetworkError::QoSAck); + } } Ok(()) } + // TODO - multiple topic subscribe func + pub async fn subscribe_to_topic<'b>(&'b mut self, topic_name: &'b str) -> Result<(), NetworkError> { let len = { let mut subs = SubscriptionPacket::<'b, 1, 1>::new(); - subs.add_new_filter(topic_name); + subs.add_new_filter(topic_name, self.config.qos); subs.encode(self.buffer) }; let xx: [u8; 14] = (self.buffer[0..14]).try_into().unwrap(); @@ -100,7 +137,7 @@ where *packet.reason_codes.get(0).unwrap() }; - if reason > 1 { + if reason == (>::into(self.config.qos) >> 1) { Err(NetworkError::Unknown) } else { Ok(()) @@ -109,9 +146,20 @@ where } pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], NetworkError> { - self.network_driver.receive(self.buffer).await?; + self.network_driver.receive(self.recv_buffer).await?; let mut packet = PublishPacket::<'b, 5>::new(); - packet.decode(&mut BuffReader::new(self.buffer)); + packet.decode(&mut BuffReader::new(self.recv_buffer)); + + if (packet.fixed_header & 0x06) == >::into(QualityOfService::QoS1) { + let mut puback = PubackPacket::<'b, 5>::new(); + puback.packet_identifier = packet.packet_identifier; + puback.reason_code = 0x00; + { + let len = puback.encode(self.buffer); + self.network_driver.send(self.buffer, len).await ?; + } + } + return Ok(packet.message.unwrap()); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 3f271f3..c047371 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1 +1,2 @@ pub mod client_v5; +pub mod client_config; diff --git a/src/main.rs b/src/main.rs index 9c18a55..824dc28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,20 +8,27 @@ use rust_mqtt::tokio_network::TokioNetwork; use std::time::Duration; use tokio::time::sleep; use tokio::{join, task}; +use rust_mqtt::client::client_config::ClientConfig; +use rust_mqtt::packet::publish_packet::QualityOfService::QoS1; async fn receive() { let mut ip: [u8; 4] = [37, 205, 11, 180]; let mut port: u16 = 1883; let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); tokio_network.create_connection().await; + let mut config = ClientConfig::new(); + config.add_qos(QualityOfService::QoS1); + config.add_username("test"); + config.add_password("testPass"); let mut res2 = vec![0; 260]; - let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2); + let mut res3 = vec![0; 260]; + let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2, & mut res3, config); let mut result = { client.connect_to_broker().await }; { client.subscribe_to_topic("test/topic").await; - } + }; { log::info!("Waiting for new message!"); let mes = client.receive_message().await.unwrap(); @@ -38,18 +45,33 @@ async fn publish(message: &str) { let mut port: u16 = 1883; let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); tokio_network.create_connection().await; + let config = ClientConfig::new(); let mut res2 = vec![0; 260]; - let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2); + let mut res3 = vec![0; 260]; + let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2, & mut res3, config); let mut result = { client.connect_to_broker().await }; log::info!("Waiting until send!"); sleep(Duration::from_secs(15)); - let mut result: Result<(), NetworkError> = { + result= { log::info!("Sending new message!"); client - .send_message("test/topic", message, QualityOfService::QoS0) + .send_message("test/topic", message) .await }; + if let Err(e) = result { + log::error!("Chyba!"); + } + + result = { + log::info!("Sending new message!"); + client + .send_message("test/topic", "Dalsi zprava :)") + .await + }; + if let Err(err) = result { + log::error!("Chyba!"); + } { client.disconnect().await; @@ -63,7 +85,7 @@ async fn main() { .format_timestamp_nanos() .init(); - let recv = task::spawn(async move { + /*let recv = task::spawn(async move { receive().await; }); @@ -71,6 +93,8 @@ async fn main() { publish("hello world 123 !").await; }); - join!(recv, publ); + join!(recv, publ);*/ + receive().await; + //publish("Ahoj 123").await; log::info!("Done"); } diff --git a/src/network/network_trait.rs b/src/network/network_trait.rs index b446bc3..3466a3f 100644 --- a/src/network/network_trait.rs +++ b/src/network/network_trait.rs @@ -8,6 +8,9 @@ use crate::packet::mqtt_packet::Packet; pub enum NetworkError { Connection, Unknown, + QoSAck, + IDNotMatchedOnAck, + NoMatchingSubs, } pub trait Network { diff --git a/src/packet/connect_packet.rs b/src/packet/connect_packet.rs index 7a38449..1845d3b 100644 --- a/src/packet/connect_packet.rs +++ b/src/packet/connect_packet.rs @@ -97,6 +97,16 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> } self.fixed_header = cur_type | flags; } + + pub fn add_username(&mut self, username: &EncodedString<'a>) { + self.username = (*username).clone(); + self.connect_flags = self.connect_flags | 0x80; + } + + pub fn add_password(&mut self, password: &BinaryData<'a>) { + self.password = (*password).clone(); + self.connect_flags = self.connect_flags | 0x40; + } } impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a> @@ -116,7 +126,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' // 12 = protocol_name_len + protocol_name + protocol_version + connect_flags + keep_alive + client_id_len rm_ln = rm_ln + property_len_len as u32 + 12; - if self.connect_flags & 0x04 == 1 { + if self.connect_flags & 0x04 != 0 { let wil_prop_len_enc = VariableByteIntegerEncoder::encode(self.will_property_len).unwrap(); let wil_prop_len_len = VariableByteIntegerEncoder::len(wil_prop_len_enc); @@ -126,13 +136,13 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' + self.will_topic.len as u32 + self.will_payload.len as u32; } - - if self.connect_flags & 0x80 == 1 { - rm_ln = rm_ln + self.username.len as u32; + let x = self.connect_flags & 0x80; + if (self.connect_flags & 0x80) != 0 { + rm_ln = rm_ln + self.username.len as u32 + 2; } - if self.connect_flags & 0x40 == 1 { - rm_ln = rm_ln + self.password.len as u32; + if self.connect_flags & 0x40 != 0 { + rm_ln = rm_ln + self.password.len as u32 + 2; } buff_writer.write_u8(self.fixed_header); @@ -147,18 +157,18 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' buff_writer.encode_properties::(&self.properties); buff_writer.write_string_ref(&self.client_id); - if self.connect_flags & 0x04 == 1 { + if self.connect_flags & 0x04 != 0 { buff_writer.write_variable_byte_int(self.will_property_len); buff_writer.encode_properties(&self.will_properties); buff_writer.write_string_ref(&self.will_topic); buff_writer.write_binary_ref(&self.will_payload); } - if self.connect_flags & 0x80 == 1 { + if self.connect_flags & 0x80 != 0 { buff_writer.write_string_ref(&self.username); } - if self.connect_flags & 0x40 == 1 { + if self.connect_flags & 0x40 != 0 { buff_writer.write_binary_ref(&self.password); } diff --git a/src/packet/mod.rs b/src/packet/mod.rs index f76a034..2b75c1d 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -18,3 +18,4 @@ pub mod pingreq_packet; pub mod pingresp_packet; pub mod suback_packet; pub mod unsuback_packet; +pub mod reason_codes; diff --git a/src/packet/puback_packet.rs b/src/packet/puback_packet.rs index 9a51b85..d3a2705 100644 --- a/src/packet/puback_packet.rs +++ b/src/packet/puback_packet.rs @@ -30,14 +30,28 @@ impl<'a, const MAX_PROPERTIES: usize> PubackPacket<'a, MAX_PROPERTIES> { return; } self.packet_identifier = buff_reader.read_u16().unwrap(); - self.reason_code = buff_reader.read_u8().unwrap(); - self.decode_properties(buff_reader); + if self.remain_len != 2 { + self.reason_code = buff_reader.read_u8().unwrap(); + } + if self.remain_len < 4 { + self.property_len = 0; + } else { + self.decode_properties(buff_reader); + } + } } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPERTIES> { fn new() -> Self { - todo!() + Self { + fixed_header: PacketType::Puback.into(), + remain_len: 0, + packet_identifier: 0, + reason_code: 0, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new() + } } fn encode(&mut self, buffer: &mut [u8]) -> usize { diff --git a/src/packet/publish_packet.rs b/src/packet/publish_packet.rs index 3f4ed21..99ec544 100644 --- a/src/packet/publish_packet.rs +++ b/src/packet/publish_packet.rs @@ -11,6 +11,7 @@ use crate::utils::buffer_writer::BuffWriter; use super::packet_type::PacketType; use super::property::Property; +#[derive(Clone, Copy)] pub enum QualityOfService { QoS0, QoS1, @@ -22,8 +23,8 @@ impl From for QualityOfService { fn from(orig: u8) -> Self { return match orig { 0 => QoS0, - 1 => QoS1, - 2 => QoS2, + 2 => QoS1, + 4 => QoS2, _ => INVALID, }; } @@ -33,8 +34,8 @@ impl Into for QualityOfService { fn into(self) -> u8 { return match self { QoS0 => 0, - QoS1 => 1, - QoS2 => 2, + QoS1 => 2, + QoS2 => 4, INVALID => 3, }; } @@ -67,6 +68,14 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { self.message = Some(message); } + pub fn add_qos(& mut self, qos: QualityOfService) { + self.fixed_header = self.fixed_header | >::into(qos); + } + + pub fn add_identifier(& mut self, identifier: u16) { + self.packet_identifier = identifier; + } + pub fn decode_publish_packet(&mut self, buff_reader: &mut BuffReader<'a>) { if self.decode_fixed_header(buff_reader) != (PacketType::Publish).into() { log::error!("Packet you are trying to decode is not PUBLISH packet!"); @@ -112,7 +121,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE buff_writer.write_u8(self.fixed_header); let qos = self.fixed_header & 0x03; if qos != 0 { - rm_ln + 2; + rm_ln = rm_ln + 2; } buff_writer.write_variable_byte_int(rm_ln); diff --git a/src/packet/reason_codes.rs b/src/packet/reason_codes.rs new file mode 100644 index 0000000..22a54f1 --- /dev/null +++ b/src/packet/reason_codes.rs @@ -0,0 +1,202 @@ +use core::fmt::{Display, Formatter, write}; +use crate::packet::reason_codes::ReasonCode::ServerMoved; + +pub enum ReasonCode { + Success, + GrantedQoS1, + GrantedQoS2, + DisconnectWithWillMessage, + NoMatchingSubscribers, + NoSubscriptionExisted, + ContinueAuth, + ReAuthenticate, + UnspecifiedError, + MalformedPacket, + ProtocolError, + ImplementationSpecificError, + UnsupportedProtocolVersion, + ClientIdNotValid, + BadUserNameOrPassword, + NotAuthorized, + ServerUnavailable, + ServerBusy, + Banned, + ServerShuttingDown, + BadAuthMethod, + KeepAliveTimeout, + SessionTakeOver, + TopicFilterInvalid, + TopicNameInvalid, + PacketIdentifierInUse, + PacketIdentifierNotFound, + ReceiveMaximumExceeded, + TopicAliasInvalid, + PacketTooLarge, + MessageRateTooHigh, + QuotaExceeded, + AdministrativeAction, + PayloadFormatInvalid, + RetainNotSupported, + QoSNotSupported, + UseAnotherServer, + ServerMoved, + SharedSubscriptionNotSupported, + ConnectionRateExceeded, + MaximumConnectTime, + SubscriptionIdentifiersNotSupported, + WildcardSubscriptionNotSupported, + Unknown +} + +impl Into for ReasonCode { + fn into(self) -> u8 { + return match self { + ReasonCode::Success => 0x00, + ReasonCode::GrantedQoS1 => 0x01, + ReasonCode::GrantedQoS2 => 0x02, + ReasonCode::DisconnectWithWillMessage => 0x04, + ReasonCode::NoMatchingSubscribers => 0x10, + ReasonCode::NoSubscriptionExisted => 0x11, + ReasonCode::ContinueAuth => 0x18, + ReasonCode::ReAuthenticate => 0x19, + ReasonCode::UnspecifiedError => 0x80, + ReasonCode::MalformedPacket => 0x81, + ReasonCode::ProtocolError => 0x82, + ReasonCode::ImplementationSpecificError => 0x83, + ReasonCode::UnsupportedProtocolVersion => 0x84, + ReasonCode::ClientIdNotValid => 0x85, + ReasonCode::BadUserNameOrPassword => 0x86, + ReasonCode::NotAuthorized => 0x87, + ReasonCode::ServerUnavailable => 0x88, + ReasonCode::ServerBusy => 0x89, + ReasonCode::Banned => 0x8A, + ReasonCode::ServerShuttingDown => 0x8B, + ReasonCode::BadAuthMethod => 0x8C, + ReasonCode::KeepAliveTimeout => 0x8D, + ReasonCode::SessionTakeOver => 0x8E, + ReasonCode::TopicFilterInvalid => 0x8F, + ReasonCode::TopicNameInvalid => 0x90, + ReasonCode::PacketIdentifierInUse => 0x91, + ReasonCode::PacketIdentifierNotFound => 0x92, + ReasonCode::ReceiveMaximumExceeded => 0x93, + ReasonCode::TopicAliasInvalid => 0x94, + ReasonCode::PacketTooLarge => 0x95, + ReasonCode::MessageRateTooHigh => 0x96, + ReasonCode::QuotaExceeded => 0x97, + ReasonCode::AdministrativeAction => 0x98, + ReasonCode::PayloadFormatInvalid => 0x99, + ReasonCode::RetainNotSupported => 0x9A, + ReasonCode::QoSNotSupported => 0x9B, + ReasonCode::UseAnotherServer => 0x9C, + ReasonCode::ServerMoved => 0x9D, + ReasonCode::SharedSubscriptionNotSupported => 0x9E, + ReasonCode::ConnectionRateExceeded => 0x9F, + ReasonCode::MaximumConnectTime => 0xA0, + ReasonCode::SubscriptionIdentifiersNotSupported => 0xA1, + ReasonCode::WildcardSubscriptionNotSupported => 0xA2, + ReasonCode::Unknown => 0xFF + } + + } +} + +impl From for ReasonCode { + fn from(orig: u8) -> Self { + return match orig { + 0x00 => ReasonCode::Success, + 0x01 => ReasonCode::GrantedQoS1, + 0x02 => ReasonCode::GrantedQoS2, + 0x04 => ReasonCode::DisconnectWithWillMessage, + 0x10 => ReasonCode::NoMatchingSubscribers, + 0x11 => ReasonCode::NoSubscriptionExisted, + 0x18 => ReasonCode::ContinueAuth, + 0x19 => ReasonCode::ReAuthenticate, + 0x80 => ReasonCode::UnspecifiedError, + 0x81 => ReasonCode::MalformedPacket, + 0x82 => ReasonCode::ProtocolError, + 0x83 => ReasonCode::ImplementationSpecificError, + 0x84 => ReasonCode::UnsupportedProtocolVersion, + 0x85 => ReasonCode::ClientIdNotValid, + 0x86 => ReasonCode::BadUserNameOrPassword, + 0x87 => ReasonCode::NotAuthorized, + 0x88 => ReasonCode::ServerUnavailable, + 0x89 => ReasonCode::ServerBusy, + 0x8A => ReasonCode::Banned, + 0x8B => ReasonCode::ServerShuttingDown, + 0x8C => ReasonCode::BadAuthMethod, + 0x8D => ReasonCode::KeepAliveTimeout, + 0x8E => ReasonCode::SessionTakeOver, + 0x8F => ReasonCode::TopicFilterInvalid, + 0x90 => ReasonCode::TopicNameInvalid, + 0x91 => ReasonCode::PacketIdentifierInUse, + 0x92 => ReasonCode::PacketIdentifierNotFound, + 0x93 => ReasonCode::ReceiveMaximumExceeded, + 0x94 => ReasonCode::TopicAliasInvalid, + 0x95 => ReasonCode::PacketTooLarge, + 0x96 => ReasonCode::MessageRateTooHigh, + 0x97 => ReasonCode::QuotaExceeded, + 0x98 => ReasonCode::AdministrativeAction, + 0x99 => ReasonCode::PayloadFormatInvalid, + 0x9A => ReasonCode::RetainNotSupported, + 0x9B => ReasonCode::QoSNotSupported, + 0x9C => ReasonCode::UseAnotherServer, + 0x9D => ReasonCode::ServerMoved, + 0x9E => ReasonCode::SharedSubscriptionNotSupported, + 0xA0 => ReasonCode::MaximumConnectTime, + 0xA1 => ReasonCode::SubscriptionIdentifiersNotSupported, + 0xA2 => ReasonCode::WildcardSubscriptionNotSupported, + _ => ReasonCode::Unknown + } + } +} + +impl Display for ReasonCode { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match *self { + ReasonCode::Success => write!(f, "Operation was successful!"), + ReasonCode::GrantedQoS1 => write!(f, "Granted QoS level 1!"), + ReasonCode::GrantedQoS2 => write!(f, "Granted QoS level 2!"), + ReasonCode::DisconnectWithWillMessage => {} + ReasonCode::NoMatchingSubscribers => write!(f, "No matching subscribers on broker!"), + ReasonCode::NoSubscriptionExisted => write!(f, "Subscription not exist!"), + ReasonCode::ContinueAuth => write!(f, "Broker asks for more AUTH packets!"), + ReasonCode::ReAuthenticate => write!(f, "Broker requires re-authentication!"), + ReasonCode::UnspecifiedError => write!(f, "Unspecified error!"), + ReasonCode::MalformedPacket => write!(f, "Malformed packet sent!"), + ReasonCode::ProtocolError => write!(f, "Protocol specific error!"), + ReasonCode::ImplementationSpecificError => write!(f, "Implementation specific error!"), + ReasonCode::UnsupportedProtocolVersion => write!(f, "Unsupported protocol version!"), + ReasonCode::ClientIdNotValid => write!(f, "Client sent not valid identification"), + ReasonCode::BadUserNameOrPassword => write!(f, "Authentication error, username of password not valid!"), + ReasonCode::NotAuthorized => write!(f, "Client not authorized!"), + ReasonCode::ServerUnavailable => write!(f, "Server unavailable!"), + ReasonCode::ServerBusy => write!(f, "Server is busy!"), + ReasonCode::Banned => write!(f, "Client is banned on broker!"), + ReasonCode::ServerShuttingDown => write!(f, "Server is shutting down!"), + ReasonCode::BadAuthMethod => write!(f, "Provided bad authentication method!"), + ReasonCode::KeepAliveTimeout => write!(f, "Client reached timeout"), + ReasonCode::SessionTakeOver => write!(f, "Took over session!"), + ReasonCode::TopicFilterInvalid => write!(f, "Topic filter is not valid!"), + ReasonCode::TopicNameInvalid => write!(f, "Topic name is not valid!"), + ReasonCode::PacketIdentifierInUse => write!(f, "Packet identifier is already in use!"), + ReasonCode::PacketIdentifierNotFound => write!(f, "Packet identifier not found!"), + ReasonCode::ReceiveMaximumExceeded => write!(f, "Maximum receive amount exceeded!"), + ReasonCode::TopicAliasInvalid => write!(f, "Invalid topic alias!"), + ReasonCode::PacketTooLarge => write!(f, "Sent packet was too large!"), + ReasonCode::MessageRateTooHigh => write!(f, "Message rate is too high!"), + ReasonCode::QuotaExceeded => write!(f, "Quota exceeded!"), + ReasonCode::AdministrativeAction => write!(f, "Administrative action!"), + ReasonCode::PayloadFormatInvalid => write!(f, "Invalid payload format!"), + ReasonCode::RetainNotSupported => write!(f, "Message retain not supported!"), + ReasonCode::QoSNotSupported => write!(f, "Used QoS is not supported!"), + ReasonCode::UseAnotherServer => write!(f, "Use another server!"), + ReasonCode::ServerMoved => write!(f, "Server moved!"), + ReasonCode::SharedSubscriptionNotSupported => write!(f, "Shared subscription is not supported"), + ReasonCode::ConnectionRateExceeded => write!(f, "Connection rate exceeded!"), + ReasonCode::MaximumConnectTime => write!(f, "Maximum connect time exceeded!"), + ReasonCode::SubscriptionIdentifiersNotSupported => write!(f, "Subscription identifier not supported!"), + ReasonCode::WildcardSubscriptionNotSupported => write!(f, "Wildcard subscription not supported!"), + ReasonCode::Unknown => write!(f, "Unknown error!"), + } + } +} \ No newline at end of file diff --git a/src/packet/subscription_packet.rs b/src/packet/subscription_packet.rs index 8a63aab..d396035 100644 --- a/src/packet/subscription_packet.rs +++ b/src/packet/subscription_packet.rs @@ -2,6 +2,7 @@ use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::mqtt_packet::Packet; +use crate::packet::publish_packet::QualityOfService; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::TopicFilter; use crate::utils::buffer_writer::BuffWriter; @@ -32,11 +33,12 @@ pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { - pub fn add_new_filter(& mut self, topic_name: &'a str) { + pub fn add_new_filter(& mut self, topic_name: &'a str, qos: QualityOfService) { let len = topic_name.len(); let mut new_filter = TopicFilter::new(); new_filter.filter.string = topic_name; new_filter.filter.len = len as u16; + new_filter.sub_options = new_filter.sub_options | (>::into(qos) >> 1); self.topic_filters.push(new_filter); self.topic_filter_len = self.topic_filter_len + 1; } @@ -55,10 +57,6 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> topic_filter_len: 0, topic_filters: Vec::, MAX_FILTERS>::new(), }; - let mut p = TopicFilter::new(); - p.filter.len = 6; - p.filter.string = "test/#"; - x.topic_filters.push(p); return x; } diff --git a/src/utils/buffer_reader.rs b/src/utils/buffer_reader.rs index d1a8276..6dc2c46 100644 --- a/src/utils/buffer_reader.rs +++ b/src/utils/buffer_reader.rs @@ -4,6 +4,7 @@ use core::str; use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; #[derive(Debug)] +#[derive(Clone)] pub struct EncodedString<'a> { pub string: &'a str, pub len: u16, @@ -20,6 +21,7 @@ impl EncodedString<'_> { } #[derive(Debug)] +#[derive(Clone)] pub struct BinaryData<'a> { pub bin: &'a [u8], pub len: u16, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 7f6b1dc..3e38d69 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,2 +1,3 @@ pub mod buffer_reader; pub mod buffer_writer; +pub mod rng_generator; diff --git a/src/utils/rng_generator.rs b/src/utils/rng_generator.rs new file mode 100644 index 0000000..9d98c42 --- /dev/null +++ b/src/utils/rng_generator.rs @@ -0,0 +1,25 @@ +use rand_core::{RngCore, Error, impls}; + +pub struct CountingRng(pub u64); + +impl RngCore for CountingRng { + fn next_u32(&mut self) -> u32 { + self.next_u64() as u32 + } + + fn next_u64(&mut self) -> u64 { + self.0 += 1; + if self.0 > u16::MAX as u64 { + self.0 = 1; + } + self.0 + } + + fn fill_bytes(&mut self, dest: &mut [u8]) { + impls::fill_bytes_via_next(self, dest) + } + + fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Error> { + Ok(self.fill_bytes(dest)) + } +} \ No newline at end of file