Error handling

This commit is contained in:
Ondrej Babec 2022-02-27 15:48:09 +01:00
parent 3cb9de0371
commit 4eaa83bf1c
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
14 changed files with 421 additions and 41 deletions

View File

@ -0,0 +1,42 @@
use crate::packet::publish_packet::QualityOfService;
use crate::utils::buffer_reader::{BinaryData, EncodedString};
pub struct ClientConfig<'a> {
pub qos: QualityOfService,
pub username_flag: bool,
pub username: EncodedString<'a>,
pub password_flag: bool,
pub password: BinaryData<'a>
}
impl ClientConfig<'a> {
pub fn new() -> Self {
Self {
qos: QualityOfService::QoS0,
username_flag: false,
username: EncodedString::new(),
password_flag: false,
password: BinaryData::new(),
}
}
pub fn add_qos(& mut self, qos: QualityOfService) {
self.qos = qos;
}
pub fn add_username(& mut self, username: &'a str) {
let mut username_s: EncodedString = EncodedString::new();
username_s.string = username;
username_s.len = username.len() as u16;
self.username_flag = true;
self.username = username_s;
}
pub fn add_password(& mut self, password: &'a str) {
let mut password_s: BinaryData = BinaryData::new();
password_s.bin = password.as_bytes();
password_s.len = password_s.bin.len() as u16;
self.password = password_s;
self.password_flag = true;
}
}

View File

@ -1,35 +1,52 @@
use core::future::Future;
use embassy::traits::rng;
use rand_core::RngCore;
use crate::client::client_config::ClientConfig;
use crate::network::network_trait::{Network, NetworkError};
use crate::packet::connack_packet::ConnackPacket;
use crate::packet::connect_packet::ConnectPacket;
use crate::packet::disconnect_packet::DisconnectPacket;
use crate::packet::mqtt_packet::Packet;
use crate::packet::puback_packet::PubackPacket;
use crate::packet::publish_packet::QualityOfService::QoS1;
use crate::packet::publish_packet::{PublishPacket, QualityOfService};
use crate::packet::suback_packet::SubackPacket;
use crate::packet::subscription_packet::SubscriptionPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::rng_generator::CountingRng;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
network_driver: &'a mut T,
buffer: &'a mut [u8],
recv_buffer: &'a mut [u8],
rng: CountingRng,
config: ClientConfig<'a>,
}
impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES>
where
T: Network,
{
pub fn new(network_driver: &'a mut T, buffer: &'a mut [u8]) -> Self {
pub fn new(network_driver: &'a mut T, buffer: &'a mut [u8], recv_buffer: &'a mut [u8], config: ClientConfig<'a>) -> Self {
Self {
network_driver,
buffer,
recv_buffer,
rng: CountingRng(50),
config
}
}
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), NetworkError> {
let mut len = {
let mut connect = ConnectPacket::<'b, 3, 0>::clean();
if self.config.username_flag {
connect.add_username(& self.config.username);
}
if self.config.password_flag {
connect.add_password(& self.config.password)
}
connect.encode(self.buffer)
};
@ -58,35 +75,55 @@ where
Ok(())
}
// connect -> connack -> publish -> QoS ? -> disconn
pub async fn send_message<'b>(
&'b mut self,
topic_name: &'b str,
message: &'b str,
qos: QualityOfService,
) -> Result<(), NetworkError> {
// publish
let identifier: u16 = self.rng.next_u32() as u16;
let len = {
let mut packet = PublishPacket::<'b, 5>::new();
packet.add_topic_name(topic_name);
packet.add_qos(self.config.qos);
packet.add_identifier(identifier);
packet.add_message(message.as_bytes());
packet.encode(self.buffer)
};
self.network_driver.send(self.buffer, len).await?;
let x = self.network_driver.send(self.buffer, len).await;
if let Err(e) = x {
log::error!("Chyba pri prenosu!");
return Err(e);
}
//QoS1
if <QualityOfService as Into<u8>>::into(qos) == <QualityOfService as Into<u8>>::into(QoS1) {
todo!();
if <QualityOfService as Into<u8>>::into(self.config.qos ) == <QualityOfService as Into<u8>>::into(QoS1) {
let reason = {
self.network_driver.receive(self.buffer).await?;
let mut packet = PubackPacket::<'b, 5>::new();
packet.decode(&mut BuffReader::new(self.buffer));
[packet.packet_identifier, packet.reason_code as u16]
};
if identifier != reason[0] {
return Err(NetworkError::IDNotMatchedOnAck);
}
if reason[1] != 0 {
return Err(NetworkError::QoSAck);
}
}
Ok(())
}
// TODO - multiple topic subscribe func
pub async fn subscribe_to_topic<'b>(&'b mut self, topic_name: &'b str) -> Result<(), NetworkError> {
let len = {
let mut subs = SubscriptionPacket::<'b, 1, 1>::new();
subs.add_new_filter(topic_name);
subs.add_new_filter(topic_name, self.config.qos);
subs.encode(self.buffer)
};
let xx: [u8; 14] = (self.buffer[0..14]).try_into().unwrap();
@ -100,7 +137,7 @@ where
*packet.reason_codes.get(0).unwrap()
};
if reason > 1 {
if reason == (<QualityOfService as Into<u8>>::into(self.config.qos) >> 1) {
Err(NetworkError::Unknown)
} else {
Ok(())
@ -109,9 +146,20 @@ where
}
pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], NetworkError> {
self.network_driver.receive(self.buffer).await?;
self.network_driver.receive(self.recv_buffer).await?;
let mut packet = PublishPacket::<'b, 5>::new();
packet.decode(&mut BuffReader::new(self.buffer));
packet.decode(&mut BuffReader::new(self.recv_buffer));
if (packet.fixed_header & 0x06) == <QualityOfService as Into<u8>>::into(QualityOfService::QoS1) {
let mut puback = PubackPacket::<'b, 5>::new();
puback.packet_identifier = packet.packet_identifier;
puback.reason_code = 0x00;
{
let len = puback.encode(self.buffer);
self.network_driver.send(self.buffer, len).await ?;
}
}
return Ok(packet.message.unwrap());
}
}

