diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 9ba016a..563194c 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -1 +1 @@ -pub mod variable_byte_integer; \ No newline at end of file +pub mod variable_byte_integer; diff --git a/src/encoding/variable_byte_integer.rs b/src/encoding/variable_byte_integer.rs index 6ed1b35..a338f90 100644 --- a/src/encoding/variable_byte_integer.rs +++ b/src/encoding/variable_byte_integer.rs @@ -1,7 +1,8 @@ #![crate_name = "doc"] use crate::utils::buffer_reader::ParseError; -/// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on + +/// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on /// pseudo code which is introduced in MQTT version 5.0 OASIS standard accesible from /// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901107 @@ -9,6 +10,7 @@ use crate::utils::buffer_reader::ParseError; /// encode integer into MQTT variable byte integer format. This format is mainly used to encode /// lenghts stored in a packet. pub struct VariableByteIntegerEncoder; + /// Variable byte integers error enumeration is used by both encoder and decoder for /// error notification. @@ -44,7 +46,20 @@ impl VariableByteIntegerEncoder { break; } } - return Ok(res); + return Ok(res); + } + + pub fn len(var_int: VariableByteInteger) -> usize { + let mut i: usize = 0; + loop { + let mut encoded_byte: u8; + encoded_byte = var_int[i]; + i = i + 1; + if (encoded_byte & 128) == 0 { + break; + } + } + return i; } } @@ -52,7 +67,6 @@ impl VariableByteIntegerEncoder { /// decode message lenghts in MQTT packet and other parts encoded into variable byte integer. pub struct VariableByteIntegerDecoder; - impl VariableByteIntegerDecoder { /// Decode function takes as paramater encoded integer represented /// as array of 4 unsigned numbers of exactly 1 Byte each -> 4 Bytes maximal @@ -63,7 +77,7 @@ impl VariableByteIntegerDecoder { let mut encoded_byte: u8; let mut i: usize = 0; - + loop { encoded_byte = encoded[i]; i = i + 1; @@ -79,4 +93,4 @@ impl VariableByteIntegerDecoder { return Ok(ret); } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 7673fbc..9ce0124 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,10 @@ - +#![feature(in_band_lifetimes)] #![macro_use] #![cfg_attr(not(feature = "std"), no_std)] #![allow(dead_code)] -pub mod packet; pub mod encoding; +pub mod packet; pub mod utils; #[allow(unused_variables)] @@ -33,4 +33,4 @@ pub fn print_value_size(name: &'static str, val: &T) { name, core::mem::size_of_val::(val) );*/ -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 535d88f..d8ac3a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,42 +1,53 @@ -use rust_mqtt::packet::mqtt_packet::*; -use rust_mqtt::packet::packet_type::PacketType; -use rust_mqtt::packet::packet_builder::PacketBuilder; -use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerEncoder; -use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerDecoder; -use rust_mqtt::packet::property::*; -use rust_mqtt::utils::buffer_reader::BuffReader; -use heapless::Vec; +/*use rust_mqtt::packet::mqtt_packet::*; +use rust_mqtt::packet::property::*;*/ +/*use heapless::Vec; use std::fs::File; -use std::io::Read; +use std::io::Read;*/ + +use rust_mqtt::packet::connect_packet::ConnectPacket; +use rust_mqtt::packet::mqtt_packet::Packet; +use rust_mqtt::packet::publish_packet::PublishPacket; +use rust_mqtt::packet::subscription_packet::SubscriptionPacket; fn main() { env_logger::builder() - .filter_level(log::LevelFilter::Info) - .format_timestamp_nanos() - .init(); + .filter_level(log::LevelFilter::Info) + .format_timestamp_nanos() + .init(); - let fl = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin"); + let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new(); + let mut res = vec![0; 140]; + let lnsub = pckt.encode(&mut res); + println!("{:02X?}", &res[0..lnsub]); + let mut res2 = vec![0; 260]; + let mut x = b"hello world"; + let mut pblsh = PublishPacket::<0>::new(x); + let lnpblsh = pblsh.encode(&mut res2); + println!("{:02X?}", &res2[0..lnpblsh]); + log::info!("xxx"); + + let mut res3 = vec![0; 260]; + let mut cntrl = ConnectPacket::<3, 0>::clean(); + let lncntrl = cntrl.encode(&mut res3); + println!("{:02X?}", &res3[0..lncntrl]); + log::info!("xxx"); + + /*let fl = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin"); let mut f = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin").expect("no file found"); let mut buffer: [u8; 500] = [0; 500]; f.read(&mut buffer).expect("buffer overflow"); - let mut txt = Vec::new(); - let mut payld = *b"xxxxx"; - let packet = Packet::clean(txt, &mut payld); - let mut packet_builder = PacketBuilder::new(packet); - let mut buffer_reader = BuffReader::new(&buffer); + // + let mut payld = *b"xxxxx";*/ + //let packet = Packet::clean(txt, &mut payld); + /*let mut buffer_reader = BuffReader::new(&buffer); packet_builder.decode_packet(& mut buffer_reader); - + let bytes: [u8; 4] = packet_builder.currentPacket.protocol_name.to_be_bytes(); let prot = std::str::from_utf8(&bytes).unwrap(); - log::info!("Protocol name: {}", prot) + log::info!("Protocol name: {}", prot)*/ } - -/*fn test(tst: &str) { - log::info!("xx"); - log::info!("Prvni: {}", ) -}*/ \ No newline at end of file diff --git a/src/packet/auth_packet.rs b/src/packet/auth_packet.rs new file mode 100644 index 0000000..4e62253 --- /dev/null +++ b/src/packet/auth_packet.rs @@ -0,0 +1,94 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct AuthPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub auth_reason: u8, + + pub property_len: u32, + + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> { + pub fn decode_auth_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_fixed_header(buff_reader); + self.auth_reason = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } + + pub fn add_reason_code(&mut self, code: u8) { + if code != 0 && code != 24 && code != 25 { + log::error!("Provided reason code is not supported!"); + return; + } + self.auth_reason = code; + } + + pub fn add_property(&mut self, p: Property<'a>) { + if p.auth_property() { + self.push_to_properties(p); + } else { + log::error!("Provided property is not correct AUTH packet property!"); + } + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTIES> { + /*fn new() -> Packet<'a, MAX_PROPERTIES> { + return AuthPacket { fixed_header: PacketType::Auth.into(), remain_len: 0, auth_reason: 0, property_len: 0, properties: Vec::, MAX_PROPERTIES>::new() } + }*/ + + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32; + rm_ln = rm_ln + 1; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u8(self.auth_reason); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_auth_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/connack_packet.rs b/src/packet/connack_packet.rs new file mode 100644 index 0000000..83cd1cb --- /dev/null +++ b/src/packet/connack_packet.rs @@ -0,0 +1,72 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct ConnackPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + pub ack_flags: u8, + pub connect_reason_code: u8, + pub property_len: u32, + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> { + pub fn decode_connack_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Connack).into() { + log::error!("Packet you are trying to decode is not CONNACK packet!"); + return; + } + self.ack_flags = buff_reader.read_u8().unwrap(); + self.connect_reason_code = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + buff_writer.write_u8(self.fixed_header); + let mut property_len_enc = VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + + let rm_len: u32 = 2 + self.property_len + property_len_len as u32; + buff_writer.write_variable_byte_int(rm_len); + buff_writer.write_u8(self.ack_flags); + buff_writer.write_u8(self.connect_reason_code); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties(&self.properties); + return buff_writer.position; + } + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_connack_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/connect_packet.rs b/src/packet/connect_packet.rs new file mode 100644 index 0000000..9e55674 --- /dev/null +++ b/src/packet/connect_packet.rs @@ -0,0 +1,208 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BinaryData; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::EncodedString; +use crate::utils::buffer_reader::ParseError; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct ConnectPacket<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + pub protocol_name_len: u16, + pub protocol_name: u32, + pub protocol_version: u8, + pub connect_flags: u8, + pub keep_alive: u16, + // property len + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + //payload + pub client_id: EncodedString<'a>, + // property len + pub will_property_len: u32, + pub will_properties: Vec, MAX_WILL_PROPERTIES>, + pub will_topic: EncodedString<'a>, + pub will_payload: BinaryData<'a>, + pub username: EncodedString<'a>, + pub password: BinaryData<'a>, +} + +impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> + ConnectPacket<'a, MAX_PROPERTIES, MAX_WILL_PROPERTIES> +{ + pub fn clean() -> Self { + let mut x = Self { + fixed_header: PacketType::Connect.into(), + remain_len: 0, + protocol_name_len: 4, + protocol_name: 0x4d515454, + protocol_version: 5, + connect_flags: 0x02, + keep_alive: 60, + property_len: 3, + properties: Vec::, MAX_PROPERTIES>::new(), + client_id: EncodedString::new(), + will_property_len: 0, + will_properties: Vec::, MAX_WILL_PROPERTIES>::new(), + will_topic: EncodedString::new(), + will_payload: BinaryData::new(), + username: EncodedString::new(), + password: BinaryData::new(), + }; + + let y = Property::ReceiveMaximum(20); + x.properties.push(y); + x.client_id.len = 0; + return x; + } + + pub fn get_reason_code(&self) { + log::info!("Getting reason code!"); + } + + pub fn add_packet_type(&mut self, new_packet_type: PacketType) { + self.fixed_header = self.fixed_header & 0x0F; + self.fixed_header = self.fixed_header | >::into(new_packet_type); + } + + pub fn add_flags(&mut self, dup: bool, qos: u8, retain: bool) { + let cur_type: u8 = self.fixed_header & 0xF0; + if cur_type != 0x30 { + log::error!("Cannot add flags into packet with other than PUBLISH type"); + return; + } + let mut flags: u8 = 0x00; + if dup { + flags = flags | 0x08; + } + if qos == 1 { + flags = flags | 0x02; + } + if qos == 2 { + flags = flags | 0x04; + } + if retain { + flags = flags | 0x01; + } + self.fixed_header = cur_type | flags; + } +} +impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a> + for ConnectPacket<'a, MAX_PROPERTIES, MAX_WILL_PROPERTIES> +{ + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + // 12 = protocol_name_len + protocol_name + protocol_version + connect_flags + keep_alive + client_id_len + rm_ln = rm_ln + property_len_len as u32 + 12; + + if self.connect_flags & 0x04 == 1 { + let wil_prop_len_enc = + VariableByteIntegerEncoder::encode(self.will_property_len).unwrap(); + let wil_prop_len_len = VariableByteIntegerEncoder::len(wil_prop_len_enc); + rm_ln = rm_ln + + wil_prop_len_len as u32 + + self.will_property_len as u32 + + self.will_topic.len as u32 + + self.will_payload.len as u32; + } + + if self.connect_flags & 0x80 == 1 { + rm_ln = rm_ln + self.username.len as u32; + } + + if self.connect_flags & 0x40 == 1 { + rm_ln = rm_ln + self.password.len as u32; + } + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + + buff_writer.write_u16(self.protocol_name_len); + buff_writer.write_u32(self.protocol_name); + buff_writer.write_u8(self.protocol_version); + buff_writer.write_u8(self.connect_flags); + buff_writer.write_u16(self.keep_alive); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + buff_writer.write_string_ref(&self.client_id); + + if self.connect_flags & 0x04 == 1 { + buff_writer.write_variable_byte_int(self.will_property_len); + buff_writer.encode_properties(&self.will_properties); + buff_writer.write_string_ref(&self.will_topic); + buff_writer.write_binary_ref(&self.will_payload); + } + + if self.connect_flags & 0x80 == 1 { + buff_writer.write_string_ref(&self.username); + } + + if self.connect_flags & 0x40 == 1 { + buff_writer.write_binary_ref(&self.password); + } + + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + log::error!("Decode function is not available for control packet!") + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } + + fn decode_properties(&mut self, buff_reader: &mut BuffReader<'a>) { + self.property_len = buff_reader.read_variable_byte_int().unwrap(); + let mut x: u32 = 0; + let mut prop: Result; + loop { + let mut res: Property; + prop = Property::decode(buff_reader); + if let Ok(res) = prop { + log::info!("Parsed property {:?}", res); + x = x + res.len() as u32 + 1; + self.properties.push(res); + } else { + // error handlo + log::error!("Problem during property decoding"); + } + + if x == self.property_len { + break; + } + } + } +} diff --git a/src/packet/disconnect_packet.rs b/src/packet/disconnect_packet.rs new file mode 100644 index 0000000..f182e50 --- /dev/null +++ b/src/packet/disconnect_packet.rs @@ -0,0 +1,73 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct DisconnectPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub disconnect_reason: u8, + + pub property_len: u32, + + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> DisconnectPacket<'a, MAX_PROPERTIES> { + pub fn decode_auth_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Pingresp).into() { + log::error!("Packet you are trying to decode is not PUBACK packet!"); + return; + } + self.disconnect_reason = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + buff_writer.write_u8(self.fixed_header); + let mut property_len_enc = VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + + let rm_len: u32 = 1 + self.property_len + property_len_len as u32; + buff_writer.write_variable_byte_int(rm_len); + buff_writer.write_u8(self.disconnect_reason); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_auth_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 1ede481..f76a034 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -1,4 +1,20 @@ +//pub mod control_packet; +pub mod auth_packet; +pub mod connack_packet; pub mod mqtt_packet; pub mod packet_type; -pub mod packet_builder; -pub mod property; \ No newline at end of file +pub mod property; +pub mod puback_packet; +pub mod pubcomp_packet; +pub mod publish_packet; +pub mod pubrec_packet; +pub mod pubrel_packet; +pub mod subscription_packet; +pub mod unsubscription_packet; + +pub mod connect_packet; +pub mod disconnect_packet; +pub mod pingreq_packet; +pub mod pingresp_packet; +pub mod suback_packet; +pub mod unsuback_packet; diff --git a/src/packet/mqtt_packet.rs b/src/packet/mqtt_packet.rs index 4b5ca0d..b090d63 100644 --- a/src/packet/mqtt_packet.rs +++ b/src/packet/mqtt_packet.rs @@ -1,55 +1,50 @@ +use crate::packet::packet_type::PacketType; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::ParseError; + use super::property::Property; -use super::packet_type::PacketType; -use heapless::Vec; -pub const MAX_PROPERTIES: usize = 18; +pub trait Packet<'a> { + //fn new() -> dyn Packet<'a> where Self: Sized; -pub struct Packet<'a> { - // 7 - 4 mqtt control packet type, 3-0 flagy - pub fixed_header: u8, - // 1 - 4 B lenght of variable header + len of payload - pub remain_len: u32, - - // variable header - //optional prida se pouze u packetu ve kterych ma co delat - pub packet_identifier: u16, - - pub protocol_name_len: u16, - - pub protocol_name: u32, - - pub protocol_version: u8, - - pub connect_flags: u8, - - pub keep_alive: u16, - // property len - pub property_len: u32, + fn encode(&mut self, buffer: &mut [u8]) -> usize; + fn decode(&mut self, buff_reader: &mut BuffReader<'a>); // properties - pub properties: Vec, MAX_PROPERTIES>, + fn set_property_len(&mut self, value: u32); + fn get_property_len(&mut self) -> u32; + fn push_to_properties(&mut self, property: Property<'a>); - // Payload of message - pub payload: &'a mut [u8] + // header + fn set_fixed_header(&mut self, header: u8); + fn set_remaining_len(&mut self, remaining_len: u32); + + fn decode_properties(&mut self, buff_reader: &mut BuffReader<'a>) { + self.set_property_len(buff_reader.read_variable_byte_int().unwrap()); + let mut x: u32 = 0; + let mut prop: Result; + loop { + let mut res: Property; + prop = Property::decode(buff_reader); + if let Ok(res) = prop { + log::info!("Parsed property {:?}", res); + x = x + res.len() as u32 + 1; + self.push_to_properties(res); + } else { + // error handler + log::error!("Problem during property decoding"); + } + + if x == self.get_property_len() { + break; + } + } + } + + fn decode_fixed_header(&mut self, buff_reader: &mut BuffReader) -> PacketType { + let first_byte: u8 = buff_reader.read_u8().unwrap(); + self.set_fixed_header(first_byte); + self.set_remaining_len(buff_reader.read_variable_byte_int().unwrap()); + return PacketType::from(first_byte); + } } - -impl<'a> Packet<'a> { - pub fn new(fixed_header: u8, remain_len: u32, packet_identifier: u16, protocol_name_len: u16, - protocol_name: u32, protocol_version: u8, - connect_flags: u8, keep_alive: u16, property_len: u32, - properties: Vec, MAX_PROPERTIES>, payload: &'a mut [u8]) -> Self { - Self { fixed_header, remain_len, packet_identifier, property_len, properties, payload, connect_flags, keep_alive, protocol_name_len, protocol_name, protocol_version } - } - - pub fn clean(properties: Vec, MAX_PROPERTIES>, payload: &'a mut [u8]) -> Self { - Self{ fixed_header: 0x00, remain_len: 0, packet_identifier: 0, property_len: 0, properties, payload, connect_flags: 0, keep_alive: 0, protocol_name_len: 0, protocol_name: 0, protocol_version: 5} - } - - pub fn encode(&self) { - log::info!("Encoding!"); - } - - pub fn get_reason_code(&self) { - log::info!("Getting reason code!"); - } -} \ No newline at end of file diff --git a/src/packet/packet_builder.rs b/src/packet/packet_builder.rs deleted file mode 100644 index 66d568e..0000000 --- a/src/packet/packet_builder.rs +++ /dev/null @@ -1,107 +0,0 @@ -use super::mqtt_packet::Packet; -use super::packet_type::PacketType; -use super::property::Property; -use crate::utils::buffer_reader::*; - -// Je potreba vytvori - -// metody packet buildery budou prijimat jako parametr buff reader, z ktereho bude postupne parsovat - -pub struct PacketBuilder<'a> { - pub currentPacket: Packet<'a>, -} - -impl<'a> PacketBuilder<'a> { - - pub fn new(packet: Packet<'a>) -> Self { - Self{ currentPacket: packet } - } - - pub fn build(&self) -> &Packet<'a> { - return &self.currentPacket; - } - - pub fn decode(&self, buffer: &'a mut [u8]) -> &Packet<'a> { - return &self.currentPacket; - } - - pub fn addPacketType(& mut self, new_packet_type: PacketType) { - self.currentPacket.fixed_header = self.currentPacket.fixed_header & 0x0F; - self.currentPacket.fixed_header = self.currentPacket.fixed_header | >::into(new_packet_type); - } - - pub fn addFlags(& mut self, dup: bool, qos: u8, retain: bool) { - let cur_type: u8 = self.currentPacket.fixed_header & 0xF0; - if cur_type != 0x30 { - log::error!("Cannot add flags into packet with other than PUBLISH type"); - return; - } - let mut flags: u8 = 0x00; - if dup { - flags = flags | 0x08; - } - if qos == 1 { - flags = flags | 0x02; - } - if qos == 2 { - flags = flags | 0x04; - } - if retain { - flags = flags | 0x01; - } - self.currentPacket.fixed_header = cur_type | flags; - } - - pub fn completePacket(& mut self) { - // Tutaj se cely packet dokonci - spocita se remaining len co chybi v hlavicce atd... - } - - pub fn decode_packet(& mut self, buff_reader: & mut BuffReader<'a>) { - self.decodeFixedHeader(buff_reader); - let y: u8 = self.currentPacket.fixed_header & 0xF0; - let z: u8 = (PacketType::Connect).into(); - if self.currentPacket.fixed_header & 0xF0 == (PacketType::Connect).into() { - self.decodeControllPacket(buff_reader); - } - } - - pub fn decodeFixedHeader(& mut self, buff_reader: & mut BuffReader) -> PacketType { - let first_byte: u8 = buff_reader.readU8().unwrap(); - self.currentPacket.fixed_header = first_byte; - self.currentPacket.remain_len = buff_reader.readVariableByteInt().unwrap(); - return PacketType::from(self.currentPacket.fixed_header); - } - - pub fn decode_properties(& mut self, buff_reader: & mut BuffReader<'a>) { - self.currentPacket.property_len = buff_reader.readVariableByteInt().unwrap(); - let mut x: u32 = 0; - let mut prop: Result; - loop { - let mut res: Property; - prop = Property::decode(buff_reader); - if let Ok(res) = prop { - log::info!("Parsed property {:?}", res); - x = x + res.len() as u32 + 1; - self.currentPacket.properties.push(res); - } else { - // error handlo - log::error!("Problem during property decoding"); - } - - - if x == self.currentPacket.property_len { - break; - } - } - } - - pub fn decodeControllPacket(& mut self, buff_reader: & mut BuffReader<'a>) { - self.currentPacket.packet_identifier = 0; - self.currentPacket.protocol_name_len = buff_reader.readU16().unwrap(); - self.currentPacket.protocol_name = buff_reader.readU32().unwrap(); - self.currentPacket.protocol_version = buff_reader.readU8().unwrap(); - self.currentPacket.connect_flags = buff_reader.readU8().unwrap(); - self.currentPacket.keep_alive = buff_reader.readU16().unwrap(); - self.decode_properties(buff_reader); - } -} \ No newline at end of file diff --git a/src/packet/packet_type.rs b/src/packet/packet_type.rs index 3598209..82fc069 100644 --- a/src/packet/packet_type.rs +++ b/src/packet/packet_type.rs @@ -17,54 +17,53 @@ pub enum PacketType { Pingreq, Pingresp, Disconnect, - Auth + Auth, } impl From for PacketType { fn from(orig: u8) -> Self { let packet_type: u8 = orig & 0xF0; - match packet_type { - 0x10 => return PacketType::Connect, - 0x20 => return PacketType::Connack, - 0x00 => return PacketType::Reserved, - 0x30 => return PacketType::Publish, - 0x40 => return PacketType::Puback, - 0x50 => return PacketType::Pubrec, - 0x60 => return PacketType::Pubrel, - 0x70 => return PacketType::Pubcomp, - 0x80 => return PacketType::Subscribe, - 0x90 => return PacketType::Suback, - 0xA0 => return PacketType::Unsubscribe, - 0xB0 => return PacketType::Unsuback, - 0xC0 => return PacketType::Pingreq, - 0xD0 => return PacketType::Pingresp, - 0xE0 => return PacketType::Disconnect, - 0xF0 => return PacketType::Auth, - _ => return PacketType::Reserved + return match packet_type { + 0x10 => PacketType::Connect, + 0x20 => PacketType::Connack, + 0x00 => PacketType::Reserved, + 0x30 => PacketType::Publish, + 0x40 => PacketType::Puback, + 0x50 => PacketType::Pubrec, + 0x60 => PacketType::Pubrel, + 0x70 => PacketType::Pubcomp, + 0x80 => PacketType::Subscribe, + 0x90 => PacketType::Suback, + 0xA0 => PacketType::Unsubscribe, + 0xB0 => PacketType::Unsuback, + 0xC0 => PacketType::Pingreq, + 0xD0 => PacketType::Pingresp, + 0xE0 => PacketType::Disconnect, + 0xF0 => PacketType::Auth, + _ => PacketType::Reserved, }; } } impl Into for PacketType { fn into(self) -> u8 { - match self { - PacketType::Connect => return 0x10, - PacketType::Connack => return 0x20, - PacketType::Publish => return 0x30, - PacketType::Puback => return 0x40, - PacketType::Pubrec => return 0x50, - PacketType::Pubrel => return 0x60, - PacketType::Pubcomp => return 0x70, - PacketType::Subscribe => return 0x80, - PacketType::Suback => return 0x90, - PacketType::Unsubscribe => return 0xA0, - PacketType::Unsuback => return 0xB0, - PacketType::Pingreq => return 0xC0, - PacketType::Pingresp => return 0xD0, - PacketType::Disconnect => return 0xE0, - PacketType::Auth => return 0xF0, - PacketType::Reserved => return 0x00 - } + return match self { + PacketType::Connect => 0x10, + PacketType::Connack => 0x20, + PacketType::Publish => 0x30, + PacketType::Puback => 0x40, + PacketType::Pubrec => 0x50, + PacketType::Pubrel => 0x60, + PacketType::Pubcomp => 0x70, + PacketType::Subscribe => 0x82, + PacketType::Suback => 0x90, + PacketType::Unsubscribe => 0xA0, + PacketType::Unsuback => 0xB0, + PacketType::Pingreq => 0xC0, + PacketType::Pingresp => 0xD0, + PacketType::Disconnect => 0xE0, + PacketType::Auth => 0xF0, + PacketType::Reserved => 0x00, + }; } } - diff --git a/src/packet/pingreq_packet.rs b/src/packet/pingreq_packet.rs new file mode 100644 index 0000000..40234f2 --- /dev/null +++ b/src/packet/pingreq_packet.rs @@ -0,0 +1,49 @@ +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PingreqPacket { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, +} + +impl PingreqPacket {} + +impl<'a> Packet<'a> for PingreqPacket { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(0 as u32); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + log::error!("PingreqPacket packet does not support decode funtion on client!"); + } + + fn set_property_len(&mut self, value: u32) { + log::error!("PINGREQ packet does not contain any properties!"); + } + + fn get_property_len(&mut self) -> u32 { + log::error!("PINGREQ packet does not contain any properties!"); + return 0; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + log::error!("PINGREQ packet does not contain any properties!"); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/pingresp_packet.rs b/src/packet/pingresp_packet.rs new file mode 100644 index 0000000..b6c32ae --- /dev/null +++ b/src/packet/pingresp_packet.rs @@ -0,0 +1,56 @@ +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PingrespPacket { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, +} + +impl<'a> PingrespPacket { + pub fn decode_pingresp_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Pingresp).into() { + log::error!("Packet you are trying to decode is not PUBACK packet!"); + return; + } + } +} + +impl<'a> Packet<'a> for PingrespPacket { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(0 as u32); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_pingresp_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + log::error!("PINGRESP packet does not contain any properties!"); + } + + fn get_property_len(&mut self) -> u32 { + log::error!("PINGRESP packet does not contain any properties!"); + return 0; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + log::error!("PINGRESP packet does not contain any properties!"); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/property.rs b/src/packet/property.rs index 059315d..fe16104 100644 --- a/src/packet/property.rs +++ b/src/packet/property.rs @@ -1,8 +1,10 @@ -use crate::utils::buffer_reader::ParseError; -use crate::utils::buffer_reader::StringPair; -use crate::utils::buffer_reader::EncodedString; +use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder}; use crate::utils::buffer_reader::BinaryData; use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::EncodedString; +use crate::utils::buffer_reader::ParseError; +use crate::utils::buffer_reader::StringPair; +use crate::utils::buffer_writer::BuffWriter; #[derive(Debug)] pub enum Property<'a> { @@ -32,73 +34,171 @@ pub enum Property<'a> { MaximumPacketSize(u32), WildcardSubscriptionAvailable(u8), SubscriptionIdentifierAvailable(u8), - SharedSubscriptionAvailable(u8) + SharedSubscriptionAvailable(u8), + Reserved(), } impl<'a> Property<'a> { - pub fn len(&self) -> u16 { - match self { - Property::PayloadFormat(u) => return 1, - Property::MessageExpiryInterval(u) => return 4, - Property::ContentType(u) => return u.len(), - Property::ResponseTopic(u) => return u.len(), - Property::CorrelationData(u) => return u.len(), - Property::SubscriptionIdentifier(u) => return 4, - Property::AssignedClientIdentifier(u) => return u.len(), - Property::ServerKeepAlive(u) => return 2, - Property::AuthenticationMethod(u) => return u.len(), - Property::AuthenticationData(u) => return u.len(), - Property::RequestProblemInformation(u) => return 1, - Property::WillDelayInterval(u) => return 4, - Property::RequestResponseInformation(u) => return 1, - Property::ResponseInformation(u) => return u.len(), - Property::ServerReference(u) => return u.len(), - Property::ReasonString(u) => return u.len(), - Property::ReceiveMaximum(u) => return 2, - Property::TopicAliasMaximum(u) => return 2, - Property::TopicAlias(u) => return 2, - Property::MaximumQoS(u) => return 1, - Property::RetainAvailable(u) => return 1, - Property::UserProperty(u) => return u.len(), - Property::MaximumPacketSize(u) => return 4, - Property::WildcardSubscriptionAvailable(u) => return 1, - Property::SubscriptionIdentifierAvailable(u) => return 1, - Property::SharedSubscriptionAvailable(u) => return 1, - _ => return 0 - } + pub fn auth_property(&self) -> bool { + return match self { + Property::AuthenticationMethod(_u) => true, + Property::AuthenticationData(_u) => true, + Property::ReasonString(_u) => true, + Property::UserProperty(_u) => true, + _ => false, + }; } - pub fn decode(buff_reader: & mut BuffReader<'a>) -> Result, ParseError> { - let propertyIdentifier = buff_reader.readU8(); - match propertyIdentifier { - Ok(0x01) => return Ok(Property::PayloadFormat(buff_reader.readU8()?)), - Ok(0x02) => return Ok(Property::MessageExpiryInterval(buff_reader.readU32()?)), - Ok(0x03) => return Ok(Property::ContentType(buff_reader.readString()?)), - Ok(0x08) => return Ok(Property::ResponseTopic(buff_reader.readString()?)), - Ok(0x09) => return Ok(Property::CorrelationData(buff_reader.readBinary()?)), - Ok(0x0B) => return Ok(Property::SubscriptionIdentifier(buff_reader.readVariableByteInt()?)), - Ok(0x11) => return Ok(Property::SessionExpiryInterval(buff_reader.readU32()?)), - Ok(0x12) => return Ok(Property::AssignedClientIdentifier(buff_reader.readString()?)), - Ok(0x13) => return Ok(Property::ServerKeepAlive(buff_reader.readU16()?)), - Ok(0x15) => return Ok(Property::AuthenticationMethod(buff_reader.readString()?)), - Ok(0x16) => return Ok(Property::AuthenticationData(buff_reader.readBinary()?)), - Ok(0x17) => return Ok(Property::RequestProblemInformation(buff_reader.readU8()?)), - Ok(0x18) => return Ok(Property::WillDelayInterval(buff_reader.readU32()?)), - Ok(0x19) => return Ok(Property::RequestResponseInformation(buff_reader.readU8()?)), - Ok(0x1A) => return Ok(Property::ResponseInformation(buff_reader.readString()?)), - Ok(0x1C) => return Ok(Property::ServerReference(buff_reader.readString()?)), - Ok(0x1F) => return Ok(Property::ReasonString(buff_reader.readString()?)), - Ok(0x21) => return Ok(Property::ReceiveMaximum(buff_reader.readU16()?)), - Ok(0x22) => return Ok(Property::TopicAliasMaximum(buff_reader.readU16()?)), - Ok(0x23) => return Ok(Property::TopicAlias(buff_reader.readU16()?)), - Ok(0x24) => return Ok(Property::MaximumQoS(buff_reader.readU8()?)), - Ok(0x25) => return Ok(Property::RetainAvailable(buff_reader.readU8()?)), - Ok(0x26) => return Ok(Property::UserProperty(buff_reader.readStringPair()?)), - Ok(0x28) => return Ok(Property::WildcardSubscriptionAvailable(buff_reader.readU8()?)), - Ok(0x29) => return Ok(Property::SubscriptionIdentifierAvailable(buff_reader.readU8()?)), - Ok(0x2A) => return Ok(Property::SharedSubscriptionAvailable(buff_reader.readU8()?)), - Err(err) => return Err(err), - _ => return Err(ParseError::IdNotFound) - } + pub fn len(&self) -> u16 { + return match self { + Property::PayloadFormat(_u) => 1, + Property::MessageExpiryInterval(_u) => 4, + Property::ContentType(u) => u.len(), + Property::ResponseTopic(u) => u.len(), + Property::CorrelationData(u) => u.len(), + Property::SubscriptionIdentifier(_u) => 4, + Property::SessionExpiryInterval(_u) => 4, + Property::AssignedClientIdentifier(u) => u.len(), + Property::ServerKeepAlive(_u) => 2, + Property::AuthenticationMethod(u) => u.len(), + Property::AuthenticationData(u) => u.len(), + Property::RequestProblemInformation(_u) => 1, + Property::WillDelayInterval(_u) => 4, + Property::RequestResponseInformation(_u) => 1, + Property::ResponseInformation(u) => u.len(), + Property::ServerReference(u) => u.len(), + Property::ReasonString(u) => u.len(), + Property::ReceiveMaximum(_u) => 2, + Property::TopicAliasMaximum(_u) => 2, + Property::TopicAlias(_u) => 2, + Property::MaximumQoS(_u) => 1, + Property::RetainAvailable(_u) => 1, + Property::UserProperty(u) => u.len(), + Property::MaximumPacketSize(_u) => 4, + Property::WildcardSubscriptionAvailable(_u) => 1, + Property::SubscriptionIdentifierAvailable(_u) => 1, + Property::SharedSubscriptionAvailable(_u) => 1, + _ => 0, + }; } -} \ No newline at end of file + + pub fn encode(&self, buff_writer: &mut BuffWriter<'a>) { + return match self { + Property::PayloadFormat(u) => buff_writer.write_u8(*u), + Property::MessageExpiryInterval(u) => buff_writer.write_u32(*u), + Property::ContentType(u) => buff_writer.write_string_ref(u), + Property::ResponseTopic(u) => buff_writer.write_string_ref(u), + Property::CorrelationData(u) => buff_writer.write_binary_ref(u), + Property::SubscriptionIdentifier(u) => buff_writer.write_variable_byte_int(*u), + Property::SessionExpiryInterval(u) => buff_writer.write_u32(*u), + Property::AssignedClientIdentifier(u) => buff_writer.write_string_ref(u), + Property::ServerKeepAlive(u) => buff_writer.write_u16(*u), + Property::AuthenticationMethod(u) => buff_writer.write_string_ref(u), + Property::AuthenticationData(u) => buff_writer.write_binary_ref(u), + Property::RequestProblemInformation(u) => buff_writer.write_u8(*u), + Property::WillDelayInterval(u) => buff_writer.write_u32(*u), + Property::RequestResponseInformation(u) => buff_writer.write_u8(*u), + Property::ResponseInformation(u) => buff_writer.write_string_ref(u), + Property::ServerReference(u) => buff_writer.write_string_ref(u), + Property::ReasonString(u) => buff_writer.write_string_ref(u), + Property::ReceiveMaximum(u) => buff_writer.write_u16(*u), + Property::TopicAliasMaximum(u) => buff_writer.write_u16(*u), + Property::TopicAlias(u) => buff_writer.write_u16(*u), + Property::MaximumQoS(u) => buff_writer.write_u8(*u), + Property::RetainAvailable(u) => buff_writer.write_u8(*u), + Property::UserProperty(u) => buff_writer.write_string_pair_ref(u), + Property::MaximumPacketSize(u) => buff_writer.write_u32(*u), + Property::WildcardSubscriptionAvailable(u) => buff_writer.write_u8(*u), + Property::SubscriptionIdentifierAvailable(u) => buff_writer.write_u8(*u), + Property::SharedSubscriptionAvailable(u) => buff_writer.write_u8(*u), + _ => (), + }; + } + + pub fn decode(buff_reader: &mut BuffReader<'a>) -> Result, ParseError> { + let property_identifier = buff_reader.read_u8(); + return match property_identifier { + Ok(0x01) => Ok(Property::PayloadFormat(buff_reader.read_u8()?)), + Ok(0x02) => Ok(Property::MessageExpiryInterval(buff_reader.read_u32()?)), + Ok(0x03) => Ok(Property::ContentType(buff_reader.read_string()?)), + Ok(0x08) => Ok(Property::ResponseTopic(buff_reader.read_string()?)), + Ok(0x09) => Ok(Property::CorrelationData(buff_reader.read_binary()?)), + Ok(0x0B) => Ok(Property::SubscriptionIdentifier( + buff_reader.read_variable_byte_int()?, + )), + Ok(0x11) => Ok(Property::SessionExpiryInterval(buff_reader.read_u32()?)), + Ok(0x12) => Ok(Property::AssignedClientIdentifier( + buff_reader.read_string()?, + )), + Ok(0x13) => Ok(Property::ServerKeepAlive(buff_reader.read_u16()?)), + Ok(0x15) => Ok(Property::AuthenticationMethod(buff_reader.read_string()?)), + Ok(0x16) => Ok(Property::AuthenticationData(buff_reader.read_binary()?)), + Ok(0x17) => Ok(Property::RequestProblemInformation(buff_reader.read_u8()?)), + Ok(0x18) => Ok(Property::WillDelayInterval(buff_reader.read_u32()?)), + Ok(0x19) => Ok(Property::RequestResponseInformation(buff_reader.read_u8()?)), + Ok(0x1A) => Ok(Property::ResponseInformation(buff_reader.read_string()?)), + Ok(0x1C) => Ok(Property::ServerReference(buff_reader.read_string()?)), + Ok(0x1F) => Ok(Property::ReasonString(buff_reader.read_string()?)), + Ok(0x21) => Ok(Property::ReceiveMaximum(buff_reader.read_u16()?)), + Ok(0x22) => Ok(Property::TopicAliasMaximum(buff_reader.read_u16()?)), + Ok(0x23) => Ok(Property::TopicAlias(buff_reader.read_u16()?)), + Ok(0x24) => Ok(Property::MaximumQoS(buff_reader.read_u8()?)), + Ok(0x25) => Ok(Property::RetainAvailable(buff_reader.read_u8()?)), + Ok(0x26) => Ok(Property::UserProperty(buff_reader.read_string_pair()?)), + Ok(0x28) => Ok(Property::WildcardSubscriptionAvailable( + buff_reader.read_u8()?, + )), + Ok(0x29) => Ok(Property::SubscriptionIdentifierAvailable( + buff_reader.read_u8()?, + )), + Ok(0x2A) => Ok(Property::SharedSubscriptionAvailable( + buff_reader.read_u8()?, + )), + Err(err) => Err(err), + _ => Err(ParseError::IdNotFound), + }; + } +} + +impl Into for &Property<'a> { + fn into(self) -> u8 { + return match &*self { + Property::PayloadFormat(_u) => 0x01, + Property::MessageExpiryInterval(_u) => 0x02, + Property::ContentType(_u) => 0x03, + Property::ResponseTopic(_u) => 0x08, + Property::CorrelationData(_u) => 0x09, + Property::SubscriptionIdentifier(_u) => 0x0B, + Property::SessionExpiryInterval(_u) => 0x11, + Property::AssignedClientIdentifier(_u) => 0x12, + Property::ServerKeepAlive(_u) => 0x13, + Property::AuthenticationMethod(_u) => 0x15, + Property::AuthenticationData(_u) => 0x16, + Property::RequestProblemInformation(_u) => 0x17, + Property::WillDelayInterval(_u) => 0x18, + Property::RequestResponseInformation(_u) => 0x19, + Property::ResponseInformation(_u) => 0x1A, + Property::ServerReference(_u) => 0x1C, + Property::ReasonString(_u) => 0x1F, + Property::ReceiveMaximum(_u) => 0x21, + Property::TopicAliasMaximum(_u) => 0x22, + Property::TopicAlias(_u) => 0x23, + Property::MaximumQoS(_u) => 0x24, + Property::RetainAvailable(_u) => 0x25, + Property::UserProperty(_u) => 0x26, + Property::MaximumPacketSize(_u) => 0x27, + Property::WildcardSubscriptionAvailable(_u) => 0x28, + Property::SubscriptionIdentifierAvailable(_u) => 0x29, + Property::SharedSubscriptionAvailable(_u) => 0x2A, + _ => 0x00, + }; + } +} + +impl From for Property<'a> { + fn from(_orig: u8) -> Self { + return match _orig { + _ => Property::Reserved(), + }; + } +} diff --git a/src/packet/puback_packet.rs b/src/packet/puback_packet.rs new file mode 100644 index 0000000..42e8278 --- /dev/null +++ b/src/packet/puback_packet.rs @@ -0,0 +1,80 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PubackPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + pub reason_code: u8, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> PubackPacket<'a, MAX_PROPERTIES> { + pub fn decode_puback_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Puback).into() { + log::error!("Packet you are trying to decode is not PUBACK packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.reason_code = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32 + 3; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_u8(self.reason_code); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_puback_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/pubcomp_packet.rs b/src/packet/pubcomp_packet.rs new file mode 100644 index 0000000..58811a9 --- /dev/null +++ b/src/packet/pubcomp_packet.rs @@ -0,0 +1,80 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PubcompPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + pub reason_code: u8, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> PubcompPacket<'a, MAX_PROPERTIES> { + pub fn decode_puback_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Pubcomp).into() { + log::error!("Packet you are trying to decode is not PUBCOMP packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.reason_code = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32 + 3; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_u8(self.reason_code); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_puback_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/publish_packet.rs b/src/packet/publish_packet.rs new file mode 100644 index 0000000..cb7d05e --- /dev/null +++ b/src/packet/publish_packet.rs @@ -0,0 +1,114 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::EncodedString; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub topic_name: EncodedString<'a>, + pub packet_identifier: u16, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + pub message: &'a [u8], +} + +impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { + pub fn new(message: &'a [u8]) -> Self { + let mut x = Self { + fixed_header: PacketType::Publish.into(), + remain_len: 0, + topic_name: EncodedString::new(), + packet_identifier: 0, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new(), + message, + }; + x.topic_name.string = "test/topic"; + x.topic_name.len = 10; + return x; + } + + pub fn decode_publish_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Publish).into() { + log::error!("Packet you are trying to decode is not PUBLISH packet!"); + return; + } + self.topic_name = buff_reader.read_string().unwrap(); + let qos = self.fixed_header & 0x03; + if qos != 0 { + // je potreba dekodovat jenom pro QoS 1 / 2 + self.packet_identifier = buff_reader.read_u16().unwrap(); + } + self.decode_properties(buff_reader); + self.message = buff_reader.read_message(); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + let mut msg_len = self.message.len() as u32; + rm_ln = rm_ln + property_len_len as u32 + msg_len + self.topic_name.len as u32 + 2; + + buff_writer.write_u8(self.fixed_header); + let qos = self.fixed_header & 0x03; + if qos != 0 { + rm_ln + 2; + } + + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_string_ref(&self.topic_name); + + if qos != 0 { + buff_writer.write_u16(self.packet_identifier); + } + + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + buff_writer.insert_ref(msg_len as usize, self.message); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_publish_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/pubrec_packet.rs b/src/packet/pubrec_packet.rs new file mode 100644 index 0000000..996016e --- /dev/null +++ b/src/packet/pubrec_packet.rs @@ -0,0 +1,79 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PubrecPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + pub reason_code: u8, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> PubrecPacket<'a, MAX_PROPERTIES> { + pub fn decode_pubrec_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Pubrec).into() { + log::error!("Packet you are trying to decode is not PUBREC packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.reason_code = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32 + 3; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_u8(self.reason_code); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_pubrec_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/pubrel_packet.rs b/src/packet/pubrel_packet.rs new file mode 100644 index 0000000..3ac8da3 --- /dev/null +++ b/src/packet/pubrel_packet.rs @@ -0,0 +1,80 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct PubrelPacket<'a, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + pub reason_code: u8, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, +} + +impl<'a, const MAX_PROPERTIES: usize> PubrelPacket<'a, MAX_PROPERTIES> { + pub fn decode_puback_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Pubrel).into() { + log::error!("Packet you are trying to decode is not PUBREL packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.reason_code = buff_reader.read_u8().unwrap(); + self.decode_properties(buff_reader); + } +} + +impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPERTIES> { + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32 + 3; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_u8(self.reason_code); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_puback_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/suback_packet.rs b/src/packet/suback_packet.rs new file mode 100644 index 0000000..c90bc28 --- /dev/null +++ b/src/packet/suback_packet.rs @@ -0,0 +1,83 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct SubackPacket<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + pub reason_codes: Vec, +} + +impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> + SubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> +{ + pub fn read_reason_codes(&mut self, buff_reader: &mut BuffReader<'a>) { + let mut i = 0; + loop { + self.reason_codes.push(buff_reader.read_u8().unwrap()); + i = i + 1; + if i == MAX_REASONS { + break; + } + } + } + + pub fn decode_suback_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Suback).into() { + log::error!("Packet you are trying to decode is not SUBACK packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.decode_properties(buff_reader); + self.read_reason_codes(buff_reader); + } +} + +impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> + for SubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> +{ + fn encode(&mut self, buffer: &mut [u8]) -> usize { + log::error!("SUBACK packet does not support encoding!"); + return 0; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_suback_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/subscription_packet.rs b/src/packet/subscription_packet.rs new file mode 100644 index 0000000..096f8ac --- /dev/null +++ b/src/packet/subscription_packet.rs @@ -0,0 +1,110 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::TopicFilter; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + // topic filter len + pub topic_filter_len: u16, + + // payload + pub topic_filters: Vec, MAX_FILTERS>, +} + +impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> + SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> +{ + pub fn new() -> Self { + let mut x = Self { + fixed_header: PacketType::Subscribe.into(), + remain_len: 0, + packet_identifier: 1, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new(), + topic_filter_len: 1, + topic_filters: Vec::, MAX_FILTERS>::new(), + }; + let mut p = TopicFilter::new(); + p.filter.len = 6; + p.filter.string = "test/#"; + x.topic_filters.push(p); + return x; + } +} + +impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> + for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> +{ + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + + let mut lt = 0; + let mut filters_len = 0; + loop { + filters_len = filters_len + self.topic_filters.get(lt).unwrap().filter.len + 3; + lt = lt + 1; + if lt == self.topic_filter_len as usize { + break; + } + } + rm_ln = rm_ln + property_len_len as u32 + 2 + filters_len as u32; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + buff_writer.encode_topic_filters_ref( + false, + self.topic_filter_len as usize, + &self.topic_filters, + ); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + log::error!("Subscribe packet does not support decode funtion on client!"); + } + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/unsuback_packet.rs b/src/packet/unsuback_packet.rs new file mode 100644 index 0000000..b1c857a --- /dev/null +++ b/src/packet/unsuback_packet.rs @@ -0,0 +1,81 @@ +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct UnsubackPacket<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + pub reason_codes: Vec, +} + +impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> + UnsubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> +{ + pub fn read_reason_codes(&mut self, buff_reader: &mut BuffReader<'a>) { + let mut i = 0; + loop { + self.reason_codes.push(buff_reader.read_u8().unwrap()); + i = i + 1; + if i == MAX_REASONS { + break; + } + } + } + + pub fn decode_suback_packet(&mut self, buff_reader: &mut BuffReader<'a>) { + if self.decode_fixed_header(buff_reader) != (PacketType::Suback).into() { + log::error!("Packet you are trying to decode is not UNSUBACK packet!"); + return; + } + self.packet_identifier = buff_reader.read_u16().unwrap(); + self.decode_properties(buff_reader); + self.read_reason_codes(buff_reader); + } +} + +impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> + for UnsubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> +{ + fn encode(&mut self, buffer: &mut [u8]) -> usize { + log::error!("UNSUBACK packet does not support encoding!"); + return 0; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + self.decode_suback_packet(buff_reader); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/packet/unsubscription_packet.rs b/src/packet/unsubscription_packet.rs new file mode 100644 index 0000000..1464eec --- /dev/null +++ b/src/packet/unsubscription_packet.rs @@ -0,0 +1,89 @@ +use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; +use heapless::Vec; + +use crate::packet::mqtt_packet::Packet; +use crate::utils::buffer_reader::BuffReader; +use crate::utils::buffer_reader::TopicFilter; +use crate::utils::buffer_writer::BuffWriter; + +use super::packet_type::PacketType; +use super::property::Property; + +pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> { + // 7 - 4 mqtt control packet type, 3-0 flagy + pub fixed_header: u8, + // 1 - 4 B lenght of variable header + len of payload + pub remain_len: u32, + + pub packet_identifier: u16, + + pub property_len: u32, + + // properties + pub properties: Vec, MAX_PROPERTIES>, + + // topic filter len + pub topic_filter_len: u16, + + // payload + pub topic_filters: Vec, MAX_FILTERS>, +} + +impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> + UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> +{ + /*pub fn new() -> Self { + + }*/ +} + +impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> + for UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> +{ + fn encode(&mut self, buffer: &mut [u8]) -> usize { + let mut buff_writer = BuffWriter::new(buffer); + + let mut rm_ln = self.property_len; + let property_len_enc: [u8; 4] = + VariableByteIntegerEncoder::encode(self.property_len).unwrap(); + let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); + rm_ln = rm_ln + property_len_len as u32 + 4 + self.topic_filter_len as u32; + + buff_writer.write_u8(self.fixed_header); + buff_writer.write_variable_byte_int(rm_ln); + buff_writer.write_u16(self.packet_identifier); + buff_writer.write_variable_byte_int(self.property_len); + buff_writer.encode_properties::(&self.properties); + buff_writer.write_u16(self.topic_filter_len); + buff_writer.encode_topic_filters_ref( + false, + self.topic_filter_len as usize, + &self.topic_filters, + ); + return buff_writer.position; + } + + fn decode(&mut self, buff_reader: &mut BuffReader<'a>) { + log::error!("Unsubscribe packet does not support decode funtion on client!"); + } + + fn set_property_len(&mut self, value: u32) { + self.property_len = value; + } + + fn get_property_len(&mut self) -> u32 { + return self.property_len; + } + + fn push_to_properties(&mut self, property: Property<'a>) { + self.properties.push(property); + } + + fn set_fixed_header(&mut self, header: u8) { + self.fixed_header = header; + } + + fn set_remaining_len(&mut self, remaining_len: u32) { + self.remain_len = remaining_len; + } +} diff --git a/src/utils/buffer_reader.rs b/src/utils/buffer_reader.rs index 92e33ac..0b8c272 100644 --- a/src/utils/buffer_reader.rs +++ b/src/utils/buffer_reader.rs @@ -1,14 +1,19 @@ -use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; -use core::str; use core::mem; +use core::str; + +use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; #[derive(Debug)] pub struct EncodedString<'a> { pub string: &'a str, - pub len: u16 + pub len: u16, } impl EncodedString<'_> { + pub fn new() -> Self { + Self { string: "", len: 0 } + } + pub fn len(&self) -> u16 { return self.len + 2; } @@ -17,10 +22,14 @@ impl EncodedString<'_> { #[derive(Debug)] pub struct BinaryData<'a> { pub bin: &'a [u8], - pub len: u16 + pub len: u16, } impl BinaryData<'_> { + pub fn new() -> Self { + Self { bin: &[0], len: 0 } + } + pub fn len(&self) -> u16 { return self.len + 2; } @@ -29,7 +38,7 @@ impl BinaryData<'_> { #[derive(Debug)] pub struct StringPair<'a> { pub name: EncodedString<'a>, - pub value: EncodedString<'a> + pub value: EncodedString<'a>, } impl StringPair<'_> { @@ -39,15 +48,33 @@ impl StringPair<'_> { } } -#[derive(core::fmt::Debug)] -#[derive(Clone)] +#[derive(Debug)] +pub struct TopicFilter<'a> { + pub filter: EncodedString<'a>, + pub sub_options: u8, +} + +impl TopicFilter<'_> { + pub fn new() -> Self { + Self { + filter: EncodedString::new(), + sub_options: 0, + } + } + + pub fn len(&self) -> u16 { + return self.filter.len + 3; + } +} + +#[derive(core::fmt::Debug, Clone)] pub enum ParseError { Utf8Error, IndexOutOfBounce, VariableByteIntegerError, IdNotFound, EncodingError, - DecodingError + DecodingError, } pub struct BuffReader<'a> { @@ -56,90 +83,112 @@ pub struct BuffReader<'a> { } impl<'a> BuffReader<'a> { - pub fn incrementPosition(& mut self, increment: usize) { + pub fn increment_position(&mut self, increment: usize) { self.position = self.position + increment; } pub fn new(buffer: &'a [u8]) -> Self { - return BuffReader { buffer: buffer, position: 0 }; + return BuffReader { + buffer: buffer, + position: 0, + }; } - pub fn readVariableByteInt(& mut self) -> Result { - let variable_byte_integer: [u8; 4] = [self.buffer[self.position], self.buffer[self.position + 1], self.buffer[self.position + 2], self.buffer[self.position + 3]]; + pub fn read_variable_byte_int(&mut self) -> Result { + let variable_byte_integer: [u8; 4] = [ + self.buffer[self.position], + self.buffer[self.position + 1], + self.buffer[self.position + 2], + self.buffer[self.position + 3], + ]; let mut len: usize = 1; - if variable_byte_integer[0] & 0x80 == 1 { - len = len + 1; - if variable_byte_integer[1] & 0x80 == 1 { - len = len + 1; - if variable_byte_integer[2] & 0x80 == 1 { - len = len + 1; + if variable_byte_integer[0] & 0x80 == 1 { + len = len + 1; + if variable_byte_integer[1] & 0x80 == 1 { + len = len + 1; + if variable_byte_integer[2] & 0x80 == 1 { + len = len + 1; } } } - self.incrementPosition(len); + self.increment_position(len); return VariableByteIntegerDecoder::decode(variable_byte_integer); } - - pub fn readU32(& mut self) -> Result { + + pub fn read_u32(&mut self) -> Result { let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::()); let ret: u32 = u32::from_be_bytes(int_bytes.try_into().unwrap()); //let ret: u32 = (((self.buffer[self.position] as u32) << 24) | ((self.buffer[self.position + 1] as u32) << 16) | ((self.buffer[self.position + 2] as u32) << 8) | (self.buffer[self.position + 3] as u32)) as u32; - self.incrementPosition(4); + self.increment_position(4); return Ok(ret); } - - pub fn readU16(& mut self) -> Result { + + pub fn read_u16(&mut self) -> Result { let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::()); - let ret: u16 = u16::from_be_bytes(int_bytes.try_into().unwrap()); + let ret: u16 = u16::from_be_bytes(int_bytes.try_into().unwrap()); //(((self.buffer[self.position] as u16) << 8) | (self.buffer[self.position + 1] as u16)) as u16; - self.incrementPosition(2); + self.increment_position(2); return Ok(ret); } - - pub fn readU8(& mut self) -> Result { + + pub fn read_u8(&mut self) -> Result { let ret: u8 = self.buffer[self.position]; - self.incrementPosition(1); + self.increment_position(1); return Ok(ret); } - - pub fn readString(& mut self) -> Result, ParseError> { - let len = self.readU16(); + + pub fn read_string(&mut self) -> Result, ParseError> { + let len = self.read_u16(); match len { Err(err) => return Err(err), - _ => log::debug!("[parseString] let not parsed") + _ => log::debug!("[parseString] let not parsed"), } let len_res = len.unwrap(); - let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len_res as usize)])); + let res_str = + str::from_utf8(&(self.buffer[self.position..(self.position + len_res as usize)])); if res_str.is_err() { log::error!("Could not parse utf-8 string"); return Err(ParseError::Utf8Error); } - return Ok(EncodedString { string: res_str.unwrap(), len: len_res }); + return Ok(EncodedString { + string: res_str.unwrap(), + len: len_res, + }); } - + //TODO: Index out of bounce err !!!!! - pub fn readBinary(& mut self) -> Result, ParseError> { - let len = self.readU16(); + pub fn read_binary(&mut self) -> Result, ParseError> { + let len = self.read_u16(); match len { Err(err) => return Err(err), - _ => log::debug!("[parseBinary] let not parsed") + _ => log::debug!("[parseBinary] let not parsed"), } let len_res = len.unwrap(); let res_bin = &(self.buffer[self.position..(self.position + len_res as usize)]); - return Ok(BinaryData { bin: res_bin, len: len_res }); + return Ok(BinaryData { + bin: res_bin, + len: len_res, + }); } - - pub fn readStringPair(& mut self) -> Result, ParseError> { - let name = self.readString(); + + pub fn read_string_pair(&mut self) -> Result, ParseError> { + let name = self.read_string(); match name { Err(err) => return Err(err), - _ => log::debug!("[String pair] name not parsed") + _ => log::debug!("[String pair] name not parsed"), } - let value = self.readString(); + let value = self.read_string(); match value { Err(err) => return Err(err), - _ => log::debug!("[String pair] value not parsed") + _ => log::debug!("[String pair] value not parsed"), } - return Ok(StringPair { name: name.unwrap(), value: value.unwrap() }); + return Ok(StringPair { + name: name.unwrap(), + value: value.unwrap(), + }); } -} \ No newline at end of file + + pub fn read_message(&mut self) -> &'a [u8] { + return &self.buffer[self.position..]; + } +} diff --git a/src/utils/buffer_writer.rs b/src/utils/buffer_writer.rs new file mode 100644 index 0000000..bb6f974 --- /dev/null +++ b/src/utils/buffer_writer.rs @@ -0,0 +1,135 @@ +use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder}; +use crate::packet::property::Property; +use crate::utils::buffer_reader::{BinaryData, EncodedString, StringPair, TopicFilter}; +use core::str; +use heapless::Vec; + +pub struct BuffWriter<'a> { + buffer: &'a mut [u8], + pub position: usize, +} + +impl<'a> BuffWriter<'a> { + pub fn insert_ref(&mut self, len: usize, array: &[u8]) { + let mut x: usize = 0; + if len != 0 { + loop { + self.buffer[self.position] = array[x]; + self.increment_position(1); + x = x + 1; + if x == len { + break; + } + } + } + } + + pub fn new(buffer: &'a mut [u8]) -> Self { + return BuffWriter { + buffer, + position: 0, + }; + } + + fn increment_position(&mut self, increment: usize) { + self.position = self.position + increment; + } + + pub fn write_u8(&mut self, byte: u8) { + self.buffer[self.position] = byte; + self.increment_position(1); + } + + pub fn write_u16(&mut self, two_bytes: u16) { + let bytes: [u8; 2] = two_bytes.to_be_bytes(); + self.insert_ref(2, &bytes); + } + + pub fn write_u32(&mut self, four_bytes: u32) { + let bytes: [u8; 4] = four_bytes.to_be_bytes(); + self.insert_ref(4,&bytes); + } + + pub fn write_string_ref(&mut self, str: &EncodedString<'a>) { + self.write_u16(str.len); + let bytes = str.string.as_bytes(); + self.insert_ref(str.len as usize, bytes); + } + + pub fn write_string(&mut self, str: EncodedString<'a>) { + self.write_u16(str.len); + let bytes = str.string.as_bytes(); + self.insert_ref(str.len as usize, bytes); + } + + pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) { + self.write_u16(bin.len); + self.insert_ref(bin.len as usize, bin.bin); + } + + pub fn write_binary(&mut self, bin: BinaryData<'a>) { + self.write_u16(bin.len); + self.insert_ref(bin.len as usize, bin.bin); + } + + pub fn write_string_pair_ref(&mut self, str_pair: &StringPair<'a>) { + self.write_string_ref(&str_pair.name); + self.write_string_ref(&str_pair.value); + } + + pub fn write_string_pair(&mut self, str_pair: StringPair<'a>) { + self.write_string(str_pair.name); + self.write_string(str_pair.value); + } + + pub fn write_variable_byte_int(&mut self, int: u32) { + let x: VariableByteInteger = VariableByteIntegerEncoder::encode(int).unwrap(); + let len = VariableByteIntegerEncoder::len(x); + self.insert_ref(len, &x); + } + + pub fn encode_property(&mut self, property: &Property<'a>) { + let x: u8 = property.into(); + self.write_u8(x); + property.encode(self); + } + + pub fn encode_properties(&mut self, properties: &Vec, LEN>) { + let mut i = 0; + let len = properties.len(); + if len != 0 { + loop { + let prop: &Property = properties.get(i).unwrap(); + self.encode_property(prop); + i = i + 1; + if i == len { + break; + } + } + } + } + + fn encode_topic_filter_ref(&mut self, sub: bool, topic_filter: &TopicFilter<'a>) { + self.write_string_ref(&topic_filter.filter); + if sub { + self.write_u8(topic_filter.sub_options) + } + } + + pub fn encode_topic_filters_ref( + &mut self, + sub: bool, + len: usize, + filters: &Vec, MAX>, + ) { + let mut i = 0; + loop { + let topic_filter: &TopicFilter<'a> = filters.get(i).unwrap(); + self.encode_topic_filter_ref(sub, topic_filter); + i = i + 1; + if i == len { + break; + } + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index fbf6310..7f6b1dc 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1 +1,2 @@ -pub mod buffer_reader; \ No newline at end of file +pub mod buffer_reader; +pub mod buffer_writer;