From 03546536b2b0c85512bf3758968d97632a05c57c Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 20 Jun 2022 09:55:33 +0200 Subject: [PATCH] Use traits from embedded-io (#19) * Use embedded-io and embedded-nal-async Rather than rolling it's own trait implementations, make use of the traits from embedded-io for connection read/write and embedded-nal-async for connection close. Remove NetworkConnectionFactory abstraction. If needed, the TcpClient abstraction from e-n-a can be used. * Remove unneeded tokio feature --- Cargo.toml | 7 +- src/client/client.rs | 45 ++++++------ src/lib.rs | 2 +- src/network/mod.rs | 70 +++++++++---------- src/packet/v5/publish_packet.rs | 2 +- src/tokio_net/mod.rs | 28 -------- src/tokio_net/tokio_network.rs | 115 ------------------------------- src/utils/rng_generator.rs | 2 +- tests/integration_test_single.rs | 79 +++++++++++++-------- tests/load_test.rs | 37 ++++++---- 10 files changed, 133 insertions(+), 254 deletions(-) delete mode 100644 src/tokio_net/mod.rs delete mode 100644 src/tokio_net/tokio_network.rs diff --git a/Cargo.toml b/Cargo.toml index 29b08f6..3062e5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,11 @@ heapless = "0.7.10" rand_core = "0.6.0" defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } -tokio = { version = "1", features = ["full"], optional = true, default-features = false } +embedded-io = { version = "0.3.0", features = ["async"]} [dev-dependencies] tokio = { version = "1", features = ["full"] } +embedded-io = { version = "0.3.0", features = ["tokio"]} tokio-test = { version = "0.4.2"} env_logger = "0.9.0" futures = { version = "0.3.21" } @@ -27,6 +28,6 @@ serial_test = "0.6.0" [features] default = ["std"] -std = ["tokio", "log"] +std = ["embedded-io/std", "log"] no_std = ["defmt"] -tls = [] \ No newline at end of file +tls = [] diff --git a/src/client/client.rs b/src/client/client.rs index b88b418..d85be29 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -22,13 +22,12 @@ * SOFTWARE. */ +use embedded_io::asynch::{Read, Write}; use heapless::Vec; use rand_core::RngCore; use crate::client::client_config::{ClientConfig, MqttVersion}; -use crate::encoding::variable_byte_integer::{ - VariableByteInteger, VariableByteIntegerDecoder, -}; +use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder}; use crate::network::NetworkConnection; use crate::packet::v5::connack_packet::ConnackPacket; use crate::packet::v5::connect_packet::ConnectPacket; @@ -37,8 +36,8 @@ use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::pingreq_packet::PingreqPacket; use crate::packet::v5::pingresp_packet::PingrespPacket; use crate::packet::v5::puback_packet::PubackPacket; -use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; use crate::packet::v5::publish_packet::QualityOfService::QoS1; +use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; use crate::packet::v5::reason_codes::ReasonCode; use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError}; use crate::packet::v5::suback_packet::SubackPacket; @@ -49,8 +48,11 @@ use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::BufferError; -pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> { - connection: Option, +pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> +where + T: Read + Write, +{ + connection: Option>, buffer: &'a mut [u8], buffer_len: usize, recv_buffer: &'a mut [u8], @@ -60,7 +62,7 @@ pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> { impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R> where - T: NetworkConnection, + T: Read + Write, R: RngCore, { pub fn new( @@ -72,7 +74,7 @@ where config: ClientConfig<'a, MAX_PROPERTIES, R>, ) -> Self { Self { - connection: Some(network_driver), + connection: Some(NetworkConnection::new(network_driver)), buffer, buffer_len, recv_buffer, @@ -162,7 +164,7 @@ where let len = disconnect.encode(self.buffer, self.buffer_len); if let Err(err) = len { warn!("[DECODE ERR]: {}", err); - self.connection.take().unwrap().close().await?; + let _ = self.connection.take(); return Err(ReasonCode::BuffError); } @@ -170,12 +172,8 @@ where warn!("Could not send DISCONNECT packet"); } - if let Err(e) = self.connection.take().unwrap().close().await { - warn!("Could not close the TCP handle"); - return Err(e); - } else { - trace!("Closed TCP handle"); - } + // Drop connection + let _ = self.connection.take(); Ok(()) } @@ -541,11 +539,11 @@ where } #[cfg(not(feature = "tls"))] -async fn receive_packet<'c, T: NetworkConnection>( +async fn receive_packet<'c, T: Read + Write>( buffer: &mut [u8], buffer_len: usize, recv_buffer: &mut [u8], - conn: &'c mut T, + conn: &'c mut NetworkConnection, ) -> Result { let target_len: usize; let mut rem_len: Result; @@ -592,7 +590,8 @@ async fn receive_packet<'c, T: NetworkConnection>( .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]) .await?; i = i + len; - if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) + if let Err(_e) = + writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) { error!("Error occurred during write to buffer!"); return Err(BuffError); @@ -605,21 +604,19 @@ async fn receive_packet<'c, T: NetworkConnection>( } #[cfg(feature = "tls")] -async fn receive_packet<'c, T: NetworkConnection>( +async fn receive_packet<'c, T: Read + Write>( buffer: &mut [u8], buffer_len: usize, recv_buffer: &mut [u8], - conn: &'c mut T, + conn: &'c mut NetworkConnection, ) -> Result { trace!("Reading packet"); let mut writer = BuffWriter::new(buffer, buffer_len); - let len = conn - .receive(recv_buffer) - .await?; + let len = conn.receive(recv_buffer).await?; if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + len)]) { error!("Error occurred during write to buffer!"); return Err(BuffError); } - Ok(len) + Ok(len) } diff --git a/src/lib.rs b/src/lib.rs index 4f4ff37..0cf6008 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,10 +28,10 @@ #![feature(type_alias_impl_trait)] #![feature(generic_associated_types)] pub(crate) mod fmt; + pub mod client; pub mod encoding; pub mod network; pub mod packet; pub mod tests; -pub mod tokio_net; pub mod utils; diff --git a/src/network/mod.rs b/src/network/mod.rs index e7f82d8..fafa87d 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -22,49 +22,41 @@ * SOFTWARE. */ -use core::future::Future; - use crate::packet::v5::reason_codes::ReasonCode; +use embedded_io::asynch::{Read, Write}; -#[derive(Debug)] -pub enum NetworkError { - Connection, - Unknown, - QoSAck, - IDNotMatchedOnAck, - NoMatchingSubs, -} -/// NetworkConnectionFactory implementation should create a TCP connection and return -/// the `Connection` trait implementation. Otherwise return `ReasonCode`. -pub trait NetworkConnectionFactory: Sized { - type Connection: NetworkConnection; - - type ConnectionFuture<'m>: Future> - where - Self: 'm; - - /// Connect function estabilish TCP connection and return the `Connection`. - fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m>; +pub struct NetworkConnection +where + T: Read + Write, +{ + io: T, } -/// Network connection represents estabilished TCP connection created with `NetworkConnectionFactory`. -pub trait NetworkConnection { - type SendFuture<'m>: Future> - where - Self: 'm; +/// Network connection represents an established TCP connection. +impl NetworkConnection +where + T: Read + Write, +{ + /// Create a new network handle using the provided IO implementation. + pub fn new(io: T) -> Self { + Self { io } + } - type ReceiveFuture<'m>: Future> - where - Self: 'm; + /// Send the data from `buffer` via TCP connection. + pub async fn send(&mut self, buffer: &[u8]) -> Result<(), ReasonCode> { + let _ = self + .io + .write(buffer) + .await + .map_err(|_| ReasonCode::NetworkError)?; + Ok(()) + } - type CloseFuture<'m>: Future>; - - /// Send function should enable sending the data from `buffer` via TCP connection. - fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m>; - - /// Receive should enable receiving data to the `buffer` from TCP connection. - fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m>; - - /// Close function should close the TCP connection. - fn close<'m>(self) -> Self::CloseFuture<'m>; + /// Receive data to the `buffer` from TCP connection. + pub async fn receive(&mut self, buffer: &mut [u8]) -> Result { + self.io + .read(buffer) + .await + .map_err(|_| ReasonCode::NetworkError) + } } diff --git a/src/packet/v5/publish_packet.rs b/src/packet/v5/publish_packet.rs index 88a48b2..7dc503d 100644 --- a/src/packet/v5/publish_packet.rs +++ b/src/packet/v5/publish_packet.rs @@ -26,7 +26,7 @@ use heapless::Vec; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::packet::v5::mqtt_packet::Packet; -use crate::packet::v5::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2}; +use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1, QoS2, INVALID}; use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_writer::BuffWriter; use crate::utils::types::{BufferError, EncodedString}; diff --git a/src/tokio_net/mod.rs b/src/tokio_net/mod.rs deleted file mode 100644 index 980c06e..0000000 --- a/src/tokio_net/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -/* - * MIT License - * - * Copyright (c) [2022] [Ondrej Babec ] - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -#![macro_use] -#![allow(dead_code)] -#[cfg(feature = "tokio")] -pub mod tokio_network; diff --git a/src/tokio_net/tokio_network.rs b/src/tokio_net/tokio_network.rs deleted file mode 100644 index be83fa6..0000000 --- a/src/tokio_net/tokio_network.rs +++ /dev/null @@ -1,115 +0,0 @@ -/* - * MIT License - * - * Copyright (c) [2022] [Ondrej Babec ] - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -extern crate alloc; - -use alloc::format; -use alloc::string::String; -use core::future::Future; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; - -use crate::network::{NetworkConnection, NetworkConnectionFactory}; -use crate::packet::v5::reason_codes::ReasonCode; - -/// TokioNetwork is an implementation of the `NetworkConnection` trait. This implementation -/// allows communication through the `Tokio` TcpStream. -pub struct TokioNetwork { - stream: TcpStream, -} - -impl TokioNetwork { - pub fn new(stream: TcpStream) -> Self { - Self { stream } - } - - pub fn convert_ip(ip: [u8; 4], port: u16) -> String { - String::from(format!("{}.{}.{}.{}:{}", ip[0], ip[1], ip[2], ip[3], port)) - } -} - -impl NetworkConnection for TokioNetwork { - type SendFuture<'m> - = impl Future> + 'm where Self: 'm; - - type ReceiveFuture<'m> - = impl Future> + 'm where Self: 'm; - - type CloseFuture<'m> - = impl Future> + 'm where Self: 'm; - - fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> { - async move { - self.stream - .write_all(buffer) - .await - .map_err(|_| ReasonCode::NetworkError) - } - } - - fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m> { - async move { - self.stream - .read(buffer) - .await - .map_err(|_| ReasonCode::NetworkError) - } - } - - fn close<'m>(mut self) -> Self::CloseFuture<'m> { - async move { - self.stream - .shutdown() - .await - .map_err(|_| ReasonCode::NetworkError) - } - } -} - -/// TokioNetworkFactory is an implementation of the `NetworkConnectionFactory` trait. This implementation -/// allows to establish the `Tokio` TcpStream connection. -pub struct TokioNetworkFactory {} - -impl TokioNetworkFactory { - pub fn new() -> Self { - Self {} - } -} - -impl NetworkConnectionFactory for TokioNetworkFactory { - type Connection = TokioNetwork; - - type ConnectionFuture<'m> - = impl Future> + 'm where Self: 'm; - - fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> { - async move { - let stream = TcpStream::connect(TokioNetwork::convert_ip(ip, port)) - .await - .map_err(|_| ReasonCode::NetworkError)?; - Ok(TokioNetwork::new(stream)) - } - } -} diff --git a/src/utils/rng_generator.rs b/src/utils/rng_generator.rs index f4e91ea..f703d2e 100644 --- a/src/utils/rng_generator.rs +++ b/src/utils/rng_generator.rs @@ -1,7 +1,7 @@ // This code is handed from Embedded Rust documentation and // is accessible from https://docs.rust-embedded.org/cortex-m-rt/0.6.0/rand/trait.RngCore.html -use rand_core::{Error, impls, RngCore}; +use rand_core::{impls, Error, RngCore}; pub struct CountingRng(pub u64); diff --git a/tests/integration_test_single.rs b/tests/integration_test_single.rs index 6953b66..3303f54 100644 --- a/tests/integration_test_single.rs +++ b/tests/integration_test_single.rs @@ -29,23 +29,24 @@ use std::sync::Once; use futures::future::{join, join3}; use heapless::Vec; -use log::{info}; -use tokio::task; +use log::info; +use std::net::{Ipv4Addr, SocketAddr}; use tokio::time::sleep; +use tokio::{net::TcpStream, task}; use tokio_test::{assert_err, assert_ok}; +use embedded_io::adapters::FromTokio; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::ClientConfig; use rust_mqtt::client::client_config::MqttVersion::MQTTv5; -use rust_mqtt::network::{NetworkConnectionFactory}; use rust_mqtt::packet::v5::property::Property; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized; -use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; use rust_mqtt::utils::rng_generator::CountingRng; +pub type TokioNetwork = FromTokio; -static IP: [u8; 4] = [127, 0, 0, 1]; +static IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; @@ -95,13 +96,16 @@ async fn publish_core<'b>( } async fn publish( - ip: [u8; 4], + ip: Ipv4Addr, wait: u64, qos: QualityOfService, topic: &str, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let addr = SocketAddr::new(ip.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); @@ -111,7 +115,7 @@ async fn publish( let mut write_buffer = [0; 80]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 80, &mut recv_buffer, @@ -122,15 +126,18 @@ async fn publish( } async fn publish_spec( - ip: [u8; 4], + ip: Ipv4Addr, wait: u64, qos: QualityOfService, topic: &str, message: &str, err: bool, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let addr = SocketAddr::new(ip.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); @@ -140,7 +147,7 @@ async fn publish_spec( let mut write_buffer = [0; 80]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 80, &mut recv_buffer, @@ -223,8 +230,11 @@ async fn receive_multiple( qos: QualityOfService, topic_names: &Vec<&str, TOPICS>, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let addr = SocketAddr::new(IP.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); @@ -235,7 +245,7 @@ async fn receive_multiple( let mut write_buffer = [0; 100]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 100, &mut recv_buffer, @@ -246,9 +256,12 @@ async fn receive_multiple( receive_core_multiple(&mut client, topic_names).await } -async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; +async fn receive(ip: Ipv4Addr, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { + let addr = SocketAddr::new(ip.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); @@ -259,7 +272,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), let mut write_buffer = [0; 100]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 100, &mut recv_buffer, @@ -271,8 +284,11 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), } async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let addr = SocketAddr::new(IP.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username("xyz"); @@ -283,7 +299,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode let mut write_buffer = [0; 100]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 100, &mut recv_buffer, @@ -307,8 +323,11 @@ async fn receive_multiple_second_unsub( msg_t1: &str, msg_t2: &str, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let addr = SocketAddr::new(IP.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(20000)); config.add_qos(qos); config.add_username(USERNAME); @@ -319,7 +338,7 @@ async fn receive_multiple_second_unsub( let mut write_buffer = [0; 100]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 100, &mut recv_buffer, @@ -493,13 +512,17 @@ async fn integration_sub_unsub() { }); let publ = task::spawn(async move { - assert_ok!(publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await); + assert_ok!( + publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await + ); publish_spec(IP, 2, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await }); let publ2 = task::spawn(async move { - assert_ok!(publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await); + assert_ok!( + publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await + ); publish_spec(IP, 3, QualityOfService::QoS1, "unsub/topic2", msg_t2, true).await }); diff --git a/tests/load_test.rs b/tests/load_test.rs index 87135c0..ebf6253 100644 --- a/tests/load_test.rs +++ b/tests/load_test.rs @@ -27,23 +27,26 @@ use alloc::string::String; use core::time::Duration; use std::sync::Once; -use futures::future::{join}; -use log::{info}; +use futures::future::join; +use log::info; use serial_test::serial; +use std::net::{Ipv4Addr, SocketAddr}; use tokio::task; use tokio::time::sleep; -use tokio_test::{assert_ok}; +use tokio_test::assert_ok; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::ClientConfig; use rust_mqtt::client::client_config::MqttVersion::MQTTv5; -use rust_mqtt::network::{NetworkConnectionFactory}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; -use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; use rust_mqtt::utils::rng_generator::CountingRng; +use tokio::net::TcpStream; -static IP: [u8; 4] = [127, 0, 0, 1]; +use embedded_io::adapters::FromTokio; +pub type TokioNetwork = FromTokio; + +static IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; @@ -92,14 +95,17 @@ async fn publish_core<'b>( } async fn publish( - ip: [u8; 4], + ip: Ipv4Addr, wait: u64, qos: QualityOfService, topic: &str, amount: u16, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let addr = SocketAddr::new(ip.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); config.add_qos(qos); config.add_username(USERNAME); @@ -109,7 +115,7 @@ async fn publish( let mut write_buffer = [0; 80]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 80, &mut recv_buffer, @@ -154,13 +160,16 @@ async fn receive_core<'b>( } async fn receive( - ip: [u8; 4], + ip: Ipv4Addr, qos: QualityOfService, topic: &str, amount: u16, ) -> Result<(), ReasonCode> { - let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let addr = SocketAddr::new(ip.into(), PORT); + let connection = TcpStream::connect(addr) + .await + .map_err(|_| ReasonCode::NetworkError)?; + let connection = TokioNetwork::new(connection); let mut config = ClientConfig::new(MQTTv5, CountingRng(50000)); config.add_qos(qos); config.add_username(USERNAME); @@ -172,7 +181,7 @@ async fn receive( let mut write_buffer = [0; 500]; let mut client = MqttClient::::new( - tokio_network, + connection, &mut write_buffer, 500, &mut recv_buffer,