diff --git a/src/encoding/variable_byte_integer.rs b/src/encoding/variable_byte_integer.rs index 1c9be49..6ed1b35 100644 --- a/src/encoding/variable_byte_integer.rs +++ b/src/encoding/variable_byte_integer.rs @@ -1,4 +1,6 @@ #![crate_name = "doc"] + +use crate::utils::buffer_reader::ParseError; /// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on /// pseudo code which is introduced in MQTT version 5.0 OASIS standard accesible from /// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901107 @@ -9,12 +11,6 @@ pub struct VariableByteIntegerEncoder; /// Variable byte integers error enumeration is used by both encoder and decoder for /// error notification. -#[derive(core::fmt::Debug)] -#[derive(Clone)] -pub enum VariableByteIntegerError { - EncodingError, - DecodingError -} pub type VariableByteInteger = [u8; 4]; @@ -23,13 +19,13 @@ impl VariableByteIntegerEncoder { /// this integer into maximal 4 Bytes. MSb of each Byte is controll bit. /// This bit is saying if there is continuing Byte in stream or not, this way /// we can effectively use 1 to 4 Bytes based in integer len. - pub fn encode(mut target: u32) -> Result { + pub fn encode(mut target: u32) -> Result { // General known informations from OASIS const MAX_ENCODABLE: u32 = 268435455; const MOD: u32 = 128; if target > MAX_ENCODABLE { log::error!("Maximal value of integer for encoding was exceeded"); - return Err(VariableByteIntegerError::EncodingError); + return Err(ParseError::EncodingError); } let mut res: [u8; 4] = [0; 4]; @@ -61,7 +57,7 @@ impl VariableByteIntegerDecoder { /// Decode function takes as paramater encoded integer represented /// as array of 4 unsigned numbers of exactly 1 Byte each -> 4 Bytes maximal /// same as maximal amount of bytes for variable byte encoding in MQTT. - pub fn decode(encoded: VariableByteInteger) -> Result { + pub fn decode(encoded: VariableByteInteger) -> Result { let mut multiplier: u32 = 1; let mut ret: u32 = 0; @@ -73,7 +69,7 @@ impl VariableByteIntegerDecoder { i = i + 1; ret = ret + ((encoded_byte & 127) as u32 * multiplier) as u32; if multiplier > 128 * 128 * 128 { - return Err(VariableByteIntegerError::DecodingError); + return Err(ParseError::DecodingError); } multiplier = multiplier * 128; if (encoded_byte & 128) == 0 { diff --git a/src/main.rs b/src/main.rs index 2002ec8..535d88f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,10 @@ use rust_mqtt::packet::packet_builder::PacketBuilder; use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerEncoder; use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerDecoder; use rust_mqtt::packet::property::*; +use rust_mqtt::utils::buffer_reader::BuffReader; use heapless::Vec; +use std::fs::File; +use std::io::Read; fn main() { env_logger::builder() @@ -12,33 +15,25 @@ fn main() { .format_timestamp_nanos() .init(); - let l: u8 = 1; - let y: u32 = 2; - let z: u16 = 3; - let p: u32 = 4; + let fl = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin"); + + let mut f = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin").expect("no file found"); + let mut buffer: [u8; 500] = [0; 500]; + f.read(&mut buffer).expect("buffer overflow"); + let mut txt = Vec::new(); let mut payld = *b"xxxxx"; let packet = Packet::clean(txt, &mut payld); let mut packet_builder = PacketBuilder::new(packet); - packet_builder.addPacketType(PacketType::Publish); + let mut buffer_reader = BuffReader::new(&buffer); + packet_builder.decode_packet(& mut buffer_reader); - /*let s: str = "AAAAAA"; - test(&s);*/ - - let f = PacketType::from(0xA0); - let o: u8 = f.into(); - - let r = match VariableByteIntegerEncoder::encode(179) { - Ok(r) => r, - Err(_e) => [0; 4], - }; - log::info!("{:02X?}", r); - let d = VariableByteIntegerDecoder::decode(r); - log::info!("Enum val: {}", o); - log::info!("Hello world"); + let bytes: [u8; 4] = packet_builder.currentPacket.protocol_name.to_be_bytes(); + let prot = std::str::from_utf8(&bytes).unwrap(); + log::info!("Protocol name: {}", prot) } /*fn test(tst: &str) { diff --git a/src/packet/packet_builder.rs b/src/packet/packet_builder.rs index 5020ce7..66d568e 100644 --- a/src/packet/packet_builder.rs +++ b/src/packet/packet_builder.rs @@ -8,7 +8,7 @@ use crate::utils::buffer_reader::*; // metody packet buildery budou prijimat jako parametr buff reader, z ktereho bude postupne parsovat pub struct PacketBuilder<'a> { - currentPacket: Packet<'a>, + pub currentPacket: Packet<'a>, } impl<'a> PacketBuilder<'a> { @@ -58,6 +58,8 @@ impl<'a> PacketBuilder<'a> { pub fn decode_packet(& mut self, buff_reader: & mut BuffReader<'a>) { self.decodeFixedHeader(buff_reader); + let y: u8 = self.currentPacket.fixed_header & 0xF0; + let z: u8 = (PacketType::Connect).into(); if self.currentPacket.fixed_header & 0xF0 == (PacketType::Connect).into() { self.decodeControllPacket(buff_reader); } @@ -70,29 +72,21 @@ impl<'a> PacketBuilder<'a> { return PacketType::from(self.currentPacket.fixed_header); } - pub fn decodeControllPacket(& mut self, buff_reader: & mut BuffReader<'a>) { - self.currentPacket.packet_identifier = 0; - self.currentPacket.protocol_name_len = buff_reader.readU16().unwrap(); - self.currentPacket.protocol_name = buff_reader.readU32().unwrap(); - self.currentPacket.protocol_version = buff_reader.readU8().unwrap(); - self.currentPacket.connect_flags = buff_reader.readU8().unwrap(); - self.currentPacket.keep_alive = buff_reader.readU16().unwrap(); + pub fn decode_properties(& mut self, buff_reader: & mut BuffReader<'a>) { self.currentPacket.property_len = buff_reader.readVariableByteInt().unwrap(); let mut x: u32 = 0; - let mut prop: Result; - let mut res: Property; + let mut prop: Result; loop { + let mut res: Property; prop = Property::decode(buff_reader); if let Ok(res) = prop { + log::info!("Parsed property {:?}", res); x = x + res.len() as u32 + 1; self.currentPacket.properties.push(res); - } - - /*if prop.is_ok() { - } else { - log::error!("Decoding property did not went well!"); - }*/ + // error handlo + log::error!("Problem during property decoding"); + } if x == self.currentPacket.property_len { @@ -100,4 +94,14 @@ impl<'a> PacketBuilder<'a> { } } } + + pub fn decodeControllPacket(& mut self, buff_reader: & mut BuffReader<'a>) { + self.currentPacket.packet_identifier = 0; + self.currentPacket.protocol_name_len = buff_reader.readU16().unwrap(); + self.currentPacket.protocol_name = buff_reader.readU32().unwrap(); + self.currentPacket.protocol_version = buff_reader.readU8().unwrap(); + self.currentPacket.connect_flags = buff_reader.readU8().unwrap(); + self.currentPacket.keep_alive = buff_reader.readU16().unwrap(); + self.decode_properties(buff_reader); + } } \ No newline at end of file diff --git a/src/packet/packet_type.rs b/src/packet/packet_type.rs index df9fb9d..3598209 100644 --- a/src/packet/packet_type.rs +++ b/src/packet/packet_type.rs @@ -48,9 +48,8 @@ impl From for PacketType { impl Into for PacketType { fn into(self) -> u8 { match self { - PacketType::Connect => return 0x00, - PacketType::Connack => return 0x10, - PacketType::Reserved => return 0x20, + PacketType::Connect => return 0x10, + PacketType::Connack => return 0x20, PacketType::Publish => return 0x30, PacketType::Puback => return 0x40, PacketType::Pubrec => return 0x50, diff --git a/src/packet/property.rs b/src/packet/property.rs index e8583d0..059315d 100644 --- a/src/packet/property.rs +++ b/src/packet/property.rs @@ -1,66 +1,65 @@ -use crate::utils::buffer_reader::ProperyParseError; +use crate::utils::buffer_reader::ParseError; use crate::utils::buffer_reader::StringPair; use crate::utils::buffer_reader::EncodedString; use crate::utils::buffer_reader::BinaryData; -use crate::encoding::variable_byte_integer::VariableByteIntegerError; use crate::utils::buffer_reader::BuffReader; -#[derive(Clone)] +#[derive(Debug)] pub enum Property<'a> { - PayloadFormat(Result), - MessageExpiryInterval(Result), - ContentType(Result, ProperyParseError>), - ResponseTopic(Result, ProperyParseError>), - CorrelationData(Result, ProperyParseError>), - SubscriptionIdentifier(Result), - SessionExpiryInterval(Result), - AssignedClientIdentifier(Result, ProperyParseError>), - ServerKeepAlive(Result), - AuthenticationMethod(Result, ProperyParseError>), - AuthenticationData(Result, ProperyParseError>), - RequestProblemInformation(Result), - WillDelayInterval(Result), - RequestResponseInformation(Result), - ResponseInformation(Result, ProperyParseError>), - ServerReference(Result, ProperyParseError>), - ReasonString(Result, ProperyParseError>), - ReceiveMaximum(Result), - TopicAliasMaximum(Result), - TopicAlias(Result), - MaximumQoS(Result), - RetainAvailable(Result), - UserProperty(Result, ProperyParseError>), - MaximumPacketSize(Result), - WildcardSubscriptionAvailable(Result), - SubscriptionIdentifierAvailable(Result), - SharedSubscriptionAvailable(Result) + PayloadFormat(u8), + MessageExpiryInterval(u32), + ContentType(EncodedString<'a>), + ResponseTopic(EncodedString<'a>), + CorrelationData(BinaryData<'a>), + SubscriptionIdentifier(u32), + SessionExpiryInterval(u32), + AssignedClientIdentifier(EncodedString<'a>), + ServerKeepAlive(u16), + AuthenticationMethod(EncodedString<'a>), + AuthenticationData(BinaryData<'a>), + RequestProblemInformation(u8), + WillDelayInterval(u32), + RequestResponseInformation(u8), + ResponseInformation(EncodedString<'a>), + ServerReference(EncodedString<'a>), + ReasonString(EncodedString<'a>), + ReceiveMaximum(u16), + TopicAliasMaximum(u16), + TopicAlias(u16), + MaximumQoS(u8), + RetainAvailable(u8), + UserProperty(StringPair<'a>), + MaximumPacketSize(u32), + WildcardSubscriptionAvailable(u8), + SubscriptionIdentifierAvailable(u8), + SharedSubscriptionAvailable(u8) } impl<'a> Property<'a> { - pub fn len(self) -> u16 { + pub fn len(&self) -> u16 { match self { Property::PayloadFormat(u) => return 1, Property::MessageExpiryInterval(u) => return 4, - Property::ContentType(u) => return u.unwrap().len(), - Property::ResponseTopic(u) => return u.unwrap().len(), - Property::CorrelationData(u) => return u.unwrap().len(), + Property::ContentType(u) => return u.len(), + Property::ResponseTopic(u) => return u.len(), + Property::CorrelationData(u) => return u.len(), Property::SubscriptionIdentifier(u) => return 4, - Property::AssignedClientIdentifier(u) => return u.unwrap().len(), + Property::AssignedClientIdentifier(u) => return u.len(), Property::ServerKeepAlive(u) => return 2, - Property::AuthenticationMethod(u) => return u.unwrap().len(), - Property::AuthenticationData(u) => return u.unwrap().len(), + Property::AuthenticationMethod(u) => return u.len(), + Property::AuthenticationData(u) => return u.len(), Property::RequestProblemInformation(u) => return 1, Property::WillDelayInterval(u) => return 4, Property::RequestResponseInformation(u) => return 1, - Property::ResponseInformation(u) => return u.unwrap().len(), - Property::ServerReference(u) => return u.unwrap().len(), - Property::ReasonString(u) => return u.unwrap().len(), + Property::ResponseInformation(u) => return u.len(), + Property::ServerReference(u) => return u.len(), + Property::ReasonString(u) => return u.len(), Property::ReceiveMaximum(u) => return 2, Property::TopicAliasMaximum(u) => return 2, Property::TopicAlias(u) => return 2, Property::MaximumQoS(u) => return 1, Property::RetainAvailable(u) => return 1, - Property::UserProperty(u) => return u.unwrap().len(), + Property::UserProperty(u) => return u.len(), Property::MaximumPacketSize(u) => return 4, Property::WildcardSubscriptionAvailable(u) => return 1, Property::SubscriptionIdentifierAvailable(u) => return 1, @@ -69,37 +68,37 @@ impl<'a> Property<'a> { } } - pub fn decode(buff_reader: & mut BuffReader<'a>) -> Result, ProperyParseError> { + pub fn decode(buff_reader: & mut BuffReader<'a>) -> Result, ParseError> { let propertyIdentifier = buff_reader.readU8(); match propertyIdentifier { - Ok(0x01) => return Ok(Property::PayloadFormat(buff_reader.readU8())), - Ok(0x02) => return Ok(Property::MessageExpiryInterval(buff_reader.readU32())), - Ok(0x03) => return Ok(Property::ContentType(buff_reader.readString())), - Ok(0x08) => return Ok(Property::ResponseTopic(buff_reader.readString())), - Ok(0x09) => return Ok(Property::CorrelationData(buff_reader.readBinary())), - Ok(0x0B) => return Ok(Property::SubscriptionIdentifier(buff_reader.readVariableByteInt())), - Ok(0x11) => return Ok(Property::SessionExpiryInterval(buff_reader.readU32())), - Ok(0x12) => return Ok(Property::AssignedClientIdentifier(buff_reader.readString())), - Ok(0x13) => return Ok(Property::ServerKeepAlive(buff_reader.readU16())), - Ok(0x15) => return Ok(Property::AuthenticationMethod(buff_reader.readString())), - Ok(0x16) => return Ok(Property::AuthenticationData(buff_reader.readBinary())), - Ok(0x17) => return Ok(Property::RequestProblemInformation(buff_reader.readU8())), - Ok(0x18) => return Ok(Property::WillDelayInterval(buff_reader.readU32())), - Ok(0x19) => return Ok(Property::RequestResponseInformation(buff_reader.readU8())), - Ok(0x1A) => return Ok(Property::ResponseInformation(buff_reader.readString())), - Ok(0x1C) => return Ok(Property::ServerReference(buff_reader.readString())), - Ok(0x1F) => return Ok(Property::ReasonString(buff_reader.readString())), - Ok(0x21) => return Ok(Property::ReceiveMaximum(buff_reader.readU16())), - Ok(0x22) => return Ok(Property::TopicAliasMaximum(buff_reader.readU16())), - Ok(0x23) => return Ok(Property::TopicAlias(buff_reader.readU16())), - Ok(0x24) => return Ok(Property::MaximumQoS(buff_reader.readU8())), - Ok(0x25) => return Ok(Property::RetainAvailable(buff_reader.readU8())), - Ok(0x26) => return Ok(Property::UserProperty(buff_reader.readStringPair())), - Ok(0x28) => return Ok(Property::WildcardSubscriptionAvailable(buff_reader.readU8())), - Ok(0x29) => return Ok(Property::SubscriptionIdentifierAvailable(buff_reader.readU8())), - Ok(0x2A) => return Ok(Property::SharedSubscriptionAvailable(buff_reader.readU8())), + Ok(0x01) => return Ok(Property::PayloadFormat(buff_reader.readU8()?)), + Ok(0x02) => return Ok(Property::MessageExpiryInterval(buff_reader.readU32()?)), + Ok(0x03) => return Ok(Property::ContentType(buff_reader.readString()?)), + Ok(0x08) => return Ok(Property::ResponseTopic(buff_reader.readString()?)), + Ok(0x09) => return Ok(Property::CorrelationData(buff_reader.readBinary()?)), + Ok(0x0B) => return Ok(Property::SubscriptionIdentifier(buff_reader.readVariableByteInt()?)), + Ok(0x11) => return Ok(Property::SessionExpiryInterval(buff_reader.readU32()?)), + Ok(0x12) => return Ok(Property::AssignedClientIdentifier(buff_reader.readString()?)), + Ok(0x13) => return Ok(Property::ServerKeepAlive(buff_reader.readU16()?)), + Ok(0x15) => return Ok(Property::AuthenticationMethod(buff_reader.readString()?)), + Ok(0x16) => return Ok(Property::AuthenticationData(buff_reader.readBinary()?)), + Ok(0x17) => return Ok(Property::RequestProblemInformation(buff_reader.readU8()?)), + Ok(0x18) => return Ok(Property::WillDelayInterval(buff_reader.readU32()?)), + Ok(0x19) => return Ok(Property::RequestResponseInformation(buff_reader.readU8()?)), + Ok(0x1A) => return Ok(Property::ResponseInformation(buff_reader.readString()?)), + Ok(0x1C) => return Ok(Property::ServerReference(buff_reader.readString()?)), + Ok(0x1F) => return Ok(Property::ReasonString(buff_reader.readString()?)), + Ok(0x21) => return Ok(Property::ReceiveMaximum(buff_reader.readU16()?)), + Ok(0x22) => return Ok(Property::TopicAliasMaximum(buff_reader.readU16()?)), + Ok(0x23) => return Ok(Property::TopicAlias(buff_reader.readU16()?)), + Ok(0x24) => return Ok(Property::MaximumQoS(buff_reader.readU8()?)), + Ok(0x25) => return Ok(Property::RetainAvailable(buff_reader.readU8()?)), + Ok(0x26) => return Ok(Property::UserProperty(buff_reader.readStringPair()?)), + Ok(0x28) => return Ok(Property::WildcardSubscriptionAvailable(buff_reader.readU8()?)), + Ok(0x29) => return Ok(Property::SubscriptionIdentifierAvailable(buff_reader.readU8()?)), + Ok(0x2A) => return Ok(Property::SharedSubscriptionAvailable(buff_reader.readU8()?)), Err(err) => return Err(err), - _ => return Err(ProperyParseError::IdNotFound) + _ => return Err(ParseError::IdNotFound) } } } \ No newline at end of file diff --git a/src/utils/buffer_reader.rs b/src/utils/buffer_reader.rs index 8a92c64..92e33ac 100644 --- a/src/utils/buffer_reader.rs +++ b/src/utils/buffer_reader.rs @@ -1,9 +1,8 @@ use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder; -use crate::encoding::variable_byte_integer::VariableByteIntegerError; use core::str; use core::mem; -#[derive(Clone)] +#[derive(Debug)] pub struct EncodedString<'a> { pub string: &'a str, pub len: u16 @@ -15,7 +14,7 @@ impl EncodedString<'_> { } } -#[derive(Clone)] +#[derive(Debug)] pub struct BinaryData<'a> { pub bin: &'a [u8], pub len: u16 @@ -27,7 +26,7 @@ impl BinaryData<'_> { } } -#[derive(Clone)] +#[derive(Debug)] pub struct StringPair<'a> { pub name: EncodedString<'a>, pub value: EncodedString<'a> @@ -42,11 +41,13 @@ impl StringPair<'_> { #[derive(core::fmt::Debug)] #[derive(Clone)] -pub enum ProperyParseError { +pub enum ParseError { Utf8Error, IndexOutOfBounce, VariableByteIntegerError, - IdNotFound + IdNotFound, + EncodingError, + DecodingError } pub struct BuffReader<'a> { @@ -63,35 +64,45 @@ impl<'a> BuffReader<'a> { return BuffReader { buffer: buffer, position: 0 }; } - pub fn readVariableByteInt(& mut self) -> Result { + pub fn readVariableByteInt(& mut self) -> Result { let variable_byte_integer: [u8; 4] = [self.buffer[self.position], self.buffer[self.position + 1], self.buffer[self.position + 2], self.buffer[self.position + 3]]; - self.incrementPosition(4); + let mut len: usize = 1; + if variable_byte_integer[0] & 0x80 == 1 { + len = len + 1; + if variable_byte_integer[1] & 0x80 == 1 { + len = len + 1; + if variable_byte_integer[2] & 0x80 == 1 { + len = len + 1; + } + } + } + self.incrementPosition(len); return VariableByteIntegerDecoder::decode(variable_byte_integer); } - pub fn readU32(& mut self) -> Result { - let (int_bytes, rest) = self.buffer.split_at(mem::size_of::()); - let ret: u32 = u32::from_le_bytes(int_bytes.try_into().unwrap()); + pub fn readU32(& mut self) -> Result { + let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::()); + let ret: u32 = u32::from_be_bytes(int_bytes.try_into().unwrap()); //let ret: u32 = (((self.buffer[self.position] as u32) << 24) | ((self.buffer[self.position + 1] as u32) << 16) | ((self.buffer[self.position + 2] as u32) << 8) | (self.buffer[self.position + 3] as u32)) as u32; self.incrementPosition(4); return Ok(ret); } - pub fn readU16(& mut self) -> Result { - let (int_bytes, rest) = self.buffer.split_at(mem::size_of::()); - let ret: u16 = u16::from_le_bytes(int_bytes.try_into().unwrap()); + pub fn readU16(& mut self) -> Result { + let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::()); + let ret: u16 = u16::from_be_bytes(int_bytes.try_into().unwrap()); //(((self.buffer[self.position] as u16) << 8) | (self.buffer[self.position + 1] as u16)) as u16; self.incrementPosition(2); return Ok(ret); } - pub fn readU8(& mut self) -> Result { + pub fn readU8(& mut self) -> Result { let ret: u8 = self.buffer[self.position]; self.incrementPosition(1); return Ok(ret); } - pub fn readString(& mut self) -> Result, ProperyParseError> { + pub fn readString(& mut self) -> Result, ParseError> { let len = self.readU16(); match len { Err(err) => return Err(err), @@ -101,13 +112,13 @@ impl<'a> BuffReader<'a> { let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len_res as usize)])); if res_str.is_err() { log::error!("Could not parse utf-8 string"); - return Err(ProperyParseError::Utf8Error); + return Err(ParseError::Utf8Error); } return Ok(EncodedString { string: res_str.unwrap(), len: len_res }); } //TODO: Index out of bounce err !!!!! - pub fn readBinary(& mut self) -> Result, ProperyParseError> { + pub fn readBinary(& mut self) -> Result, ParseError> { let len = self.readU16(); match len { Err(err) => return Err(err), @@ -118,7 +129,7 @@ impl<'a> BuffReader<'a> { return Ok(BinaryData { bin: res_bin, len: len_res }); } - pub fn readStringPair(& mut self) -> Result, ProperyParseError> { + pub fn readStringPair(& mut self) -> Result, ParseError> { let name = self.readString(); match name { Err(err) => return Err(err),