Use traits from embedded-io (#19)

* Use embedded-io and embedded-nal-async

Rather than rolling it's own trait implementations, make use of the
traits from embedded-io for connection read/write and embedded-nal-async
for connection close.

Remove NetworkConnectionFactory abstraction. If needed, the TcpClient
abstraction from e-n-a can be used.

* Remove unneeded tokio feature
This commit is contained in:
Ulf Lilleengen 2022-06-20 09:55:33 +02:00 committed by GitHub
parent e47667156d
commit 03546536b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 133 additions and 254 deletions

View File

@ -15,10 +15,11 @@ heapless = "0.7.10"
rand_core = "0.6.0"
defmt = { version = "0.3", optional = true }
log = { version = "0.4.14", optional = true }
tokio = { version = "1", features = ["full"], optional = true, default-features = false }
embedded-io = { version = "0.3.0", features = ["async"]}
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
embedded-io = { version = "0.3.0", features = ["tokio"]}
tokio-test = { version = "0.4.2"}
env_logger = "0.9.0"
futures = { version = "0.3.21" }
@ -27,6 +28,6 @@ serial_test = "0.6.0"
[features]
default = ["std"]
std = ["tokio", "log"]
std = ["embedded-io/std", "log"]
no_std = ["defmt"]
tls = []
tls = []

View File

@ -22,13 +22,12 @@
* SOFTWARE.
*/
use embedded_io::asynch::{Read, Write};
use heapless::Vec;
use rand_core::RngCore;
use crate::client::client_config::{ClientConfig, MqttVersion};
use crate::encoding::variable_byte_integer::{
VariableByteInteger, VariableByteIntegerDecoder,
};
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder};
use crate::network::NetworkConnection;
use crate::packet::v5::connack_packet::ConnackPacket;
use crate::packet::v5::connect_packet::ConnectPacket;
@ -37,8 +36,8 @@ use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::pingreq_packet::PingreqPacket;
use crate::packet::v5::pingresp_packet::PingrespPacket;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::packet::v5::publish_packet::QualityOfService::QoS1;
use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::packet::v5::reason_codes::ReasonCode;
use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError};
use crate::packet::v5::suback_packet::SubackPacket;
@ -49,8 +48,11 @@ use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::BufferError;
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> {
connection: Option<T>,
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore>
where
T: Read + Write,
{
connection: Option<NetworkConnection<T>>,
buffer: &'a mut [u8],
buffer_len: usize,
recv_buffer: &'a mut [u8],
@ -60,7 +62,7 @@ pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> {
impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
where
T: NetworkConnection,
T: Read + Write,
R: RngCore,
{
pub fn new(
@ -72,7 +74,7 @@ where
config: ClientConfig<'a, MAX_PROPERTIES, R>,
) -> Self {
Self {
connection: Some(network_driver),
connection: Some(NetworkConnection::new(network_driver)),
buffer,
buffer_len,
recv_buffer,
@ -162,7 +164,7 @@ where
let len = disconnect.encode(self.buffer, self.buffer_len);
if let Err(err) = len {
warn!("[DECODE ERR]: {}", err);
self.connection.take().unwrap().close().await?;
let _ = self.connection.take();
return Err(ReasonCode::BuffError);
}
@ -170,12 +172,8 @@ where
warn!("Could not send DISCONNECT packet");
}
if let Err(e) = self.connection.take().unwrap().close().await {
warn!("Could not close the TCP handle");
return Err(e);
} else {
trace!("Closed TCP handle");
}
// Drop connection
let _ = self.connection.take();
Ok(())
}
@ -541,11 +539,11 @@ where
}
#[cfg(not(feature = "tls"))]
async fn receive_packet<'c, T: NetworkConnection>(
async fn receive_packet<'c, T: Read + Write>(
buffer: &mut [u8],
buffer_len: usize,
recv_buffer: &mut [u8],
conn: &'c mut T,
conn: &'c mut NetworkConnection<T>,
) -> Result<usize, ReasonCode> {
let target_len: usize;
let mut rem_len: Result<VariableByteInteger, ()>;
@ -592,7 +590,8 @@ async fn receive_packet<'c, T: NetworkConnection>(
.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)])
.await?;
i = i + len;
if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)])
if let Err(_e) =
writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)])
{
error!("Error occurred during write to buffer!");
return Err(BuffError);
@ -605,21 +604,19 @@ async fn receive_packet<'c, T: NetworkConnection>(
}
#[cfg(feature = "tls")]
async fn receive_packet<'c, T: NetworkConnection>(
async fn receive_packet<'c, T: Read + Write>(
buffer: &mut [u8],
buffer_len: usize,
recv_buffer: &mut [u8],
conn: &'c mut T,
conn: &'c mut NetworkConnection<T>,
) -> Result<usize, ReasonCode> {
trace!("Reading packet");
let mut writer = BuffWriter::new(buffer, buffer_len);
let len = conn
.receive(recv_buffer)
.await?;
let len = conn.receive(recv_buffer).await?;
if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + len)])
{
error!("Error occurred during write to buffer!");
return Err(BuffError);
}
Ok(len)
Ok(len)
}

