From 1e3f8e8302849d4e661ad2f41fa153b8231d6b09 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Fri, 25 Feb 2022 15:19:28 +0100 Subject: [PATCH] Futures --- src/client/client_v5.rs | 22 ++++++++++---------- src/lib.rs | 1 - src/network/network_trait.rs | 6 +++--- src/tokio_network.rs | 39 +++++++++++++++++++++++------------- 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index 5834dfa..6bd32a5 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -11,7 +11,7 @@ pub struct MqttClientV5 { network_driver: T, } -impl MqttClientV5 +impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5 where T: Network { @@ -21,11 +21,11 @@ where } } // connect -> connack -> publish -> QoS ? -> disconn - pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8], qos: QualityOfService) -> Result<(), NetworkError> { + pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: &'a mut [u8], qos: QualityOfService) -> Result<(), NetworkError> { //connect - self.network_driver.create_connection() ?; + self.network_driver.create_connection().await ?; - let mut connect = ConnectPacket::clean(); + let mut connect = ConnectPacket::<3, 0>::clean(); let mut len = connect.encode(buffer); self.network_driver.send(buffer, len).await ?; //connack @@ -35,23 +35,23 @@ where } // publish - let mut packet = PublishPacket::new(topic_name, message); + let mut packet = PublishPacket::<5>::new(topic_name, message); len = packet.encode(buffer); - let result = self.network_driver.send(buffer, len).await ?; + self.network_driver.send(buffer, len).await ?; //QoS1 - if qos.into() == QoS1.into() { + if >::into(qos) == >::into(QoS1) { todo!(); } //Disconnect - let mut disconnect = DisconnectPacket::new(); + let mut disconnect = DisconnectPacket::<5>::new(); len = disconnect.encode(buffer); self.network_driver.send(buffer, len); - return result; + Ok(()) } - pub async fn receive>(& mut self, buffer: & mut [u8]) -> Result { + pub async fn receive>(& mut self, buffer: &'a mut [u8]) -> Result { self.network_driver.receive(buffer).await ?; let mut packet = P::new(); packet.decode(&mut BuffReader::new(buffer)); @@ -59,6 +59,6 @@ where } pub async fn receive_message(& mut self, buffer: & mut [u8]) -> Result<(), NetworkError> { - + return Ok(()); } } diff --git a/src/lib.rs b/src/lib.rs index 12a7066..5298429 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,6 @@ #![allow(dead_code)] #![feature(type_alias_impl_trait)] #![feature(generic_associated_types)] -#![feature(async)] extern crate alloc; diff --git a/src/network/network_trait.rs b/src/network/network_trait.rs index d45a2b0..0065fa7 100644 --- a/src/network/network_trait.rs +++ b/src/network/network_trait.rs @@ -25,9 +25,9 @@ pub trait Network { fn new(ip: [u8; 4], port: u16) -> Self; - fn create_connection(& mut self) -> Self::ConnectionFuture<'m>; + fn create_connection(&'m mut self) -> Self::ConnectionFuture<'m>; - fn send(& mut self, buffer: & mut [u8], len: usize) -> Self::WriteFuture<'m>; + fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m>; - fn receive(& mut self, buffer: & mut [u8]) -> Self::ReadFuture<'m>; + fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m>; } diff --git a/src/tokio_network.rs b/src/tokio_network.rs index 1db4d07..2e8ba69 100644 --- a/src/tokio_network.rs +++ b/src/tokio_network.rs @@ -10,13 +10,13 @@ use tokio::net::{TcpListener, TcpStream}; use crate::network::network_trait::{Network, NetworkError}; use crate::packet::mqtt_packet::Packet; -pub struct TokioNetwork<'a> { +pub struct TokioNetwork { ip: [u8; 4], port: u16, - socket: &'a mut TcpStream, + socket: Option, } -impl<'a> TokioNetwork<'a> { +impl TokioNetwork { fn convert_ip(& mut self) -> String { String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port)) } @@ -31,33 +31,44 @@ impl Network for TokioNetwork { return Self { ip, port, - socket: &mut (TcpStream), + socket: Option::None, } } - fn create_connection(&mut self) -> Self::ConnectionFuture<'m> { + fn create_connection<'m>(&'m mut self) -> Self::ConnectionFuture<'m> { async move { TcpStream::connect(self.convert_ip()) .await - .map_err(|_| NetworkError::Connection); + .map(|socket| self.socket = Some(socket)) + .map(|_| ()) + .map_err(|_| NetworkError::Connection) } } - fn send<'m>(&mut self, buffer: &mut [u8], len: usize) -> Self::WriteFuture<'m> { + fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> { async move { - self.socket.write_all(&buffer[0..len]) - .await - .map_err(|_| NetworkError::Unknown); + return if let Some(ref mut stream) = self.socket { + stream.write_all(&buffer[0..len]) + .await + .map_err(|_| NetworkError::Unknown) + } else { + Err(NetworkError::Unknown) + } + } } - fn receive<'m>(&mut self, buffer: &mut [u8]) -> Self::ReadFuture<'m> { + fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> { async move { - self.socket.read(buffer) - .await - .map_err(|_| NetworkError::Connection); + return if let Some(ref mut stream) = self.socket { + stream.read(buffer) + .await + .map_err(|_| NetworkError::Connection) + } else { + Err(NetworkError::Unknown) + } } }