First packet decoded

This commit is contained in:
Ondrej Babec 2022-02-10 15:59:41 +01:00
parent b0c1d41fcc
commit 332472f64a
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
6 changed files with 140 additions and 136 deletions

View File

@ -1,4 +1,6 @@
#![crate_name = "doc"]
use crate::utils::buffer_reader::ParseError;
/// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on
/// pseudo code which is introduced in MQTT version 5.0 OASIS standard accesible from
/// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901107
@ -9,12 +11,6 @@
pub struct VariableByteIntegerEncoder;
/// Variable byte integers error enumeration is used by both encoder and decoder for
/// error notification.
#[derive(core::fmt::Debug)]
#[derive(Clone)]
pub enum VariableByteIntegerError {
EncodingError,
DecodingError
}
pub type VariableByteInteger = [u8; 4];
@ -23,13 +19,13 @@ impl VariableByteIntegerEncoder {
/// this integer into maximal 4 Bytes. MSb of each Byte is controll bit.
/// This bit is saying if there is continuing Byte in stream or not, this way
/// we can effectively use 1 to 4 Bytes based in integer len.
pub fn encode(mut target: u32) -> Result<VariableByteInteger, VariableByteIntegerError> {
pub fn encode(mut target: u32) -> Result<VariableByteInteger, ParseError> {
// General known informations from OASIS
const MAX_ENCODABLE: u32 = 268435455;
const MOD: u32 = 128;
if target > MAX_ENCODABLE {
log::error!("Maximal value of integer for encoding was exceeded");
return Err(VariableByteIntegerError::EncodingError);
return Err(ParseError::EncodingError);
}
let mut res: [u8; 4] = [0; 4];
@ -61,7 +57,7 @@ impl VariableByteIntegerDecoder {
/// Decode function takes as paramater encoded integer represented
/// as array of 4 unsigned numbers of exactly 1 Byte each -> 4 Bytes maximal
/// same as maximal amount of bytes for variable byte encoding in MQTT.
pub fn decode(encoded: VariableByteInteger) -> Result<u32, VariableByteIntegerError> {
pub fn decode(encoded: VariableByteInteger) -> Result<u32, ParseError> {
let mut multiplier: u32 = 1;
let mut ret: u32 = 0;
@ -73,7 +69,7 @@ impl VariableByteIntegerDecoder {
i = i + 1;
ret = ret + ((encoded_byte & 127) as u32 * multiplier) as u32;
if multiplier > 128 * 128 * 128 {
return Err(VariableByteIntegerError::DecodingError);
return Err(ParseError::DecodingError);
}
multiplier = multiplier * 128;
if (encoded_byte & 128) == 0 {

View File

@ -4,7 +4,10 @@ use rust_mqtt::packet::packet_builder::PacketBuilder;
use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use rust_mqtt::encoding::variable_byte_integer::VariableByteIntegerDecoder;
use rust_mqtt::packet::property::*;
use rust_mqtt::utils::buffer_reader::BuffReader;
use heapless::Vec;
use std::fs::File;
use std::io::Read;
fn main() {
env_logger::builder()
@ -12,33 +15,25 @@ fn main() {
.format_timestamp_nanos()
.init();
let l: u8 = 1;
let y: u32 = 2;
let z: u16 = 3;
let p: u32 = 4;
let fl = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin");
let mut f = File::open("/Users/obabec/development/school/rust-mqtt/mqtt_control_example.bin").expect("no file found");
let mut buffer: [u8; 500] = [0; 500];
f.read(&mut buffer).expect("buffer overflow");
let mut txt = Vec::new();
let mut payld = *b"xxxxx";
let packet = Packet::clean(txt, &mut payld);
let mut packet_builder = PacketBuilder::new(packet);
packet_builder.addPacketType(PacketType::Publish);
let mut buffer_reader = BuffReader::new(&buffer);
packet_builder.decode_packet(& mut buffer_reader);
/*let s: str = "AAAAAA";
test(&s);*/
let f = PacketType::from(0xA0);
let o: u8 = f.into();
let r = match VariableByteIntegerEncoder::encode(179) {
Ok(r) => r,
Err(_e) => [0; 4],
};
log::info!("{:02X?}", r);
let d = VariableByteIntegerDecoder::decode(r);
log::info!("Enum val: {}", o);
log::info!("Hello world");
let bytes: [u8; 4] = packet_builder.currentPacket.protocol_name.to_be_bytes();
let prot = std::str::from_utf8(&bytes).unwrap();
log::info!("Protocol name: {}", prot)
}
/*fn test(tst: &str) {

View File

@ -8,7 +8,7 @@ use crate::utils::buffer_reader::*;
// metody packet buildery budou prijimat jako parametr buff reader, z ktereho bude postupne parsovat
pub struct PacketBuilder<'a> {
currentPacket: Packet<'a>,
pub currentPacket: Packet<'a>,
}
impl<'a> PacketBuilder<'a> {
@ -58,6 +58,8 @@ impl<'a> PacketBuilder<'a> {
pub fn decode_packet(& mut self, buff_reader: & mut BuffReader<'a>) {
self.decodeFixedHeader(buff_reader);
let y: u8 = self.currentPacket.fixed_header & 0xF0;
let z: u8 = (PacketType::Connect).into();
if self.currentPacket.fixed_header & 0xF0 == (PacketType::Connect).into() {
self.decodeControllPacket(buff_reader);
}
@ -70,29 +72,21 @@ impl<'a> PacketBuilder<'a> {
return PacketType::from(self.currentPacket.fixed_header);
}
pub fn decodeControllPacket(& mut self, buff_reader: & mut BuffReader<'a>) {
self.currentPacket.packet_identifier = 0;
self.currentPacket.protocol_name_len = buff_reader.readU16().unwrap();
self.currentPacket.protocol_name = buff_reader.readU32().unwrap();
self.currentPacket.protocol_version = buff_reader.readU8().unwrap();
self.currentPacket.connect_flags = buff_reader.readU8().unwrap();
self.currentPacket.keep_alive = buff_reader.readU16().unwrap();
pub fn decode_properties(& mut self, buff_reader: & mut BuffReader<'a>) {
self.currentPacket.property_len = buff_reader.readVariableByteInt().unwrap();
let mut x: u32 = 0;
let mut prop: Result<Property, ProperyParseError>;
let mut res: Property;
let mut prop: Result<Property, ParseError>;
loop {
let mut res: Property;
prop = Property::decode(buff_reader);
if let Ok(res) = prop {
log::info!("Parsed property {:?}", res);
x = x + res.len() as u32 + 1;
self.currentPacket.properties.push(res);
}
/*if prop.is_ok() {
} else {
log::error!("Decoding property did not went well!");
}*/
// error handlo
log::error!("Problem during property decoding");
}
if x == self.currentPacket.property_len {
@ -100,4 +94,14 @@ impl<'a> PacketBuilder<'a> {
}
}
}
pub fn decodeControllPacket(& mut self, buff_reader: & mut BuffReader<'a>) {
self.currentPacket.packet_identifier = 0;
self.currentPacket.protocol_name_len = buff_reader.readU16().unwrap();
self.currentPacket.protocol_name = buff_reader.readU32().unwrap();
self.currentPacket.protocol_version = buff_reader.readU8().unwrap();
self.currentPacket.connect_flags = buff_reader.readU8().unwrap();
self.currentPacket.keep_alive = buff_reader.readU16().unwrap();
self.decode_properties(buff_reader);
}
}

View File

@ -48,9 +48,8 @@ impl From<u8> for PacketType {
impl Into<u8> for PacketType {
fn into(self) -> u8 {
match self {
PacketType::Connect => return 0x00,
PacketType::Connack => return 0x10,
PacketType::Reserved => return 0x20,
PacketType::Connect => return 0x10,
PacketType::Connack => return 0x20,
PacketType::Publish => return 0x30,
PacketType::Puback => return 0x40,
PacketType::Pubrec => return 0x50,

View File

@ -1,66 +1,65 @@
use crate::utils::buffer_reader::ProperyParseError;
use crate::utils::buffer_reader::ParseError;
use crate::utils::buffer_reader::StringPair;
use crate::utils::buffer_reader::EncodedString;
use crate::utils::buffer_reader::BinaryData;
use crate::encoding::variable_byte_integer::VariableByteIntegerError;
use crate::utils::buffer_reader::BuffReader;
#[derive(Clone)]
#[derive(Debug)]
pub enum Property<'a> {
PayloadFormat(Result<u8, ProperyParseError>),
MessageExpiryInterval(Result<u32, ProperyParseError>),
ContentType(Result<EncodedString<'a>, ProperyParseError>),
ResponseTopic(Result<EncodedString<'a>, ProperyParseError>),
CorrelationData(Result<BinaryData<'a>, ProperyParseError>),
SubscriptionIdentifier(Result<u32, VariableByteIntegerError>),
SessionExpiryInterval(Result<u32, ProperyParseError>),
AssignedClientIdentifier(Result<EncodedString<'a>, ProperyParseError>),
ServerKeepAlive(Result<u16, ProperyParseError>),
AuthenticationMethod(Result<EncodedString<'a>, ProperyParseError>),
AuthenticationData(Result<BinaryData<'a>, ProperyParseError>),
RequestProblemInformation(Result<u8, ProperyParseError>),
WillDelayInterval(Result<u32, ProperyParseError>),
RequestResponseInformation(Result<u8, ProperyParseError>),
ResponseInformation(Result<EncodedString<'a>, ProperyParseError>),
ServerReference(Result<EncodedString<'a>, ProperyParseError>),
ReasonString(Result<EncodedString<'a>, ProperyParseError>),
ReceiveMaximum(Result<u16, ProperyParseError>),
TopicAliasMaximum(Result<u16, ProperyParseError>),
TopicAlias(Result<u16, ProperyParseError>),
MaximumQoS(Result<u8, ProperyParseError>),
RetainAvailable(Result<u8, ProperyParseError>),
UserProperty(Result<StringPair<'a>, ProperyParseError>),
MaximumPacketSize(Result<u32, ProperyParseError>),
WildcardSubscriptionAvailable(Result<u8, ProperyParseError>),
SubscriptionIdentifierAvailable(Result<u8, ProperyParseError>),
SharedSubscriptionAvailable(Result<u8, ProperyParseError>)
PayloadFormat(u8),
MessageExpiryInterval(u32),
ContentType(EncodedString<'a>),
ResponseTopic(EncodedString<'a>),
CorrelationData(BinaryData<'a>),
SubscriptionIdentifier(u32),
SessionExpiryInterval(u32),
AssignedClientIdentifier(EncodedString<'a>),
ServerKeepAlive(u16),
AuthenticationMethod(EncodedString<'a>),
AuthenticationData(BinaryData<'a>),
RequestProblemInformation(u8),
WillDelayInterval(u32),
RequestResponseInformation(u8),
ResponseInformation(EncodedString<'a>),
ServerReference(EncodedString<'a>),
ReasonString(EncodedString<'a>),
ReceiveMaximum(u16),
TopicAliasMaximum(u16),
TopicAlias(u16),
MaximumQoS(u8),
RetainAvailable(u8),
UserProperty(StringPair<'a>),
MaximumPacketSize(u32),
WildcardSubscriptionAvailable(u8),
SubscriptionIdentifierAvailable(u8),
SharedSubscriptionAvailable(u8)
}
impl<'a> Property<'a> {
pub fn len(self) -> u16 {
pub fn len(&self) -> u16 {
match self {
Property::PayloadFormat(u) => return 1,
Property::MessageExpiryInterval(u) => return 4,
Property::ContentType(u) => return u.unwrap().len(),
Property::ResponseTopic(u) => return u.unwrap().len(),
Property::CorrelationData(u) => return u.unwrap().len(),
Property::ContentType(u) => return u.len(),
Property::ResponseTopic(u) => return u.len(),
Property::CorrelationData(u) => return u.len(),
Property::SubscriptionIdentifier(u) => return 4,
Property::AssignedClientIdentifier(u) => return u.unwrap().len(),
Property::AssignedClientIdentifier(u) => return u.len(),
Property::ServerKeepAlive(u) => return 2,
Property::AuthenticationMethod(u) => return u.unwrap().len(),
Property::AuthenticationData(u) => return u.unwrap().len(),
Property::AuthenticationMethod(u) => return u.len(),
Property::AuthenticationData(u) => return u.len(),
Property::RequestProblemInformation(u) => return 1,
Property::WillDelayInterval(u) => return 4,
Property::RequestResponseInformation(u) => return 1,
Property::ResponseInformation(u) => return u.unwrap().len(),
Property::ServerReference(u) => return u.unwrap().len(),
Property::ReasonString(u) => return u.unwrap().len(),
Property::ResponseInformation(u) => return u.len(),
Property::ServerReference(u) => return u.len(),
Property::ReasonString(u) => return u.len(),
Property::ReceiveMaximum(u) => return 2,
Property::TopicAliasMaximum(u) => return 2,
Property::TopicAlias(u) => return 2,
Property::MaximumQoS(u) => return 1,
Property::RetainAvailable(u) => return 1,
Property::UserProperty(u) => return u.unwrap().len(),
Property::UserProperty(u) => return u.len(),
Property::MaximumPacketSize(u) => return 4,
Property::WildcardSubscriptionAvailable(u) => return 1,
Property::SubscriptionIdentifierAvailable(u) => return 1,
@ -69,37 +68,37 @@ impl<'a> Property<'a> {
}
}
pub fn decode(buff_reader: & mut BuffReader<'a>) -> Result<Property<'a>, ProperyParseError> {
pub fn decode(buff_reader: & mut BuffReader<'a>) -> Result<Property<'a>, ParseError> {
let propertyIdentifier = buff_reader.readU8();
match propertyIdentifier {
Ok(0x01) => return Ok(Property::PayloadFormat(buff_reader.readU8())),
Ok(0x02) => return Ok(Property::MessageExpiryInterval(buff_reader.readU32())),
Ok(0x03) => return Ok(Property::ContentType(buff_reader.readString())),
Ok(0x08) => return Ok(Property::ResponseTopic(buff_reader.readString())),
Ok(0x09) => return Ok(Property::CorrelationData(buff_reader.readBinary())),
Ok(0x0B) => return Ok(Property::SubscriptionIdentifier(buff_reader.readVariableByteInt())),
Ok(0x11) => return Ok(Property::SessionExpiryInterval(buff_reader.readU32())),
Ok(0x12) => return Ok(Property::AssignedClientIdentifier(buff_reader.readString())),
Ok(0x13) => return Ok(Property::ServerKeepAlive(buff_reader.readU16())),
Ok(0x15) => return Ok(Property::AuthenticationMethod(buff_reader.readString())),
Ok(0x16) => return Ok(Property::AuthenticationData(buff_reader.readBinary())),
Ok(0x17) => return Ok(Property::RequestProblemInformation(buff_reader.readU8())),
Ok(0x18) => return Ok(Property::WillDelayInterval(buff_reader.readU32())),
Ok(0x19) => return Ok(Property::RequestResponseInformation(buff_reader.readU8())),
Ok(0x1A) => return Ok(Property::ResponseInformation(buff_reader.readString())),
Ok(0x1C) => return Ok(Property::ServerReference(buff_reader.readString())),
Ok(0x1F) => return Ok(Property::ReasonString(buff_reader.readString())),
Ok(0x21) => return Ok(Property::ReceiveMaximum(buff_reader.readU16())),
Ok(0x22) => return Ok(Property::TopicAliasMaximum(buff_reader.readU16())),
Ok(0x23) => return Ok(Property::TopicAlias(buff_reader.readU16())),
Ok(0x24) => return Ok(Property::MaximumQoS(buff_reader.readU8())),
Ok(0x25) => return Ok(Property::RetainAvailable(buff_reader.readU8())),
Ok(0x26) => return Ok(Property::UserProperty(buff_reader.readStringPair())),
Ok(0x28) => return Ok(Property::WildcardSubscriptionAvailable(buff_reader.readU8())),
Ok(0x29) => return Ok(Property::SubscriptionIdentifierAvailable(buff_reader.readU8())),
Ok(0x2A) => return Ok(Property::SharedSubscriptionAvailable(buff_reader.readU8())),
Ok(0x01) => return Ok(Property::PayloadFormat(buff_reader.readU8()?)),
Ok(0x02) => return Ok(Property::MessageExpiryInterval(buff_reader.readU32()?)),
Ok(0x03) => return Ok(Property::ContentType(buff_reader.readString()?)),
Ok(0x08) => return Ok(Property::ResponseTopic(buff_reader.readString()?)),
Ok(0x09) => return Ok(Property::CorrelationData(buff_reader.readBinary()?)),
Ok(0x0B) => return Ok(Property::SubscriptionIdentifier(buff_reader.readVariableByteInt()?)),
Ok(0x11) => return Ok(Property::SessionExpiryInterval(buff_reader.readU32()?)),
Ok(0x12) => return Ok(Property::AssignedClientIdentifier(buff_reader.readString()?)),
Ok(0x13) => return Ok(Property::ServerKeepAlive(buff_reader.readU16()?)),
Ok(0x15) => return Ok(Property::AuthenticationMethod(buff_reader.readString()?)),
Ok(0x16) => return Ok(Property::AuthenticationData(buff_reader.readBinary()?)),
Ok(0x17) => return Ok(Property::RequestProblemInformation(buff_reader.readU8()?)),
Ok(0x18) => return Ok(Property::WillDelayInterval(buff_reader.readU32()?)),
Ok(0x19) => return Ok(Property::RequestResponseInformation(buff_reader.readU8()?)),
Ok(0x1A) => return Ok(Property::ResponseInformation(buff_reader.readString()?)),
Ok(0x1C) => return Ok(Property::ServerReference(buff_reader.readString()?)),
Ok(0x1F) => return Ok(Property::ReasonString(buff_reader.readString()?)),
Ok(0x21) => return Ok(Property::ReceiveMaximum(buff_reader.readU16()?)),
Ok(0x22) => return Ok(Property::TopicAliasMaximum(buff_reader.readU16()?)),
Ok(0x23) => return Ok(Property::TopicAlias(buff_reader.readU16()?)),
Ok(0x24) => return Ok(Property::MaximumQoS(buff_reader.readU8()?)),
Ok(0x25) => return Ok(Property::RetainAvailable(buff_reader.readU8()?)),
Ok(0x26) => return Ok(Property::UserProperty(buff_reader.readStringPair()?)),
Ok(0x28) => return Ok(Property::WildcardSubscriptionAvailable(buff_reader.readU8()?)),
Ok(0x29) => return Ok(Property::SubscriptionIdentifierAvailable(buff_reader.readU8()?)),
Ok(0x2A) => return Ok(Property::SharedSubscriptionAvailable(buff_reader.readU8()?)),
Err(err) => return Err(err),
_ => return Err(ProperyParseError::IdNotFound)
_ => return Err(ParseError::IdNotFound)
}
}
}

View File

@ -1,9 +1,8 @@
use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder;
use crate::encoding::variable_byte_integer::VariableByteIntegerError;
use core::str;
use core::mem;
#[derive(Clone)]
#[derive(Debug)]
pub struct EncodedString<'a> {
pub string: &'a str,
pub len: u16
@ -15,7 +14,7 @@ impl EncodedString<'_> {
}
}
#[derive(Clone)]
#[derive(Debug)]
pub struct BinaryData<'a> {
pub bin: &'a [u8],
pub len: u16
@ -27,7 +26,7 @@ impl BinaryData<'_> {
}
}
#[derive(Clone)]
#[derive(Debug)]
pub struct StringPair<'a> {
pub name: EncodedString<'a>,
pub value: EncodedString<'a>
@ -42,11 +41,13 @@ impl StringPair<'_> {
#[derive(core::fmt::Debug)]
#[derive(Clone)]
pub enum ProperyParseError {
pub enum ParseError {
Utf8Error,
IndexOutOfBounce,
VariableByteIntegerError,
IdNotFound
IdNotFound,
EncodingError,
DecodingError
}
pub struct BuffReader<'a> {
@ -63,35 +64,45 @@ impl<'a> BuffReader<'a> {
return BuffReader { buffer: buffer, position: 0 };
}
pub fn readVariableByteInt(& mut self) -> Result<u32, VariableByteIntegerError> {
pub fn readVariableByteInt(& mut self) -> Result<u32, ParseError> {
let variable_byte_integer: [u8; 4] = [self.buffer[self.position], self.buffer[self.position + 1], self.buffer[self.position + 2], self.buffer[self.position + 3]];
self.incrementPosition(4);
let mut len: usize = 1;
if variable_byte_integer[0] & 0x80 == 1 {
len = len + 1;
if variable_byte_integer[1] & 0x80 == 1 {
len = len + 1;
if variable_byte_integer[2] & 0x80 == 1 {
len = len + 1;
}
}
}
self.incrementPosition(len);
return VariableByteIntegerDecoder::decode(variable_byte_integer);
}
pub fn readU32(& mut self) -> Result<u32, ProperyParseError> {
let (int_bytes, rest) = self.buffer.split_at(mem::size_of::<u32>());
let ret: u32 = u32::from_le_bytes(int_bytes.try_into().unwrap());
pub fn readU32(& mut self) -> Result<u32, ParseError> {
let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::<u32>());
let ret: u32 = u32::from_be_bytes(int_bytes.try_into().unwrap());
//let ret: u32 = (((self.buffer[self.position] as u32) << 24) | ((self.buffer[self.position + 1] as u32) << 16) | ((self.buffer[self.position + 2] as u32) << 8) | (self.buffer[self.position + 3] as u32)) as u32;
self.incrementPosition(4);
return Ok(ret);
}
pub fn readU16(& mut self) -> Result<u16, ProperyParseError> {
let (int_bytes, rest) = self.buffer.split_at(mem::size_of::<u16>());
let ret: u16 = u16::from_le_bytes(int_bytes.try_into().unwrap());
pub fn readU16(& mut self) -> Result<u16, ParseError> {
let (int_bytes, rest) = self.buffer[self.position..].split_at(mem::size_of::<u16>());
let ret: u16 = u16::from_be_bytes(int_bytes.try_into().unwrap());
//(((self.buffer[self.position] as u16) << 8) | (self.buffer[self.position + 1] as u16)) as u16;
self.incrementPosition(2);
return Ok(ret);
}
pub fn readU8(& mut self) -> Result<u8, ProperyParseError> {
pub fn readU8(& mut self) -> Result<u8, ParseError> {
let ret: u8 = self.buffer[self.position];
self.incrementPosition(1);
return Ok(ret);
}
pub fn readString(& mut self) -> Result<EncodedString<'a>, ProperyParseError> {
pub fn readString(& mut self) -> Result<EncodedString<'a>, ParseError> {
let len = self.readU16();
match len {
Err(err) => return Err(err),
@ -101,13 +112,13 @@ impl<'a> BuffReader<'a> {
let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len_res as usize)]));
if res_str.is_err() {
log::error!("Could not parse utf-8 string");
return Err(ProperyParseError::Utf8Error);
return Err(ParseError::Utf8Error);
}
return Ok(EncodedString { string: res_str.unwrap(), len: len_res });
}
//TODO: Index out of bounce err !!!!!
pub fn readBinary(& mut self) -> Result<BinaryData<'a>, ProperyParseError> {
pub fn readBinary(& mut self) -> Result<BinaryData<'a>, ParseError> {
let len = self.readU16();
match len {
Err(err) => return Err(err),
@ -118,7 +129,7 @@ impl<'a> BuffReader<'a> {
return Ok(BinaryData { bin: res_bin, len: len_res });
}
pub fn readStringPair(& mut self) -> Result<StringPair<'a>, ProperyParseError> {
pub fn readStringPair(& mut self) -> Result<StringPair<'a>, ParseError> {
let name = self.readString();
match name {
Err(err) => return Err(err),