View File

@ -28,10 +28,10 @@
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
pub(crate) mod fmt;
pub mod client;
pub mod encoding;
pub mod network;
pub mod packet;
pub mod tests;
pub mod tokio_net;
pub mod utils;

View File

@ -22,49 +22,41 @@
* SOFTWARE.
*/
use core::future::Future;
use crate::packet::v5::reason_codes::ReasonCode;
use embedded_io::asynch::{Read, Write};
#[derive(Debug)]
pub enum NetworkError {
Connection,
Unknown,
QoSAck,
IDNotMatchedOnAck,
NoMatchingSubs,
}
/// NetworkConnectionFactory implementation should create a TCP connection and return
/// the `Connection` trait implementation. Otherwise return `ReasonCode`.
pub trait NetworkConnectionFactory: Sized {
type Connection: NetworkConnection;
type ConnectionFuture<'m>: Future<Output = Result<Self::Connection, ReasonCode>>
where
Self: 'm;
/// Connect function estabilish TCP connection and return the `Connection`.
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m>;
pub struct NetworkConnection<T>
where
T: Read + Write,
{
io: T,
}
/// Network connection represents estabilished TCP connection created with `NetworkConnectionFactory`.
pub trait NetworkConnection {
type SendFuture<'m>: Future<Output = Result<(), ReasonCode>>
where
Self: 'm;
/// Network connection represents an established TCP connection.
impl<T> NetworkConnection<T>
where
T: Read + Write,
{
/// Create a new network handle using the provided IO implementation.
pub fn new(io: T) -> Self {
Self { io }
}
type ReceiveFuture<'m>: Future<Output = Result<usize, ReasonCode>>
where
Self: 'm;
/// Send the data from `buffer` via TCP connection.
pub async fn send(&mut self, buffer: &[u8]) -> Result<(), ReasonCode> {
let _ = self
.io
.write(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)?;
Ok(())
}
type CloseFuture<'m>: Future<Output = Result<(), ReasonCode>>;
/// Send function should enable sending the data from `buffer` via TCP connection.
fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m>;
/// Receive should enable receiving data to the `buffer` from TCP connection.
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m>;
/// Close function should close the TCP connection.
fn close<'m>(self) -> Self::CloseFuture<'m>;
/// Receive data to the `buffer` from TCP connection.
pub async fn receive(&mut self, buffer: &mut [u8]) -> Result<usize, ReasonCode> {
self.io
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
}
}

View File

@ -26,7 +26,7 @@ use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2};
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1, QoS2, INVALID};
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BufferError, EncodedString};

View File

@ -1,28 +0,0 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#![macro_use]
#![allow(dead_code)]
#[cfg(feature = "tokio")]
pub mod tokio_network;

View File

