From e660f8ead2a930ad4f8286018a3dd6d0350ebb80 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Fri, 25 Feb 2022 14:17:58 +0100 Subject: [PATCH] Async --- src/client/client_v5.rs | 62 ++++++++++++++++++++---- src/lib.rs | 8 +++- src/main.rs | 15 ++++-- src/network/network_trait.rs | 34 ++++++++++++-- src/packet/auth_packet.rs | 3 ++ src/packet/connack_packet.rs | 4 ++ src/packet/connect_packet.rs | 4 ++ src/packet/disconnect_packet.rs | 14 ++++++ src/packet/mqtt_packet.rs | 3 +- src/packet/pingreq_packet.rs | 4 ++ src/packet/pingresp_packet.rs | 4 ++ src/packet/puback_packet.rs | 4 ++ src/packet/pubcomp_packet.rs | 4 ++ src/packet/publish_packet.rs | 38 ++++++++++++++- src/packet/pubrec_packet.rs | 4 ++ src/packet/pubrel_packet.rs | 4 ++ src/packet/suback_packet.rs | 4 ++ src/packet/subscription_packet.rs | 4 ++ src/packet/unsuback_packet.rs | 4 ++ src/packet/unsubscription_packet.rs | 6 ++- src/tokio_network.rs | 73 +++++++++++++++++++++++++++++ 21 files changed, 279 insertions(+), 21 deletions(-) create mode 100644 src/tokio_network.rs diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index 6f0e3c3..5834dfa 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,20 +1,64 @@ -use crate::packet::publish_packet::PublishPacket; -use crate::network::network_trait::Network; +use crate::packet::publish_packet::{PublishPacket, QualityOfService}; +use crate::network::network_trait::{Network, NetworkError}; +use crate::packet::connack_packet::ConnackPacket; +use crate::packet::connect_packet::ConnectPacket; +use crate::packet::disconnect_packet::DisconnectPacket; +use crate::packet::mqtt_packet::Packet; +use crate::packet::publish_packet::QualityOfService::QoS1; +use crate::utils::buffer_reader::BuffReader; -struct MqttClientV5 { +pub struct MqttClientV5 { network_driver: T, } -impl MqttClientV5 +impl MqttClientV5 where - T: Network, + T: Network { - fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8]) { - let packet = PublishPacket::new(topic_name, message); - self.network_driver.send() + pub fn new(network_driver: T) -> Self { + Self { + network_driver, + } + } + // connect -> connack -> publish -> QoS ? -> disconn + pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8], qos: QualityOfService) -> Result<(), NetworkError> { + //connect + self.network_driver.create_connection() ?; + + let mut connect = ConnectPacket::clean(); + let mut len = connect.encode(buffer); + self.network_driver.send(buffer, len).await ?; + //connack + let connack: ConnackPacket = self.receive::>(buffer).await ?; + if connack.connect_reason_code != 0x00 { + todo!(); + } + + // publish + let mut packet = PublishPacket::new(topic_name, message); + len = packet.encode(buffer); + let result = self.network_driver.send(buffer, len).await ?; + + //QoS1 + if qos.into() == QoS1.into() { + todo!(); + } + + //Disconnect + let mut disconnect = DisconnectPacket::new(); + len = disconnect.encode(buffer); + self.network_driver.send(buffer, len); + return result; } - fn receive_message(& mut self) { + pub async fn receive>(& mut self, buffer: & mut [u8]) -> Result { + self.network_driver.receive(buffer).await ?; + let mut packet = P::new(); + packet.decode(&mut BuffReader::new(buffer)); + return Ok(packet); + } + + pub async fn receive_message(& mut self, buffer: & mut [u8]) -> Result<(), NetworkError> { } } diff --git a/src/lib.rs b/src/lib.rs index 4b69db3..12a7066 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,12 +2,18 @@ #![macro_use] #![cfg_attr(not(feature = "std"), no_std)] #![allow(dead_code)] +#![feature(type_alias_impl_trait)] +#![feature(generic_associated_types)] +#![feature(async)] + +extern crate alloc; pub mod encoding; pub mod packet; pub mod utils; pub mod client; -mod network; +pub mod network; +pub mod tokio_network; #[allow(unused_variables)] pub fn print_stack(file: &'static str, line: u32) { diff --git a/src/main.rs b/src/main.rs index 8764f37..90c7757 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,10 @@ +use rust_mqtt::client::client_v5::MqttClientV5; +use rust_mqtt::network::network_trait::Network; use rust_mqtt::packet::connect_packet::ConnectPacket; use rust_mqtt::packet::mqtt_packet::Packet; use rust_mqtt::packet::publish_packet::PublishPacket; use rust_mqtt::packet::subscription_packet::SubscriptionPacket; +use rust_mqtt::tokio_network::TokioNetwork; fn main() { env_logger::builder() @@ -9,12 +12,12 @@ fn main() { .format_timestamp_nanos() .init(); - let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new(); + /*let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new(); let mut res = vec![0; 140]; let lnsub = pckt.encode(&mut res); println!("{:02X?}", &res[0..lnsub]); let mut res2 = vec![0; 260]; - let mut x = b"hello world"; + let mut pblsh = PublishPacket::<0>::new(x); let lnpblsh = pblsh.encode(&mut res2); println!("{:02X?}", &res2[0..lnpblsh]); @@ -24,5 +27,11 @@ fn main() { let mut cntrl = ConnectPacket::<3, 0>::clean(); let lncntrl = cntrl.encode(&mut res3); println!("{:02X?}", &res3[0..lncntrl]); - log::info!("xxx"); + log::info!("xxx");*/ + let mut ip: [u8; 4] = [37, 205, 11, 180]; + let mut port: u16 = 1883; + let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); + let client = MqttClientV5::new::(tokio_network); + let mut x = b"hello world"; + let mut res2 = vec![0; 260]; } diff --git a/src/network/network_trait.rs b/src/network/network_trait.rs index 2564695..d45a2b0 100644 --- a/src/network/network_trait.rs +++ b/src/network/network_trait.rs @@ -1,5 +1,33 @@ +use core::fmt::Error; + +use core::future::Future; +use crate::packet::mqtt_packet::Packet; + +pub enum NetworkError { + Connection, + Unknown, +} + + pub trait Network { - fn send(buffer: & mut [u8]); - fn receive(buffer: & mut [u8]); -} \ No newline at end of file + type ConnectionFuture<'m>: Future> + where + Self: 'm; + + type WriteFuture<'m>: Future> + where + Self: 'm; + + type ReadFuture<'m>: Future> + where + Self: 'm; + + fn new(ip: [u8; 4], port: u16) -> Self; + + fn create_connection(& mut self) -> Self::ConnectionFuture<'m>; + + fn send(& mut self, buffer: & mut [u8], len: usize) -> Self::WriteFuture<'m>; + + fn receive(& mut self, buffer: & mut [u8]) -> Self::ReadFuture<'m>; +} diff --git a/src/packet/auth_packet.rs b/src/packet/auth_packet.rs index 4e62253..4e7314a 100644 --- a/src/packet/auth_packet.rs +++ b/src/packet/auth_packet.rs @@ -46,6 +46,9 @@ impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } /*fn new() -> Packet<'a, MAX_PROPERTIES> { return AuthPacket { fixed_header: PacketType::Auth.into(), remain_len: 0, auth_reason: 0, property_len: 0, properties: Vec::, MAX_PROPERTIES>::new() } }*/ diff --git a/src/packet/connack_packet.rs b/src/packet/connack_packet.rs index 83cd1cb..76601c1 100644 --- a/src/packet/connack_packet.rs +++ b/src/packet/connack_packet.rs @@ -32,6 +32,10 @@ impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); buff_writer.write_u8(self.fixed_header); diff --git a/src/packet/connect_packet.rs b/src/packet/connect_packet.rs index 9e55674..aaf5a6c 100644 --- a/src/packet/connect_packet.rs +++ b/src/packet/connect_packet.rs @@ -101,6 +101,10 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a> for ConnectPacket<'a, MAX_PROPERTIES, MAX_WILL_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/disconnect_packet.rs b/src/packet/disconnect_packet.rs index f182e50..2a85962 100644 --- a/src/packet/disconnect_packet.rs +++ b/src/packet/disconnect_packet.rs @@ -30,9 +30,23 @@ impl<'a, const MAX_PROPERTIES: usize> DisconnectPacket<'a, MAX_PROPERTIES> { self.disconnect_reason = buff_reader.read_u8().unwrap(); self.decode_properties(buff_reader); } + + fn add_reason(& mut self, reason: u8) { + self.disconnect_reason = reason; + } } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + Self { + fixed_header: PacketType::Disconnect.into(), + remain_len: 5, + disconnect_reason: 0x00, + property_len: 0, + properties: Vec::, MAX_PROPERTIES>::new() + } + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); buff_writer.write_u8(self.fixed_header); diff --git a/src/packet/mqtt_packet.rs b/src/packet/mqtt_packet.rs index b090d63..5100cc0 100644 --- a/src/packet/mqtt_packet.rs +++ b/src/packet/mqtt_packet.rs @@ -5,9 +5,10 @@ use crate::utils::buffer_reader::ParseError; use super::property::Property; pub trait Packet<'a> { - //fn new() -> dyn Packet<'a> where Self: Sized; + fn new() -> Self; fn encode(&mut self, buffer: &mut [u8]) -> usize; + // -> Result fn decode(&mut self, buff_reader: &mut BuffReader<'a>); // properties diff --git a/src/packet/pingreq_packet.rs b/src/packet/pingreq_packet.rs index 40234f2..dad2941 100644 --- a/src/packet/pingreq_packet.rs +++ b/src/packet/pingreq_packet.rs @@ -15,6 +15,10 @@ pub struct PingreqPacket { impl PingreqPacket {} impl<'a> Packet<'a> for PingreqPacket { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); buff_writer.write_u8(self.fixed_header); diff --git a/src/packet/pingresp_packet.rs b/src/packet/pingresp_packet.rs index b6c32ae..645edc0 100644 --- a/src/packet/pingresp_packet.rs +++ b/src/packet/pingresp_packet.rs @@ -22,6 +22,10 @@ impl<'a> PingrespPacket { } impl<'a> Packet<'a> for PingrespPacket { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); buff_writer.write_u8(self.fixed_header); diff --git a/src/packet/puback_packet.rs b/src/packet/puback_packet.rs index 42e8278..384212e 100644 --- a/src/packet/puback_packet.rs +++ b/src/packet/puback_packet.rs @@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubackPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/pubcomp_packet.rs b/src/packet/pubcomp_packet.rs index 58811a9..2b497d3 100644 --- a/src/packet/pubcomp_packet.rs +++ b/src/packet/pubcomp_packet.rs @@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubcompPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/publish_packet.rs b/src/packet/publish_packet.rs index 3bbaaff..10b46ba 100644 --- a/src/packet/publish_packet.rs +++ b/src/packet/publish_packet.rs @@ -2,6 +2,7 @@ use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use heapless::Vec; use crate::packet::mqtt_packet::Packet; +use crate::packet::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2}; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::EncodedString; use crate::utils::buffer_writer::BuffWriter; @@ -9,6 +10,35 @@ use crate::utils::buffer_writer::BuffWriter; use super::packet_type::PacketType; use super::property::Property; +pub enum QualityOfService { + QoS0, + QoS1, + QoS2, + INVALID +} + +impl From for QualityOfService { + fn from(orig: u8) -> Self { + return match orig { + 0 => QoS0, + 1 => QoS1, + 2 => QoS2, + _ => INVALID + } + } +} + +impl Into for QualityOfService { + fn into(self) -> u8 { + return match self { + QoS0 => 0, + QoS1 => 1, + QoS2 => 2, + INVALID => 3, + } + } +} + pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> { // 7 - 4 mqtt control packet type, 3-0 flagy pub fixed_header: u8, @@ -27,7 +57,7 @@ pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> { } impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { - pub fn new(topic_name: & str, message: &'a str) -> Self { + pub fn new(topic_name: &'a str, message: &'a str) -> Self { let mut x = Self { fixed_header: PacketType::Publish.into(), remain_len: 0, @@ -41,7 +71,7 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { return x; } - pub fn add_topic_name(&mut self, topic_name: & str) { + pub fn add_topic_name(&mut self, topic_name: &'a str) { self.topic_name.string = topic_name; self.topic_name.len = topic_name.len() as u16; } @@ -63,6 +93,10 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/pubrec_packet.rs b/src/packet/pubrec_packet.rs index 996016e..c256189 100644 --- a/src/packet/pubrec_packet.rs +++ b/src/packet/pubrec_packet.rs @@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubrecPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/pubrel_packet.rs b/src/packet/pubrel_packet.rs index 3ac8da3..e5c5035 100644 --- a/src/packet/pubrel_packet.rs +++ b/src/packet/pubrel_packet.rs @@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubrelPacket<'a, MAX_PROPERTIES> { } impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/suback_packet.rs b/src/packet/suback_packet.rs index c90bc28..6b073ca 100644 --- a/src/packet/suback_packet.rs +++ b/src/packet/suback_packet.rs @@ -52,6 +52,10 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> for SubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { log::error!("SUBACK packet does not support encoding!"); return 0; diff --git a/src/packet/subscription_packet.rs b/src/packet/subscription_packet.rs index 096f8ac..9170b2a 100644 --- a/src/packet/subscription_packet.rs +++ b/src/packet/subscription_packet.rs @@ -53,6 +53,10 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/packet/unsuback_packet.rs b/src/packet/unsuback_packet.rs index b1c857a..f0f8a1b 100644 --- a/src/packet/unsuback_packet.rs +++ b/src/packet/unsuback_packet.rs @@ -50,6 +50,10 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a> for UnsubackPacket<'a, MAX_REASONS, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { log::error!("UNSUBACK packet does not support encoding!"); return 0; diff --git a/src/packet/unsubscription_packet.rs b/src/packet/unsubscription_packet.rs index 1464eec..e29d901 100644 --- a/src/packet/unsubscription_packet.rs +++ b/src/packet/unsubscription_packet.rs @@ -32,14 +32,16 @@ pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTI impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { - /*pub fn new() -> Self { - }*/ } impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a> for UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES> { + fn new() -> Self { + todo!() + } + fn encode(&mut self, buffer: &mut [u8]) -> usize { let mut buff_writer = BuffWriter::new(buffer); diff --git a/src/tokio_network.rs b/src/tokio_network.rs new file mode 100644 index 0000000..1db4d07 --- /dev/null +++ b/src/tokio_network.rs @@ -0,0 +1,73 @@ +use alloc::format; +use alloc::string::String; +use core::borrow::BorrowMut; +use core::fmt::Error; +use core::future::Future; +use core::ptr::null; +use embassy::io::WriteAll; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use crate::network::network_trait::{Network, NetworkError}; +use crate::packet::mqtt_packet::Packet; + +pub struct TokioNetwork<'a> { + ip: [u8; 4], + port: u16, + socket: &'a mut TcpStream, +} + +impl<'a> TokioNetwork<'a> { + fn convert_ip(& mut self) -> String { + String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port)) + } +} + +impl Network for TokioNetwork { + type ConnectionFuture<'m> where Self: 'm = impl Future> + 'm; + type WriteFuture<'m> where Self: 'm = impl Future> + 'm; + type ReadFuture<'m> where Self: 'm = impl Future> + 'm; + + fn new(ip: [u8; 4], port: u16) -> Self { + return Self { + ip, + port, + socket: &mut (TcpStream), + } + } + + fn create_connection(&mut self) -> Self::ConnectionFuture<'m> { + async move { + TcpStream::connect(self.convert_ip()) + .await + .map_err(|_| NetworkError::Connection); + } + + } + + fn send<'m>(&mut self, buffer: &mut [u8], len: usize) -> Self::WriteFuture<'m> { + async move { + self.socket.write_all(&buffer[0..len]) + .await + .map_err(|_| NetworkError::Unknown); + } + + } + + fn receive<'m>(&mut self, buffer: &mut [u8]) -> Self::ReadFuture<'m> { + async move { + self.socket.read(buffer) + .await + .map_err(|_| NetworkError::Connection); + } + } + + /*fn send(&mut self, buffer: &mut [u8], len: usize) -> Result<(), NetworkError> { + self.socket.write_all(&buffer[0..len]); + Ok(()) + } + + fn receive(&mut self, buffer: &mut [u8]) -> Result { + let len = self.socket.read(buffer).await ?; + Ok(len) + }*/ +} \ No newline at end of file