From 3cb9de03711a41d4d2504b84d5d724d5ca6c00d2 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Sat, 26 Feb 2022 15:12:19 +0100 Subject: [PATCH] Working example of client --- src/client/client_v5.rs | 99 ++++++++++++++++++++--------- src/client/mod.rs | 2 +- src/lib.rs | 6 +- src/main.rs | 83 +++++++++++++++++------- src/network/mod.rs | 2 +- src/network/network_trait.rs | 6 +- src/packet/connack_packet.rs | 9 ++- src/packet/connect_packet.rs | 1 + src/packet/disconnect_packet.rs | 4 +- src/packet/mqtt_packet.rs | 28 ++++---- src/packet/publish_packet.rs | 50 ++++++++------- src/packet/suback_packet.rs | 9 ++- src/packet/subscription_packet.rs | 27 ++++---- src/packet/unsubscription_packet.rs | 1 - src/tokio_network.rs | 43 ++++++++----- src/utils/buffer_reader.rs | 5 +- src/utils/buffer_writer.rs | 2 +- 17 files changed, 242 insertions(+), 135 deletions(-) diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index d1ddf79..eae67f9 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,80 +1,117 @@ use core::future::Future; + use crate::network::network_trait::{Network, NetworkError}; use crate::packet::connack_packet::ConnackPacket; use crate::packet::connect_packet::ConnectPacket; use crate::packet::disconnect_packet::DisconnectPacket; use crate::packet::mqtt_packet::Packet; -use crate::packet::publish_packet::{PublishPacket, QualityOfService}; use crate::packet::publish_packet::QualityOfService::QoS1; +use crate::packet::publish_packet::{PublishPacket, QualityOfService}; +use crate::packet::suback_packet::SubackPacket; +use crate::packet::subscription_packet::SubscriptionPacket; use crate::utils::buffer_reader::BuffReader; pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { network_driver: &'a mut T, - buffer: &'a mut [u8] + buffer: &'a mut [u8], } impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES> where - T: Network + T: Network, { pub fn new(network_driver: &'a mut T, buffer: &'a mut [u8]) -> Self { Self { network_driver, - buffer + buffer, } } - // connect -> connack -> publish -> QoS ? -> disconn - pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> Result<(), NetworkError> { + pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), NetworkError> { let mut len = { - let mut connect = ConnectPacket::<3, 0>::clean(); + let mut connect = ConnectPacket::<'b, 3, 0>::clean(); connect.encode(self.buffer) }; - self.network_driver.send(self.buffer, len).await ?; + self.network_driver.send(self.buffer, len).await?; //connack - { - self.receive().await ?; - let mut packet = ConnackPacket::<5>::new(); + let reason: u8 = { + self.network_driver.receive(self.buffer).await?; + let mut packet = ConnackPacket::<'b, 5>::new(); packet.decode(&mut BuffReader::new(self.buffer)); - - - if packet.connect_reason_code != 0x00 { - todo!(); - } + packet.connect_reason_code }; + if reason != 0x00 { + Err(NetworkError::Connection) + } else { + Ok(()) + } + + } + + pub async fn disconnect<'b>(&'b mut self) -> Result<(), NetworkError> { + let mut disconnect = DisconnectPacket::<'b, 5>::new(); + let mut len = disconnect.encode(self.buffer); + self.network_driver.send(self.buffer, len).await?; + Ok(()) + } + // connect -> connack -> publish -> QoS ? -> disconn + pub async fn send_message<'b>( + &'b mut self, + topic_name: &'b str, + message: &'b str, + qos: QualityOfService, + ) -> Result<(), NetworkError> { // publish - - len = { - let mut packet = PublishPacket::<5>::new(topic_name, message); + let len = { + let mut packet = PublishPacket::<'b, 5>::new(); + packet.add_topic_name(topic_name); + packet.add_message(message.as_bytes()); packet.encode(self.buffer) }; - self.network_driver.send(self.buffer, len).await ?; - + self.network_driver.send(self.buffer, len).await?; //QoS1 if >::into(qos) == >::into(QoS1) { todo!(); } - - //Disconnect - let mut disconnect = DisconnectPacket::<5>::new(); - len = disconnect.encode(self.buffer); - self.network_driver.send(self.buffer, len); Ok(()) } - pub async fn receive(&'a mut self) -> Result<(), NetworkError> { - self.network_driver.receive(self.buffer).await ?; - Ok(()) + pub async fn subscribe_to_topic<'b>(&'b mut self, topic_name: &'b str) -> Result<(), NetworkError> { + let len = { + let mut subs = SubscriptionPacket::<'b, 1, 1>::new(); + subs.add_new_filter(topic_name); + subs.encode(self.buffer) + }; + let xx: [u8; 14] = (self.buffer[0..14]).try_into().unwrap(); + log::info!("{:x?}", xx); + self.network_driver.send(self.buffer, len).await?; + + let reason = { + self.network_driver.receive(self.buffer).await?; + let mut packet = SubackPacket::<'b, 5, 5>::new(); + packet.decode(&mut BuffReader::new(self.buffer)); + *packet.reason_codes.get(0).unwrap() + }; + + if reason > 1 { + Err(NetworkError::Unknown) + } else { + Ok(()) + } + } - pub async fn receive_message(&'a mut self) -> Result<(), NetworkError> { - return Ok(()); + pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], NetworkError> { + self.network_driver.receive(self.buffer).await?; + let mut packet = PublishPacket::<'b, 5>::new(); + packet.decode(&mut BuffReader::new(self.buffer)); + return Ok(packet.message.unwrap()); } } diff --git a/src/client/mod.rs b/src/client/mod.rs index ab4c08b..3f271f3 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1 +1 @@ -pub mod client_v5; \ No newline at end of file +pub mod client_v5; diff --git a/src/lib.rs b/src/lib.rs index 5298429..6c19f62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,12 +7,12 @@ extern crate alloc; -pub mod encoding; -pub mod packet; -pub mod utils; pub mod client; +pub mod encoding; pub mod network; +pub mod packet; pub mod tokio_network; +pub mod utils; #[allow(unused_variables)] pub fn print_stack(file: &'static str, line: u32) { diff --git a/src/main.rs b/src/main.rs index 0e09864..9c18a55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,60 @@ use rust_mqtt::client::client_v5::MqttClientV5; -use rust_mqtt::network::network_trait::Network; +use rust_mqtt::network::network_trait::{Network, NetworkError}; use rust_mqtt::packet::connect_packet::ConnectPacket; use rust_mqtt::packet::mqtt_packet::Packet; -use rust_mqtt::packet::publish_packet::PublishPacket; +use rust_mqtt::packet::publish_packet::{PublishPacket, QualityOfService}; use rust_mqtt::packet::subscription_packet::SubscriptionPacket; use rust_mqtt::tokio_network::TokioNetwork; +use std::time::Duration; +use tokio::time::sleep; +use tokio::{join, task}; + +async fn receive() { + let mut ip: [u8; 4] = [37, 205, 11, 180]; + let mut port: u16 = 1883; + let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); + tokio_network.create_connection().await; + let mut res2 = vec![0; 260]; + let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2); + + let mut result = { client.connect_to_broker().await }; + + { + client.subscribe_to_topic("test/topic").await; + } + { + log::info!("Waiting for new message!"); + let mes = client.receive_message().await.unwrap(); + let x = String::from_utf8_lossy(mes); + log::info!("Got new message: {}", x); + } + { + client.disconnect().await; + } +} + +async fn publish(message: &str) { + let mut ip: [u8; 4] = [37, 205, 11, 180]; + let mut port: u16 = 1883; + let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); + tokio_network.create_connection().await; + let mut res2 = vec![0; 260]; + let mut client = MqttClientV5::::new(&mut tokio_network, &mut res2); + + let mut result = { client.connect_to_broker().await }; + log::info!("Waiting until send!"); + sleep(Duration::from_secs(15)); + let mut result: Result<(), NetworkError> = { + log::info!("Sending new message!"); + client + .send_message("test/topic", message, QualityOfService::QoS0) + .await + }; + + { + client.disconnect().await; + } +} #[tokio::main] async fn main() { @@ -13,27 +63,14 @@ async fn main() { .format_timestamp_nanos() .init(); - /*let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new(); - let mut res = vec![0; 140]; - let lnsub = pckt.encode(&mut res); - println!("{:02X?}", &res[0..lnsub]); - let mut res2 = vec![0; 260]; + let recv = task::spawn(async move { + receive().await; + }); - let mut pblsh = PublishPacket::<0>::new(x); - let lnpblsh = pblsh.encode(&mut res2); - println!("{:02X?}", &res2[0..lnpblsh]); - log::info!("xxx"); + let publ = task::spawn(async move { + publish("hello world 123 !").await; + }); - let mut res3 = vec![0; 260]; - let mut cntrl = ConnectPacket::<3, 0>::clean(); - let lncntrl = cntrl.encode(&mut res3); - println!("{:02X?}", &res3[0..lncntrl]); - log::info!("xxx");*/ - let mut ip: [u8; 4] = [37, 205, 11, 180]; - let mut port: u16 = 1883; - let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); - tokio_network.create_connection().await; - let mut res2 = vec![0; 260]; - let client = MqttClientV5::new(&mut tokio_network, &mut res2); - let mut x = b"hello world"; + join!(recv, publ); + log::info!("Done"); } diff --git a/src/network/mod.rs b/src/network/mod.rs index 294a433..e083076 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -1 +1 @@ -pub mod network_trait; \ No newline at end of file +pub mod network_trait; diff --git a/src/network/network_trait.rs b/src/network/network_trait.rs index 4d38b56..b446bc3 100644 --- a/src/network/network_trait.rs +++ b/src/network/network_trait.rs @@ -3,17 +3,17 @@ use core::future::Future; use crate::packet::mqtt_packet::Packet; + +#[derive(Debug)] pub enum NetworkError { Connection, Unknown, } - - pub trait Network { type ConnectionFuture<'m>: Future> where - Self: 'm; + Self: 'm; type WriteFuture<'m>: Future> where diff --git a/src/packet/connack_packet.rs b/src/packet/connack_packet.rs index 3f0449e..f6c97c3 100644 --- a/src/packet/connack_packet.rs +++ b/src/packet/connack_packet.rs @@ -33,7 +33,14 @@ impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> { impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> { fn new() -> Self { - todo!() + Self { + fixed_header: PacketType::Connack.into(), + remain_len: 0, + ack_flags: 0, + connect_reason_code: 0, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new(), + } } fn encode(&mut self, buffer: &mut [u8]) -> usize { diff --git a/src/packet/connect_packet.rs b/src/packet/connect_packet.rs index 77eaa72..7a38449 100644 --- a/src/packet/connect_packet.rs +++ b/src/packet/connect_packet.rs @@ -98,6 +98,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> self.fixed_header = cur_type | flags; } } + impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a> for ConnectPacket<'a, MAX_PROPERTIES, MAX_WILL_PROPERTIES> { diff --git a/src/packet/disconnect_packet.rs b/src/packet/disconnect_packet.rs index ca8cc5e..4338307 100644 --- a/src/packet/disconnect_packet.rs +++ b/src/packet/disconnect_packet.rs @@ -31,7 +31,7 @@ impl<'a, const MAX_PROPERTIES: usize> DisconnectPacket<'a, MAX_PROPERTIES> { self.decode_properties(buff_reader); } - fn add_reason(& mut self, reason: u8) { + fn add_reason(&mut self, reason: u8) { self.disconnect_reason = reason; } } @@ -43,7 +43,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR remain_len: 5, disconnect_reason: 0x00, property_len: 0, - properties: Vec::, MAX_PROPERTIES>::new() + properties: Vec::, MAX_PROPERTIES>::new(), } } diff --git a/src/packet/mqtt_packet.rs b/src/packet/mqtt_packet.rs index 5100cc0..9a65305 100644 --- a/src/packet/mqtt_packet.rs +++ b/src/packet/mqtt_packet.rs @@ -24,20 +24,22 @@ pub trait Packet<'a> { self.set_property_len(buff_reader.read_variable_byte_int().unwrap()); let mut x: u32 = 0; let mut prop: Result; - loop { - let mut res: Property; - prop = Property::decode(buff_reader); - if let Ok(res) = prop { - log::info!("Parsed property {:?}", res); - x = x + res.len() as u32 + 1; - self.push_to_properties(res); - } else { - // error handler - log::error!("Problem during property decoding"); - } + if self.get_property_len() != 0 { + loop { + let mut res: Property; + prop = Property::decode(buff_reader); + if let Ok(res) = prop { + log::info!("Parsed property {:?}", res); + x = x + res.len() as u32 + 1; + self.push_to_properties(res); + } else { + // error handler + log::error!("Problem during property decoding"); + } - if x == self.get_property_len() { - break; + if x == self.get_property_len() { + break; + } } } } diff --git a/src/packet/publish_packet.rs b/src/packet/publish_packet.rs index 1c94662..3f4ed21 100644 --- a/src/packet/publish_packet.rs +++ b/src/packet/publish_packet.rs @@ -1,8 +1,9 @@ +use core::ptr::null; use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::mqtt_packet::Packet; -use crate::packet::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2}; +use crate::packet::publish_packet::QualityOfService::{QoS0, QoS1, QoS2, INVALID}; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::EncodedString; use crate::utils::buffer_writer::BuffWriter; @@ -14,7 +15,7 @@ pub enum QualityOfService { QoS0, QoS1, QoS2, - INVALID + INVALID, } impl From for QualityOfService { @@ -23,8 +24,8 @@ impl From for QualityOfService { 0 => QoS0, 1 => QoS1, 2 => QoS2, - _ => INVALID - } + _ => INVALID, + }; } } @@ -35,7 +36,7 @@ impl Into for QualityOfService { QoS1 => 1, QoS2 => 2, INVALID => 3, - } + }; } } @@ -53,29 +54,19 @@ pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> { // properties pub properties: Vec, MAX_PROPERTIES>, - pub message: &'a [u8], + pub message: Option<&'a [u8]>, } impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { - pub fn new(topic_name: &'a str, message: &'a str) -> Self { - let mut x = Self { - fixed_header: PacketType::Publish.into(), - remain_len: 0, - topic_name: EncodedString::new(), - packet_identifier: 1, - property_len: 0, - properties: Vec::, MAX_PROPERTIES>::new(), - message: message.as_bytes(), - }; - x.add_topic_name(topic_name); - return x; - } - pub fn add_topic_name(&mut self, topic_name: &'a str) { self.topic_name.string = topic_name; self.topic_name.len = topic_name.len() as u16; } + pub fn add_message(& mut self, message: &'a [u8]) { + self.message = Some(message); + } + pub fn decode_publish_packet(&mut self, buff_reader: &mut BuffReader<'a>) { if self.decode_fixed_header(buff_reader) != (PacketType::Publish).into() { log::error!("Packet you are trying to decode is not PUBLISH packet!"); @@ -88,13 +79,24 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { self.packet_identifier = buff_reader.read_u16().unwrap(); } self.decode_properties(buff_reader); - self.message = buff_reader.read_message(); + let mut total_len = VariableByteIntegerEncoder::len( + VariableByteIntegerEncoder::encode(self.remain_len).unwrap()); + total_len = total_len + 1 + self.remain_len as usize; + self.message = Some(buff_reader.read_message(total_len)); } } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPERTIES> { fn new() -> Self { - todo!() + Self { + fixed_header: PacketType::Publish.into(), + remain_len: 0, + topic_name: EncodedString::new(), + packet_identifier: 1, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new(), + message: None + } } fn encode(&mut self, buffer: &mut [u8]) -> usize { @@ -104,7 +106,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE let property_len_enc: [u8; 4] = VariableByteIntegerEncoder::encode(self.property_len).unwrap(); let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); - let mut msg_len = self.message.len() as u32; + let mut msg_len = self.message.unwrap().len() as u32; rm_ln = rm_ln + property_len_len as u32 + msg_len + self.topic_name.len as u32 + 2; buff_writer.write_u8(self.fixed_header); @@ -122,7 +124,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE buff_writer.write_variable_byte_int(self.property_len); buff_writer.encode_properties::(&self.properties); - buff_writer.insert_ref(msg_len as usize, self.message); + buff_writer.insert_ref(msg_len as usize, self.message.unwrap()); return buff_writer.position; } diff --git a/src/packet/suback_packet.rs b/src/packet/suback_packet.rs index b6e5adc..064c303 100644 --- a/src/packet/suback_packet.rs +++ b/src/packet/suback_packet.rs @@ -53,7 +53,14 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> for SubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> { fn new() -> Self { - todo!() + Self { + fixed_header: PacketType::Suback.into(), + remain_len: 0, + packet_identifier: 0, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new(), + reason_codes: Vec::::new() + } } fn encode(&mut self, buffer: &mut [u8]) -> usize { diff --git a/src/packet/subscription_packet.rs b/src/packet/subscription_packet.rs index 5abfe78..8a63aab 100644 --- a/src/packet/subscription_packet.rs +++ b/src/packet/subscription_packet.rs @@ -32,14 +32,27 @@ pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { - pub fn new() -> Self { + pub fn add_new_filter(& mut self, topic_name: &'a str) { + let len = topic_name.len(); + let mut new_filter = TopicFilter::new(); + new_filter.filter.string = topic_name; + new_filter.filter.len = len as u16; + self.topic_filters.push(new_filter); + self.topic_filter_len = self.topic_filter_len + 1; + } +} + +impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> + for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> +{ + fn new() -> Self { let mut x = Self { fixed_header: PacketType::Subscribe.into(), remain_len: 0, packet_identifier: 1, property_len: 0, properties: Vec::, MAX_PROPERTIES>::new(), - topic_filter_len: 1, + topic_filter_len: 0, topic_filters: Vec::, MAX_FILTERS>::new(), }; let mut p = TopicFilter::new(); @@ -48,14 +61,6 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> x.topic_filters.push(p); return x; } -} - -impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> - for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> -{ - fn new() -> Self { - todo!() - } fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); @@ -82,7 +87,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> buff_writer.write_variable_byte_int(self.property_len); buff_writer.encode_properties::(&self.properties); buff_writer.encode_topic_filters_ref( - false, + true, self.topic_filter_len as usize, &self.topic_filters, ); diff --git a/src/packet/unsubscription_packet.rs b/src/packet/unsubscription_packet.rs index ae7ba7f..8b7e7f0 100644 --- a/src/packet/unsubscription_packet.rs +++ b/src/packet/unsubscription_packet.rs @@ -32,7 +32,6 @@ pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTI impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { - } impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> diff --git a/src/tokio_network.rs b/src/tokio_network.rs index 9d9fe1d..601d0a5 100644 --- a/src/tokio_network.rs +++ b/src/tokio_network.rs @@ -18,26 +18,36 @@ pub struct TokioNetwork { } impl TokioNetwork { - fn convert_ip(& mut self) -> String { - String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port)) + fn convert_ip(&mut self) -> String { + String::from(format!( + "{}.{}.{}.{}:{}", + self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port + )) } } -impl TokioNetwork { - -} +impl TokioNetwork {} impl Network for TokioNetwork { - type ConnectionFuture<'m> where Self: 'm = impl Future> + 'm; - type WriteFuture<'m> where Self: 'm = impl Future> + 'm; - type ReadFuture<'m> where Self: 'm = impl Future> + 'm; + type ConnectionFuture<'m> + where + Self: 'm, + = impl Future> + 'm; + type WriteFuture<'m> + where + Self: 'm, + = impl Future> + 'm; + type ReadFuture<'m> + where + Self: 'm, + = impl Future> + 'm; fn new(ip: [u8; 4], port: u16) -> Self { return Self { ip, port, socket: Option::None, - } + }; } fn create_connection<'m>(&'m mut self) -> Self::ConnectionFuture<'m> { @@ -48,32 +58,31 @@ impl Network for TokioNetwork { .map(|_| ()) .map_err(|_| NetworkError::Connection) } - } fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> { async move { return if let Some(ref mut stream) = self.socket { - stream.write_all(&buffer[0..len]) + stream + .write_all(&buffer[0..len]) .await .map_err(|_| NetworkError::Unknown) } else { Err(NetworkError::Unknown) - } - + }; } - } fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> { async move { return if let Some(ref mut stream) = self.socket { - stream.read(buffer) + stream + .read(buffer) .await .map_err(|_| NetworkError::Connection) } else { Err(NetworkError::Unknown) - } + }; } } @@ -86,4 +95,4 @@ impl Network for TokioNetwork { let len = self.socket.read(buffer).await ?; Ok(len) }*/ -} \ No newline at end of file +} diff --git a/src/utils/buffer_reader.rs b/src/utils/buffer_reader.rs index 0b8c272..d1a8276 100644 --- a/src/utils/buffer_reader.rs +++ b/src/utils/buffer_reader.rs @@ -150,6 +150,7 @@ impl<'a> BuffReader<'a> { log::error!("Could not parse utf-8 string"); return Err(ParseError::Utf8Error); } + self.increment_position(len_res as usize); return Ok(EncodedString { string: res_str.unwrap(), len: len_res, @@ -188,7 +189,7 @@ impl<'a> BuffReader<'a> { }); } - pub fn read_message(&mut self) -> &'a [u8] { - return &self.buffer[self.position..]; + pub fn read_message(&mut self, total_len: usize) -> &'a [u8] { + return &self.buffer[self.position..total_len]; } } diff --git a/src/utils/buffer_writer.rs b/src/utils/buffer_writer.rs index 162821e..3d0bc8c 100644 --- a/src/utils/buffer_writer.rs +++ b/src/utils/buffer_writer.rs @@ -49,7 +49,7 @@ impl<'a> BuffWriter<'a> { pub fn write_u32(&mut self, four_bytes: u32) { let bytes: [u8; 4] = four_bytes.to_be_bytes(); - self.insert_ref(4,&bytes); + self.insert_ref(4, &bytes); } pub fn write_string_ref(&mut self, str: &EncodedString<'a>) {