@ -1,115 +0,0 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
extern crate alloc;
use alloc::format;
use alloc::string::String;
use core::future::Future;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::network::{NetworkConnection, NetworkConnectionFactory};
use crate::packet::v5::reason_codes::ReasonCode;
/// TokioNetwork is an implementation of the `NetworkConnection` trait. This implementation
/// allows communication through the `Tokio` TcpStream.
pub struct TokioNetwork {
stream: TcpStream,
}
impl TokioNetwork {
pub fn new(stream: TcpStream) -> Self {
Self { stream }
}
pub fn convert_ip(ip: [u8; 4], port: u16) -> String {
String::from(format!("{}.{}.{}.{}:{}", ip[0], ip[1], ip[2], ip[3], port))
}
}
impl NetworkConnection for TokioNetwork {
type SendFuture<'m>
= impl Future<Output = Result<(), ReasonCode>> + 'm where Self: 'm;
type ReceiveFuture<'m>
= impl Future<Output = Result<usize, ReasonCode>> + 'm where Self: 'm;
type CloseFuture<'m>
= impl Future<Output = Result<(), ReasonCode>> + 'm where Self: 'm;
fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> {
async move {
self.stream
.write_all(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m> {
async move {
self.stream
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
fn close<'m>(mut self) -> Self::CloseFuture<'m> {
async move {
self.stream
.shutdown()
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
}
/// TokioNetworkFactory is an implementation of the `NetworkConnectionFactory` trait. This implementation
/// allows to establish the `Tokio` TcpStream connection.
pub struct TokioNetworkFactory {}
impl TokioNetworkFactory {
pub fn new() -> Self {
Self {}
}
}
impl NetworkConnectionFactory for TokioNetworkFactory {
type Connection = TokioNetwork;
type ConnectionFuture<'m>
= impl Future<Output = Result<TokioNetwork, ReasonCode>> + 'm where Self: 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> {
async move {
let stream = TcpStream::connect(TokioNetwork::convert_ip(ip, port))
.await
.map_err(|_| ReasonCode::NetworkError)?;
Ok(TokioNetwork::new(stream))
}
}
}

View File

@ -1,7 +1,7 @@
// This code is handed from Embedded Rust documentation and
// is accessible from https://docs.rust-embedded.org/cortex-m-rt/0.6.0/rand/trait.RngCore.html
use rand_core::{Error, impls, RngCore};
use rand_core::{impls, Error, RngCore};
pub struct CountingRng(pub u64);

View File

@ -29,23 +29,24 @@ use std::sync::Once;
use futures::future::{join, join3};
use heapless::Vec;
use log::{info};
use tokio::task;
use log::info;
use std::net::{Ipv4Addr, SocketAddr};
use tokio::time::sleep;
use tokio::{net::TcpStream, task};
use tokio_test::{assert_err, assert_ok};
use embedded_io::adapters::FromTokio;
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::network::{NetworkConnectionFactory};
use rust_mqtt::packet::v5::property::Property;
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::rng_generator::CountingRng;
pub type TokioNetwork = FromTokio<TcpStream>;
static IP: [u8; 4] = [127, 0, 0, 1];
static IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
@ -95,13 +96,16 @@ async fn publish_core<'b>(
}
async fn publish(
ip: [u8; 4],
ip: Ipv4Addr,
wait: u64,
qos: QualityOfService,
topic: &str,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let addr = SocketAddr::new(ip.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -111,7 +115,7 @@ async fn publish(
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
80,
&mut recv_buffer,
@ -122,15 +126,18 @@ async fn publish(
}
async fn publish_spec(
ip: [u8; 4],
ip: Ipv4Addr,
wait: u64,
qos: QualityOfService,
topic: &str,
message: &str,
err: bool,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let addr = SocketAddr::new(ip.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -140,7 +147,7 @@ async fn publish_spec(
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
80,
&mut recv_buffer,
@ -223,8 +230,11 @@ async fn receive_multiple<const TOPICS: usize>(
qos: QualityOfService,
topic_names: &Vec<&str, TOPICS>,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let addr = SocketAddr::new(IP.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -235,7 +245,7 @@ async fn receive_multiple<const TOPICS: usize>(
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
100,
&mut recv_buffer,
@ -246,9 +256,12 @@ async fn receive_multiple<const TOPICS: usize>(
receive_core_multiple(&mut client, topic_names).await
}
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
async fn receive(ip: Ipv4Addr, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let addr = SocketAddr::new(ip.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -259,7 +272,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
100,
&mut recv_buffer,
@ -271,8 +284,11 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
}
async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let addr = SocketAddr::new(IP.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username("xyz");
@ -283,7 +299,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
100,
&mut recv_buffer,
@ -307,8 +323,11 @@ async fn receive_multiple_second_unsub<const TOPICS: usize>(
msg_t1: &str,
msg_t2: &str,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let addr = SocketAddr::new(IP.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -319,7 +338,7 @@ async fn receive_multiple_second_unsub<const TOPICS: usize>(
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
100,
&mut recv_buffer,
@ -493,13 +512,17 @@ async fn integration_sub_unsub() {
});
let publ = task::spawn(async move {
assert_ok!(publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await);
assert_ok!(
publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await
);
publish_spec(IP, 2, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await
});
let publ2 = task::spawn(async move {
assert_ok!(publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await);
assert_ok!(
publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await
);
publish_spec(IP, 3, QualityOfService::QoS1, "unsub/topic2", msg_t2, true).await
});

View File

@ -27,23 +27,26 @@ use alloc::string::String;
use core::time::Duration;
use std::sync::Once;
use futures::future::{join};
use log::{info};
use futures::future::join;
use log::info;
use serial_test::serial;
use std::net::{Ipv4Addr, SocketAddr};
use tokio::task;
use tokio::time::sleep;
use tokio_test::{assert_ok};
use tokio_test::assert_ok;
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::network::{NetworkConnectionFactory};
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::rng_generator::CountingRng;
use tokio::net::TcpStream;
static IP: [u8; 4] = [127, 0, 0, 1];
use embedded_io::adapters::FromTokio;
pub type TokioNetwork = FromTokio<TcpStream>;
static IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
@ -92,14 +95,17 @@ async fn publish_core<'b>(
}
async fn publish(
ip: [u8; 4],
ip: Ipv4Addr,
wait: u64,
qos: QualityOfService,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let addr = SocketAddr::new(ip.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(50000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -109,7 +115,7 @@ async fn publish(
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
80,
&mut recv_buffer,
@ -154,13 +160,16 @@ async fn receive_core<'b>(
}
async fn receive(
ip: [u8; 4],
ip: Ipv4Addr,
qos: QualityOfService,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let addr = SocketAddr::new(ip.into(), PORT);
let connection = TcpStream::connect(addr)
.await
.map_err(|_| ReasonCode::NetworkError)?;
let connection = TokioNetwork::new(connection);
let mut config = ClientConfig::new(MQTTv5, CountingRng(50000));
config.add_qos(qos);
config.add_username(USERNAME);
@ -172,7 +181,7 @@ async fn receive(
let mut write_buffer = [0; 500];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
connection,
&mut write_buffer,
500,
&mut recv_buffer,