View File

@ -1 +1,2 @@
pub mod client_v5;
pub mod client_config;

View File

@ -8,20 +8,27 @@ use rust_mqtt::tokio_network::TokioNetwork;
use std::time::Duration;
use tokio::time::sleep;
use tokio::{join, task};
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::packet::publish_packet::QualityOfService::QoS1;
async fn receive() {
let mut ip: [u8; 4] = [37, 205, 11, 180];
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS1);
config.add_username("test");
config.add_password("testPass");
let mut res2 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(&mut tokio_network, &mut res2);
let mut res3 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(&mut tokio_network, &mut res2, & mut res3, config);
let mut result = { client.connect_to_broker().await };
{
client.subscribe_to_topic("test/topic").await;
}
};
{
log::info!("Waiting for new message!");
let mes = client.receive_message().await.unwrap();
@ -38,18 +45,33 @@ async fn publish(message: &str) {
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
tokio_network.create_connection().await;
let config = ClientConfig::new();
let mut res2 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(&mut tokio_network, &mut res2);
let mut res3 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(&mut tokio_network, &mut res2, & mut res3, config);
let mut result = { client.connect_to_broker().await };
log::info!("Waiting until send!");
sleep(Duration::from_secs(15));
let mut result: Result<(), NetworkError> = {
result= {
log::info!("Sending new message!");
client
.send_message("test/topic", message, QualityOfService::QoS0)
.send_message("test/topic", message)
.await
};
if let Err(e) = result {
log::error!("Chyba!");
}
result = {
log::info!("Sending new message!");
client
.send_message("test/topic", "Dalsi zprava :)")
.await
};
if let Err(err) = result {
log::error!("Chyba!");
}
{
client.disconnect().await;
@ -63,7 +85,7 @@ async fn main() {
.format_timestamp_nanos()
.init();
let recv = task::spawn(async move {
/*let recv = task::spawn(async move {
receive().await;
});
@ -71,6 +93,8 @@ async fn main() {
publish("hello world 123 !").await;
});
join!(recv, publ);
join!(recv, publ);*/
receive().await;
//publish("Ahoj 123").await;
log::info!("Done");
}

View File

@ -8,6 +8,9 @@ use crate::packet::mqtt_packet::Packet;
pub enum NetworkError {
Connection,
Unknown,
QoSAck,
IDNotMatchedOnAck,
NoMatchingSubs,
}
pub trait Network {

View File

@ -97,6 +97,16 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize>
}
self.fixed_header = cur_type | flags;
}
pub fn add_username(&mut self, username: &EncodedString<'a>) {
self.username = (*username).clone();
self.connect_flags = self.connect_flags | 0x80;
}
pub fn add_password(&mut self, password: &BinaryData<'a>) {
self.password = (*password).clone();
self.connect_flags = self.connect_flags | 0x40;
}
}
impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a>
@ -116,7 +126,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'
// 12 = protocol_name_len + protocol_name + protocol_version + connect_flags + keep_alive + client_id_len
rm_ln = rm_ln + property_len_len as u32 + 12;
if self.connect_flags & 0x04 == 1 {
if self.connect_flags & 0x04 != 0 {
let wil_prop_len_enc =
VariableByteIntegerEncoder::encode(self.will_property_len).unwrap();
let wil_prop_len_len = VariableByteIntegerEncoder::len(wil_prop_len_enc);
@ -126,13 +136,13 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'
+ self.will_topic.len as u32
+ self.will_payload.len as u32;
}
if self.connect_flags & 0x80 == 1 {
rm_ln = rm_ln + self.username.len as u32;
let x = self.connect_flags & 0x80;
if (self.connect_flags & 0x80) != 0 {
rm_ln = rm_ln + self.username.len as u32 + 2;
}
if self.connect_flags & 0x40 == 1 {
rm_ln = rm_ln + self.password.len as u32;
if self.connect_flags & 0x40 != 0 {
rm_ln = rm_ln + self.password.len as u32 + 2;
}
buff_writer.write_u8(self.fixed_header);
@ -147,18 +157,18 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'
buff_writer.encode_properties::<MAX_PROPERTIES>(&self.properties);
buff_writer.write_string_ref(&self.client_id);
if self.connect_flags & 0x04 == 1 {
if self.connect_flags & 0x04 != 0 {
buff_writer.write_variable_byte_int(self.will_property_len);
buff_writer.encode_properties(&self.will_properties);
buff_writer.write_string_ref(&self.will_topic);
buff_writer.write_binary_ref(&self.will_payload);
}
if self.connect_flags & 0x80 == 1 {
if self.connect_flags & 0x80 != 0 {
buff_writer.write_string_ref(&self.username);
}
if self.connect_flags & 0x40 == 1 {
if self.connect_flags & 0x40 != 0 {
buff_writer.write_binary_ref(&self.password);
}

View File

@ -18,3 +18,4 @@ pub mod pingreq_packet;
pub mod pingresp_packet;
pub mod suback_packet;
pub mod unsuback_packet;
pub mod reason_codes;

View File

@ -30,14 +30,28 @@ impl<'a, const MAX_PROPERTIES: usize> PubackPacket<'a, MAX_PROPERTIES> {
return;
}
self.packet_identifier = buff_reader.read_u16().unwrap();
self.reason_code = buff_reader.read_u8().unwrap();
self.decode_properties(buff_reader);
if self.remain_len != 2 {
self.reason_code = buff_reader.read_u8().unwrap();
}
if self.remain_len < 4 {
self.property_len = 0;
} else {
self.decode_properties(buff_reader);
}
}
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
Self {
fixed_header: PacketType::Puback.into(),
remain_len: 0,
packet_identifier: 0,
reason_code: 0,
property_len: 0,
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new()
}
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {

View File

@ -11,6 +11,7 @@ use crate::utils::buffer_writer::BuffWriter;
use super::packet_type::PacketType;
use super::property::Property;
#[derive(Clone, Copy)]
pub enum QualityOfService {
QoS0,
QoS1,
@ -22,8 +23,8 @@ impl From<u8> for QualityOfService {
fn from(orig: u8) -> Self {
return match orig {
0 => QoS0,
1 => QoS1,
2 => QoS2,
2 => QoS1,
4 => QoS2,
_ => INVALID,
};
}
@ -33,8 +34,8 @@ impl Into<u8> for QualityOfService {
fn into(self) -> u8 {
return match self {
QoS0 => 0,
QoS1 => 1,
QoS2 => 2,
QoS1 => 2,
QoS2 => 4,
INVALID => 3,
};
}
@ -67,6 +68,14 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> {
self.message = Some(message);
}
pub fn add_qos(& mut self, qos: QualityOfService) {
self.fixed_header = self.fixed_header | <QualityOfService as Into<u8>>::into(qos);
}
pub fn add_identifier(& mut self, identifier: u16) {
self.packet_identifier = identifier;
}
pub fn decode_publish_packet(&mut self, buff_reader: &mut BuffReader<'a>) {
if self.decode_fixed_header(buff_reader) != (PacketType::Publish).into() {
log::error!("Packet you are trying to decode is not PUBLISH packet!");
@ -112,7 +121,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE
buff_writer.write_u8(self.fixed_header);
let qos = self.fixed_header & 0x03;
if qos != 0 {
rm_ln + 2;
rm_ln = rm_ln + 2;
}
buff_writer.write_variable_byte_int(rm_ln);

202
src/packet/reason_codes.rs Normal file
View File

@ -0,0 +1,202 @@
use core::fmt::{Display, Formatter, write};
use crate::packet::reason_codes::ReasonCode::ServerMoved;
pub enum ReasonCode {
Success,
GrantedQoS1,
GrantedQoS2,
DisconnectWithWillMessage,
NoMatchingSubscribers,
NoSubscriptionExisted,
ContinueAuth,
ReAuthenticate,
UnspecifiedError,
MalformedPacket,
ProtocolError,
ImplementationSpecificError,
UnsupportedProtocolVersion,
ClientIdNotValid,
BadUserNameOrPassword,
NotAuthorized,
ServerUnavailable,
ServerBusy,
Banned,
ServerShuttingDown,
BadAuthMethod,
KeepAliveTimeout,
SessionTakeOver,
TopicFilterInvalid,
TopicNameInvalid,
PacketIdentifierInUse,
PacketIdentifierNotFound,
ReceiveMaximumExceeded,
TopicAliasInvalid,
PacketTooLarge,
MessageRateTooHigh,
QuotaExceeded,
AdministrativeAction,
PayloadFormatInvalid,
RetainNotSupported,
QoSNotSupported,
UseAnotherServer,
ServerMoved,
SharedSubscriptionNotSupported,
ConnectionRateExceeded,
MaximumConnectTime,
SubscriptionIdentifiersNotSupported,
WildcardSubscriptionNotSupported,
Unknown
}
impl Into<u8> for ReasonCode {
fn into(self) -> u8 {
return match self {
ReasonCode::Success => 0x00,
ReasonCode::GrantedQoS1 => 0x01,
ReasonCode::GrantedQoS2 => 0x02,
ReasonCode::DisconnectWithWillMessage => 0x04,
ReasonCode::NoMatchingSubscribers => 0x10,
ReasonCode::NoSubscriptionExisted => 0x11,
ReasonCode::ContinueAuth => 0x18,
ReasonCode::ReAuthenticate => 0x19,
ReasonCode::UnspecifiedError => 0x80,
ReasonCode::MalformedPacket => 0x81,
ReasonCode::ProtocolError => 0x82,
ReasonCode::ImplementationSpecificError => 0x83,
ReasonCode::UnsupportedProtocolVersion => 0x84,
ReasonCode::ClientIdNotValid => 0x85,
ReasonCode::BadUserNameOrPassword => 0x86,
ReasonCode::NotAuthorized => 0x87,
ReasonCode::ServerUnavailable => 0x88,
ReasonCode::ServerBusy => 0x89,
ReasonCode::Banned => 0x8A,
ReasonCode::ServerShuttingDown => 0x8B,
ReasonCode::BadAuthMethod => 0x8C,
ReasonCode::KeepAliveTimeout => 0x8D,
ReasonCode::SessionTakeOver => 0x8E,
ReasonCode::TopicFilterInvalid => 0x8F,
ReasonCode::TopicNameInvalid => 0x90,
ReasonCode::PacketIdentifierInUse => 0x91,
ReasonCode::PacketIdentifierNotFound => 0x92,
ReasonCode::ReceiveMaximumExceeded => 0x93,
ReasonCode::TopicAliasInvalid => 0x94,
ReasonCode::PacketTooLarge => 0x95,
ReasonCode::MessageRateTooHigh => 0x96,
ReasonCode::QuotaExceeded => 0x97,
ReasonCode::AdministrativeAction => 0x98,
ReasonCode::PayloadFormatInvalid => 0x99,
ReasonCode::RetainNotSupported => 0x9A,
ReasonCode::QoSNotSupported => 0x9B,
ReasonCode::UseAnotherServer => 0x9C,
ReasonCode::ServerMoved => 0x9D,
ReasonCode::SharedSubscriptionNotSupported => 0x9E,
ReasonCode::ConnectionRateExceeded => 0x9F,
ReasonCode::MaximumConnectTime => 0xA0,
ReasonCode::SubscriptionIdentifiersNotSupported => 0xA1,
ReasonCode::WildcardSubscriptionNotSupported => 0xA2,
ReasonCode::Unknown => 0xFF
}
}
}
impl From<u8> for ReasonCode {
fn from(orig: u8) -> Self {
return match orig {
0x00 => ReasonCode::Success,
0x01 => ReasonCode::GrantedQoS1,
0x02 => ReasonCode::GrantedQoS2,
0x04 => ReasonCode::DisconnectWithWillMessage,
0x10 => ReasonCode::NoMatchingSubscribers,
0x11 => ReasonCode::NoSubscriptionExisted,
0x18 => ReasonCode::ContinueAuth,
0x19 => ReasonCode::ReAuthenticate,
0x80 => ReasonCode::UnspecifiedError,
0x81 => ReasonCode::MalformedPacket,
0x82 => ReasonCode::ProtocolError,
0x83 => ReasonCode::ImplementationSpecificError,
0x84 => ReasonCode::UnsupportedProtocolVersion,
0x85 => ReasonCode::ClientIdNotValid,
0x86 => ReasonCode::BadUserNameOrPassword,
0x87 => ReasonCode::NotAuthorized,
0x88 => ReasonCode::ServerUnavailable,
0x89 => ReasonCode::ServerBusy,
0x8A => ReasonCode::Banned,
0x8B => ReasonCode::ServerShuttingDown,
0x8C => ReasonCode::BadAuthMethod,
0x8D => ReasonCode::KeepAliveTimeout,
0x8E => ReasonCode::SessionTakeOver,
0x8F => ReasonCode::TopicFilterInvalid,
0x90 => ReasonCode::TopicNameInvalid,
0x91 => ReasonCode::PacketIdentifierInUse,
0x92 => ReasonCode::PacketIdentifierNotFound,
0x93 => ReasonCode::ReceiveMaximumExceeded,
0x94 => ReasonCode::TopicAliasInvalid,
0x95 => ReasonCode::PacketTooLarge,
0x96 => ReasonCode::MessageRateTooHigh,
0x97 => ReasonCode::QuotaExceeded,
0x98 => ReasonCode::AdministrativeAction,
0x99 => ReasonCode::PayloadFormatInvalid,
0x9A => ReasonCode::RetainNotSupported,
0x9B => ReasonCode::QoSNotSupported,
0x9C => ReasonCode::UseAnotherServer,
0x9D => ReasonCode::ServerMoved,
0x9E => ReasonCode::SharedSubscriptionNotSupported,
0xA0 => ReasonCode::MaximumConnectTime,
0xA1 => ReasonCode::SubscriptionIdentifiersNotSupported,
0xA2 => ReasonCode::WildcardSubscriptionNotSupported,
_ => ReasonCode::Unknown
}
}
}
impl Display for ReasonCode {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match *self {
ReasonCode::Success => write!(f, "Operation was successful!"),
ReasonCode::GrantedQoS1 => write!(f, "Granted QoS level 1!"),
ReasonCode::GrantedQoS2 => write!(f, "Granted QoS level 2!"),
ReasonCode::DisconnectWithWillMessage => {}
ReasonCode::NoMatchingSubscribers => write!(f, "No matching subscribers on broker!"),
ReasonCode::NoSubscriptionExisted => write!(f, "Subscription not exist!"),
ReasonCode::ContinueAuth => write!(f, "Broker asks for more AUTH packets!"),
ReasonCode::ReAuthenticate => write!(f, "Broker requires re-authentication!"),
ReasonCode::UnspecifiedError => write!(f, "Unspecified error!"),
ReasonCode::MalformedPacket => write!(f, "Malformed packet sent!"),
ReasonCode::ProtocolError => write!(f, "Protocol specific error!"),
ReasonCode::ImplementationSpecificError => write!(f, "Implementation specific error!"),
ReasonCode::UnsupportedProtocolVersion => write!(f, "Unsupported protocol version!"),
ReasonCode::ClientIdNotValid => write!(f, "Client sent not valid identification"),
ReasonCode::BadUserNameOrPassword => write!(f, "Authentication error, username of password not valid!"),
ReasonCode::NotAuthorized => write!(f, "Client not authorized!"),
ReasonCode::ServerUnavailable => write!(f, "Server unavailable!"),
ReasonCode::ServerBusy => write!(f, "Server is busy!"),
ReasonCode::Banned => write!(f, "Client is banned on broker!"),
ReasonCode::ServerShuttingDown => write!(f, "Server is shutting down!"),
ReasonCode::BadAuthMethod => write!(f, "Provided bad authentication method!"),
ReasonCode::KeepAliveTimeout => write!(f, "Client reached timeout"),
ReasonCode::SessionTakeOver => write!(f, "Took over session!"),
ReasonCode::TopicFilterInvalid => write!(f, "Topic filter is not valid!"),
ReasonCode::TopicNameInvalid => write!(f, "Topic name is not valid!"),
ReasonCode::PacketIdentifierInUse => write!(f, "Packet identifier is already in use!"),
ReasonCode::PacketIdentifierNotFound => write!(f, "Packet identifier not found!"),
ReasonCode::ReceiveMaximumExceeded => write!(f, "Maximum receive amount exceeded!"),
ReasonCode::TopicAliasInvalid => write!(f, "Invalid topic alias!"),
ReasonCode::PacketTooLarge => write!(f, "Sent packet was too large!"),
ReasonCode::MessageRateTooHigh => write!(f, "Message rate is too high!"),
ReasonCode::QuotaExceeded => write!(f, "Quota exceeded!"),
ReasonCode::AdministrativeAction => write!(f, "Administrative action!"),
ReasonCode::PayloadFormatInvalid => write!(f, "Invalid payload format!"),
ReasonCode::RetainNotSupported => write!(f, "Message retain not supported!"),
ReasonCode::QoSNotSupported => write!(f, "Used QoS is not supported!"),
ReasonCode::UseAnotherServer => write!(f, "Use another server!"),
ReasonCode::ServerMoved => write!(f, "Server moved!"),
ReasonCode::SharedSubscriptionNotSupported => write!(f, "Shared subscription is not supported"),
ReasonCode::ConnectionRateExceeded => write!(f, "Connection rate exceeded!"),
ReasonCode::MaximumConnectTime => write!(f, "Maximum connect time exceeded!"),
ReasonCode::SubscriptionIdentifiersNotSupported => write!(f, "Subscription identifier not supported!"),
ReasonCode::WildcardSubscriptionNotSupported => write!(f, "Wildcard subscription not supported!"),
ReasonCode::Unknown => write!(f, "Unknown error!"),
}
}
}

View File

@ -2,6 +2,7 @@ use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::mqtt_packet::Packet;
use crate::packet::publish_packet::QualityOfService;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_reader::TopicFilter;
use crate::utils::buffer_writer::BuffWriter;
@ -32,11 +33,12 @@ pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES
impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize>
SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES>
{
pub fn add_new_filter(& mut self, topic_name: &'a str) {
pub fn add_new_filter(& mut self, topic_name: &'a str, qos: QualityOfService) {
let len = topic_name.len();
let mut new_filter = TopicFilter::new();
new_filter.filter.string = topic_name;
new_filter.filter.len = len as u16;
new_filter.sub_options = new_filter.sub_options | (<QualityOfService as Into<u8>>::into(qos) >> 1);
self.topic_filters.push(new_filter);
self.topic_filter_len = self.topic_filter_len + 1;
}
@ -55,10 +57,6 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
topic_filter_len: 0,
topic_filters: Vec::<TopicFilter<'a>, MAX_FILTERS>::new(),
};
let mut p = TopicFilter::new();
p.filter.len = 6;
p.filter.string = "test/#";
x.topic_filters.push(p);
return x;
}

View File

@ -4,6 +4,7 @@ use core::str;
use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder;
#[derive(Debug)]
#[derive(Clone)]
pub struct EncodedString<'a> {
pub string: &'a str,
pub len: u16,
@ -20,6 +21,7 @@ impl EncodedString<'_> {
}
#[derive(Debug)]
#[derive(Clone)]
pub struct BinaryData<'a> {
pub bin: &'a [u8],
pub len: u16,

View File

@ -1,2 +1,3 @@
pub mod buffer_reader;
pub mod buffer_writer;
pub mod rng_generator;

View File

@ -0,0 +1,25 @@
use rand_core::{RngCore, Error, impls};
pub struct CountingRng(pub u64);
impl RngCore for CountingRng {
fn next_u32(&mut self) -> u32 {
self.next_u64() as u32
}
fn next_u64(&mut self) -> u64 {
self.0 += 1;
if self.0 > u16::MAX as u64 {
self.0 = 1;
}
self.0
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
impls::fill_bytes_via_next(self, dest)
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Error> {
Ok(self.fill_bytes(dest))
}
}