Rand and Unsubsribe (#14)

* Rand and Unsubsribe
This commit is contained in:
obabec 2022-04-28 13:39:47 +02:00 committed by GitHub
parent 740733e652
commit 2693c01e9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 596 additions and 410 deletions

View File

@ -1,4 +1,3 @@
allow_anonymous false
listener 1883 10.0.1.17
#password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt
password_file /Users/obabec/development/school/rust-mqtt/.ci/mqtt_pass.txt
listener 1883 0.0.0.0
password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt

View File

@ -1 +1 @@
test:$7$101$IY9q8LLi2gHZZRBi$dq+KePHnbDmjlxdZsqmYy6B/yYjHoK/qsCOQ/sXpkvdDoN3E0+8DkKl4XRe7mhI2YPv3Jopo1zcicobqIHbLEA==
test:testPass

View File

@ -1,7 +1,6 @@
[workspace]
members = [
#"examples/drogue",
"mqtt",
"mqtt"
]
resolver = "2"

View File

@ -22,45 +22,42 @@
* SOFTWARE.
*/
use crate::network::socket::Socket;
use crate::Address;
use core::future::Future;
use core::ops::Range;
use drogue_device::actors::net::ConnectionFactory;
use drogue_device::actors::socket::Socket;
use drogue_device::actors::tcp::TcpActor;
use drogue_device::traits::ip::{IpAddress, IpAddressV4, IpProtocol, SocketAddress};
use drogue_device::Address;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use drogue_device::traits::tcp;
use drogue_device::traits::tcp::TcpStack;
use rust_mqtt::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
use crate::traits::tcp;
use crate::traits::tcp::TcpStack;
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
pub struct DrogueNetwork<A>
where
A: TcpActor + 'static,
A: TcpStack + Clone + 'static,
{
socket: Socket<A>,
}
impl<A> DrogueNetwork<A>
where
A: TcpActor + 'static,
where
A: TcpStack + Clone + 'static,
{
fn new(socket: Socket<A>) -> Self {
pub fn new(socket: Socket<A>) -> Self {
Self { socket }
}
}
impl<A> NetworkConnection for DrogueNetwork<A>
where
A: TcpActor + 'static,
where
A: TcpStack + Clone + 'static,
{
type WriteFuture<'m>
type SendFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
type ReadFuture<'m>
type ReceiveFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<usize, ReasonCode>> + 'm;
@ -70,82 +67,37 @@ where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> {
async move {
self.socket
.write(&buffer[0..len])
.write(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
.map(|_| ())
}
}
fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReceiveFuture<'m> {
async move {
self.socket
let r = self
.socket
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
.map_err(|_| ReasonCode::NetworkError);
// Workaround for the fair access mutex, issue:
if let Ok(0) = r {
embassy::time::Timer::after(embassy::time::Duration::from_millis(10)).await;
}
r
}
}
fn close<'m>(mut self) -> Self::CloseFuture<'m> {
async move {
self.socket.close()
self.socket
.close()
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
}
pub struct DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
network: Address<A>,
}
impl<A> DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
pub fn new(network: Address<A>) -> Self {
Self { network }
}
}
impl<A> NetworkConnectionFactory for DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
type Connection = DrogueNetwork<A>;
type ConnectionFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<Self::Connection, ReasonCode>> + 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> {
async move {
let mut socket = Socket::new(self.network.clone(), self.network.open().await.unwrap());
match socket
.connect(
IpProtocol::Tcp,
SocketAddress::new(IpAddress::new_v4(ip[0], ip[1], ip[2], ip[3]), port),
)
.await
{
Ok(_) => {
trace!("Connection established");
Ok(DrogueNetwork::new(socket))
}
Err(e) => {
warn!("Error creating connection:");
socket.close().await.map_err(|e| ReasonCode::NetworkError)?;
Err(ReasonCode::NetworkError)
}
}
}
}
}

View File

@ -22,7 +22,13 @@
* SOFTWARE.
*/
use heapless::Vec;
use rand_core::RngCore;
use crate::client::client_config::{ClientConfig, MqttVersion};
use crate::encoding::variable_byte_integer::{
VariableByteInteger, VariableByteIntegerDecoder,
};
use crate::network::NetworkConnection;
use crate::packet::v5::connack_packet::ConnackPacket;
use crate::packet::v5::connect_packet::ConnectPacket;
@ -31,36 +37,31 @@ use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::pingreq_packet::PingreqPacket;
use crate::packet::v5::pingresp_packet::PingrespPacket;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::publish_packet::QualityOfService::QoS1;
use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::packet::v5::publish_packet::QualityOfService::QoS1;
use crate::packet::v5::reason_codes::ReasonCode;
use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError};
use crate::packet::v5::suback_packet::SubackPacket;
use crate::packet::v5::subscription_packet::SubscriptionPacket;
use crate::packet::v5::unsuback_packet::UnsubackPacket;
use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::rng_generator::CountingRng;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::BufferError;
use heapless::Vec;
use rand_core::RngCore;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder};
use crate::network::NetworkError::Connection;
use crate::packet::v5::property::Property;
use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError};
use crate::utils::buffer_writer::BuffWriter;
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> {
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore> {
connection: Option<T>,
buffer: &'a mut [u8],
buffer_len: usize,
recv_buffer: &'a mut [u8],
recv_buffer_len: usize,
rng: CountingRng,
config: ClientConfig<'a, MAX_PROPERTIES>,
config: ClientConfig<'a, MAX_PROPERTIES, R>,
}
impl<'a, T, const MAX_PROPERTIES: usize> MqttClient<'a, T, MAX_PROPERTIES>
impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
where
T: NetworkConnection,
R: RngCore,
{
pub fn new(
network_driver: T,
@ -68,7 +69,7 @@ where
buffer_len: usize,
recv_buffer: &'a mut [u8],
recv_buffer_len: usize,
config: ClientConfig<'a, MAX_PROPERTIES>,
config: ClientConfig<'a, MAX_PROPERTIES, R>,
) -> Self {
Self {
connection: Some(network_driver),
@ -76,7 +77,6 @@ where
buffer_len,
recv_buffer,
recv_buffer_len,
rng: CountingRng(50),
config,
}
}
@ -103,7 +103,7 @@ where
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
let mut conn = self.connection.as_mut().unwrap();
let conn = self.connection.as_mut().unwrap();
trace!("Sending connect");
conn.send(&self.buffer[0..len.unwrap()]).await?;
@ -111,16 +111,14 @@ where
let reason: Result<u8, BufferError> = {
trace!("Waiting for connack");
let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let read =
{ receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) {
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new();
if disc
.decode(&mut BuffReader::new(self.buffer, read))
.is_ok()
{
if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() {
error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
@ -145,8 +143,8 @@ where
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.connect_to_broker_v5().await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.connect_to_broker_v5().await,
}
}
@ -164,7 +162,7 @@ where
return Err(ReasonCode::BuffError);
}
if let Err(e) = conn.send(&self.buffer[0..len.unwrap()]).await {
if let Err(_e) = conn.send(&self.buffer[0..len.unwrap()]).await {
warn!("Could not send DISCONNECT packet");
}
@ -179,8 +177,8 @@ where
pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.disconnect_v5().await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.disconnect_v5().await,
}
}
@ -192,8 +190,9 @@ where
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let identifier: u16 = self.rng.next_u32() as u16;
let conn = self.connection.as_mut().unwrap();
let identifier: u16 = self.config.rng.next_u32() as u16;
//self.rng.next_u32() as u16;
let len = {
let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new();
packet.add_topic_name(topic_name);
@ -216,11 +215,11 @@ where
{
let reason: Result<[u16; 2], BufferError> = {
trace!("Waiting for ack");
let read = receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await?;
let read =
receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await?;
trace!("[PUBACK] Received packet with len");
let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read))
{
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) {
Err(err)
} else {
Ok([packet.packet_identifier, packet.reason_code as u16])
@ -250,8 +249,8 @@ where
message: &'b str,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.send_message_v5(topic_name, message).await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.send_message_v5(topic_name, message).await,
}
}
@ -262,7 +261,7 @@ where
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let conn = self.connection.as_mut().unwrap();
let len = {
let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new();
let mut i = 0;
@ -284,7 +283,8 @@ where
conn.send(&self.buffer[0..len.unwrap()]).await?;
let reason: Result<Vec<u8, TOPICS>, BufferError> = {
let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let read =
{ receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = SubackPacket::<'b, TOPICS, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) {
@ -304,7 +304,9 @@ where
if i == TOPICS {
break;
}
if *reasons.get(i).unwrap() != (<QualityOfService as Into<u8>>::into(self.config.qos) >> 1) {
if *reasons.get(i).unwrap()
!= (<QualityOfService as Into<u8>>::into(self.config.qos) >> 1)
{
return Err(ReasonCode::from(*reasons.get(i).unwrap()));
}
i = i + 1;
@ -317,11 +319,63 @@ where
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.subscribe_to_topics_v5(topic_names).await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.subscribe_to_topics_v5(topic_names).await,
}
}
pub async fn unsubscribe_from_topic<'b>(
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.unsubscribe_from_topic_v5(topic_name).await,
}
}
pub async fn unsubscribe_from_topic_v5<'b>(
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let conn = self.connection.as_mut().unwrap();
let len = {
let mut unsub = UnsubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new();
unsub.packet_identifier = self.config.rng.next_u32() as u16;
unsub.add_new_filter(topic_name);
unsub.encode(self.buffer, self.buffer_len)
};
if let Err(err) = len {
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
conn.send(&self.buffer[0..len.unwrap()]).await?;
let reason: Result<u8, BufferError> = {
let read =
{ receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = UnsubackPacket::<'b, 1, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) {
Err(err)
} else {
Ok(*packet.reason_codes.get(0).unwrap())
}
};
if let Err(err) = reason {
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
Ok(())
}
async fn subscribe_to_topic_v5<'b>(
&'b mut self,
topic_name: &'b str,
@ -329,7 +383,7 @@ where
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let conn = self.connection.as_mut().unwrap();
let len = {
let mut subs = SubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new();
subs.add_new_filter(topic_name, self.config.qos);
@ -344,9 +398,10 @@ where
conn.send(&self.buffer[0..len.unwrap()]).await?;
let reason: Result<u8, BufferError> = {
let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let read =
{ receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = SubackPacket::<'b, 5, MAX_PROPERTIES>::new();
let mut packet = SubackPacket::<'b, 1, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, read)) {
Err(err)
} else {
@ -372,8 +427,8 @@ where
topic_name: &'b str,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.subscribe_to_topic_v5(topic_name).await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.subscribe_to_topic_v5(topic_name).await,
}
}
@ -381,20 +436,14 @@ where
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let conn = self.connection.as_mut().unwrap();
let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = PublishPacket::<'b, 5>::new();
if let Err(err) = {
packet.decode(&mut BuffReader::new(self.buffer, read))
}
{
if let Err(err) = { packet.decode(&mut BuffReader::new(self.buffer, read)) } {
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new();
if disc.decode(&mut BuffReader::new(self.buffer, read))
.is_ok()
{
if disc.decode(&mut BuffReader::new(self.buffer, read)).is_ok() {
error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
@ -424,8 +473,8 @@ where
pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.receive_message_v5().await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.receive_message_v5().await,
}
}
@ -433,7 +482,7 @@ where
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let conn = self.connection.as_mut().unwrap();
let len = {
let mut packet = PingreqPacket::new();
packet.encode(self.buffer, self.buffer_len)
@ -458,17 +507,20 @@ where
pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.send_ping_v5().await}
MqttVersion::MQTTv3 => Err(ReasonCode::UnsupportedProtocolVersion),
MqttVersion::MQTTv5 => self.send_ping_v5().await,
}
}
}
async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result<usize, ReasonCode> {
let mut target_len = 0;
async fn receive_packet<'c, T: NetworkConnection>(
buffer: &mut [u8],
buffer_len: usize,
recv_buffer: &mut [u8],
conn: &'c mut T,
) -> Result<usize, ReasonCode> {
let target_len: usize;
let mut rem_len: Result<VariableByteInteger, ()>;
let mut rem_len_len: usize = 0;
let mut writer = BuffWriter::new(buffer, buffer_len);
let mut i = 0;
@ -476,14 +528,16 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len:
trace!("Reading lenght of packet");
loop {
trace!(" Reading in loop!");
let len: usize = conn.receive(&mut recv_buffer[writer.position..(writer.position+1)]).await?;
let len: usize = conn
.receive(&mut recv_buffer[writer.position..(writer.position + 1)])
.await?;
trace!(" Received data!");
i = i + len;
if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) {
if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) {
error!("Error occurred during write to buffer!");
return Err(ReasonCode::BuffError);
}
if (i > 1) {
if i > 1 {
rem_len = writer.get_rem_len();
if rem_len.is_ok() {
break;
@ -495,7 +549,7 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len:
}
}
rem_len_len = i;
let rem_len_len = i;
i = 0;
if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) {
trace!("Reading packet with target len {}", l);
@ -506,9 +560,12 @@ async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len:
}
loop {
let len: usize = conn.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]).await?;
let len: usize = conn
.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)])
.await?;
i = i + len;
if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) {
if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)])
{
error!("Error occurred during write to buffer!");
return Err(BuffError);
}

View File

@ -22,20 +22,21 @@
* SOFTWARE.
*/
use heapless::Vec;
use rand_core::RngCore;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService;
use crate::utils::types::{BinaryData, EncodedString};
use heapless::Vec;
#[derive(Clone, PartialEq)]
pub enum MqttVersion {
MQTTv3,
MQTTv5
MQTTv5,
}
#[derive(Clone)]
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize, T: RngCore> {
pub qos: QualityOfService,
pub keep_alive: u16,
pub username_flag: bool,
@ -45,10 +46,11 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub properties: Vec<Property<'a>, MAX_PROPERTIES>,
pub max_packet_size: u32,
pub mqtt_version: MqttVersion,
pub rng: T,
}
impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
pub fn new(version: MqttVersion) -> Self {
impl<'a, const MAX_PROPERTIES: usize, T: RngCore> ClientConfig<'a, MAX_PROPERTIES, T> {
pub fn new(version: MqttVersion, rng: T) -> Self {
Self {
qos: QualityOfService::QoS0,
keep_alive: 60,
@ -58,7 +60,8 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
password: BinaryData::new(),
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new(),
max_packet_size: 265_000,
mqtt_version: version
mqtt_version: version,
rng,
}
}

View File

@ -22,6 +22,6 @@
* SOFTWARE.
*/
pub mod client;
#[allow(unused_must_use)]
pub mod client_config;
pub mod client;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use crate::utils::types::BufferError;
/// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on

View File

@ -27,7 +27,6 @@
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
pub(crate) mod fmt;
pub mod client;
pub mod encoding;
@ -36,4 +35,3 @@ pub mod packet;
pub mod tests;
pub mod tokio_net;
pub mod utils;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;

View File

@ -27,7 +27,6 @@ use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BinaryData, BufferError, EncodedString};

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -83,6 +82,10 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR
error!("Packet you are trying to decode is not DISCONNECT packet!");
return Err(BufferError::WrongPacketToDecode);
}
if self.remain_len == 0 {
self.disconnect_reason = 0x00;
return Ok(());
}
self.disconnect_reason = buff_reader.read_u8()?;
return self.decode_properties(buff_reader);
}

View File

@ -22,10 +22,11 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::packet_type::PacketType;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::BufferError;
use heapless::Vec;
use super::property::Property;

View File

@ -77,11 +77,11 @@ impl Into<u8> for PacketType {
PacketType::Publish => 0x30,
PacketType::Puback => 0x40,
PacketType::Pubrec => 0x50,
PacketType::Pubrel => 0x60,
PacketType::Pubrel => 0x62,
PacketType::Pubcomp => 0x70,
PacketType::Subscribe => 0x82,
PacketType::Suback => 0x90,
PacketType::Unsubscribe => 0xA0,
PacketType::Unsubscribe => 0xA2,
PacketType::Unsuback => 0xB0,
PacketType::Pingreq => 0xC0,
PacketType::Pingresp => 0xD0,

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;

View File

@ -22,12 +22,11 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1, QoS2, INVALID};
use crate::packet::v5::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2};
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BufferError, EncodedString};

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;

View File

@ -22,10 +22,9 @@
* SOFTWARE.
*/
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::BufferError;

View File

@ -22,11 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use super::packet_type::PacketType;
use super::property::Property;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::publish_packet::QualityOfService;
@ -34,6 +31,9 @@ use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BufferError, TopicFilter};
use super::packet_type::PacketType;
use super::property::Property;
pub struct SubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> {
pub fixed_header: u8,
pub remain_len: u32,

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;

View File

@ -22,12 +22,11 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::publish_packet::QualityOfService;
use crate::packet::v5::packet_type::PacketType;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BufferError, TopicFilter};
@ -47,13 +46,12 @@ pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTI
impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize>
UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES>
{
pub fn add_new_filter(&mut self, topic_name: &'a str, qos: QualityOfService) {
pub fn add_new_filter(&mut self, topic_name: &'a str) {
let len = topic_name.len();
let mut new_filter = TopicFilter::new();
new_filter.filter.string = topic_name;
new_filter.filter.len = len as u16;
new_filter.sub_options =
new_filter.sub_options | (<QualityOfService as Into<u8>>::into(qos) >> 1);
new_filter.sub_options = new_filter.sub_options | 0x01;
self.topic_filters.push(new_filter);
self.topic_filter_len = self.topic_filter_len + 1;
}
@ -64,7 +62,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
{
fn new() -> Self {
Self {
fixed_header: 0,
fixed_header: PacketType::Unsubscribe.into(),
remain_len: 0,
packet_identifier: 0,
property_len: 0,

View File

@ -25,4 +25,3 @@
#[cfg(test)]
#[allow(unused_must_use)]
pub mod unit;

View File

@ -22,14 +22,14 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::disconnect_packet::DisconnectPacket;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
use crate::utils::buffer_reader::BuffReader;
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 10] = [0; 10];

View File

@ -22,6 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
@ -29,8 +31,6 @@ use crate::packet::v5::puback_packet::PubackPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 14] = [0; 14];

View File

@ -22,6 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
@ -29,8 +31,6 @@ use crate::packet::v5::pubcomp_packet::PubcompPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 14] = [0; 14];

View File

@ -22,6 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
@ -29,8 +31,6 @@ use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 29] = [0; 29];

View File

@ -22,6 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
@ -29,8 +31,6 @@ use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 20] = [0; 20];

View File

@ -22,6 +22,8 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
@ -29,8 +31,6 @@ use crate::packet::v5::pubrel_packet::PubrelPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 21] = [0; 21];
@ -56,7 +56,7 @@ fn test_encode() {
assert_eq!(
buffer,
[
0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00,
0x62, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00,
0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39
]
)
@ -65,7 +65,7 @@ fn test_encode() {
#[test]
fn test_decode() {
let buffer: [u8; 21] = [
0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0x62, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0x68, 0x65, 0x68, 0x65, 0x38, 0x39,
];
let mut packet = PubrelPacket::<1>::new();

View File

@ -22,14 +22,14 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::subscription_packet::SubscriptionPacket;
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 30] = [0; 30];

View File

@ -22,15 +22,14 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]
fn test_encode() {
let mut buffer: [u8; 40] = [0; 40];
@ -49,15 +48,15 @@ fn test_encode() {
let mut props = Vec::<Property, 1>::new();
props.push(Property::UserProperty(pair));
packet.property_len = packet.add_properties(&props);
packet.add_new_filter("test/topic", QoS0);
packet.add_new_filter("hehe/#", QoS1);
packet.add_new_filter("test/topic");
packet.add_new_filter("hehe/#");
let res = packet.encode(&mut buffer, 40);
assert!(res.is_ok());
assert_eq!(res.unwrap(), 40);
assert_eq!(
buffer,
[
0xA0, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0xA2, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0x68, 0x65, 0x68, 0x65, 0x38, 0x39, 0x00, 0x0A, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74,
0x6F, 0x70, 0x69, 0x63, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F, 0x23
]

View File

@ -22,14 +22,14 @@
* SOFTWARE.
*/
use heapless::Vec;
use tokio_test::{assert_err, assert_ok};
use crate::encoding::variable_byte_integer::VariableByteInteger;
use crate::packet::v5::property::Property;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter};
use heapless::Vec;
use tokio_test::{assert_err, assert_ok};
use crate::encoding::variable_byte_integer::VariableByteInteger;
#[test]
fn buffer_write_ref() {
static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84];
@ -411,7 +411,6 @@ fn buffer_get_rem_len_two() {
assert_eq!(rm_len.unwrap(), REF);
}
#[test]
fn buffer_get_rem_len_three() {
static BUFFER: [u8; 5] = [0x82, 0x82, 0x83, 0x05, 0x84];
@ -489,11 +488,11 @@ fn buffer_get_rem_len_cont() {
let mut res_buffer: [u8; 6] = [0; 6];
let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 6);
let mut test_write = writer.insert_ref(2, &[0x82, 0x81]);
let test_write = writer.insert_ref(2, &[0x82, 0x81]);
let rm_len = writer.get_rem_len();
assert_ok!(test_write);
assert_err!(rm_len);
test_write = writer.insert_ref(2, &[0x82, 0x01]);
writer.insert_ref(2, &[0x82, 0x01]);
let rm_len_sec = writer.get_rem_len();
assert_ok!(rm_len_sec);
assert_eq!(rm_len_sec.unwrap(), [0x81, 0x82, 0x01, 0x00]);

View File

@ -22,10 +22,7 @@
* SOFTWARE.
*/
#![feature(in_band_lifetimes)]
#![macro_use]
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
#[cfg(feature = "tokio")]
pub mod tokio_network;

View File

@ -23,17 +23,16 @@
*/
extern crate alloc;
use alloc::format;
use alloc::string::String;
use core::future::Future;
use core::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::network::{NetworkConnection, NetworkConnectionFactory};
use crate::packet::v5::reason_codes::ReasonCode;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
pub struct TokioNetwork {
stream: TcpStream,
@ -41,9 +40,7 @@ pub struct TokioNetwork {
impl TokioNetwork {
pub fn new(stream: TcpStream) -> Self {
Self {
stream,
}
Self { stream }
}
pub fn convert_ip(ip: [u8; 4], port: u16) -> String {
@ -53,24 +50,13 @@ impl TokioNetwork {
impl NetworkConnection for TokioNetwork {
type SendFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
= impl Future<Output = Result<(), ReasonCode>> + 'm where Self: 'm;
type ReceiveFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<usize, ReasonCode>> + 'm;
= impl Future<Output = Result<usize, ReasonCode>> + 'm where Self: 'm;
type CloseFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
/*type TimerFuture<'m>
where
Self: 'm,
= impl Future<Output = ()>;*/
= impl Future<Output = Result<(), ReasonCode>> + 'm where Self: 'm;
fn send<'m>(&'m mut self, buffer: &'m [u8]) -> Self::SendFuture<'m> {
async move {
@ -112,9 +98,7 @@ impl NetworkConnectionFactory for TokioNetworkFactory {
type Connection = TokioNetwork;
type ConnectionFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<TokioNetwork, ReasonCode>> + 'm;
= impl Future<Output = Result<TokioNetwork, ReasonCode>> + 'm where Self: 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> {
async move {

View File

@ -25,7 +25,6 @@
use core::mem;
use core::str;
use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair};

View File

@ -22,7 +22,6 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder};
@ -48,15 +47,19 @@ impl<'a> BuffWriter<'a> {
self.position = self.position + increment;
}
pub fn get_n_byte(& mut self, n: usize) -> u8 {
pub fn get_n_byte(&mut self, n: usize) -> u8 {
if self.position >= n {
return self.buffer[n]
return self.buffer[n];
}
return 0
return 0;
}
pub fn get_rem_len(& mut self) -> Result<VariableByteInteger, ()> {
let mut max = if self.position >= 5 {4} else {self.position - 1};
pub fn get_rem_len(&mut self) -> Result<VariableByteInteger, ()> {
let max = if self.position >= 5 {
4
} else {
self.position - 1
};
let mut i = 1;
let mut len: VariableByteInteger = [0; 4];
loop {

View File

@ -1,7 +1,7 @@
// This code is handed from Embedded Rust documentation and
// is accessible from https://docs.rust-embedded.org/cortex-m-rt/0.6.0/rand/trait.RngCore.html
use rand_core::{impls, Error, RngCore};
use rand_core::{Error, impls, RngCore};
pub struct CountingRng(pub u64);

View File

@ -22,29 +22,30 @@
* SOFTWARE.
*/
extern crate alloc;
use alloc::string::String;
use core::time::Duration;
use std::future::Future;
use log::{info, LevelFilter};
use tokio::time::sleep;
use tokio::task;
use tokio_test::{assert_err, assert_ok};
use std::sync::Once;
use futures::future::{join, join3};
use heapless::Vec;
use rust_mqtt::client::client_config::ClientConfig;
use log::{info};
use tokio::task;
use tokio::time::sleep;
use tokio_test::{assert_err, assert_ok};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::network::{NetworkConnectionFactory};
use rust_mqtt::packet::v5::property::Property;
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::types::BufferError;
use std::sync::Once;
use futures::future::{join, join3};
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::utils::rng_generator::CountingRng;
static IP: [u8; 4] = [127, 0, 0, 1];
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
@ -59,35 +60,49 @@ fn setup() {
}
async fn publish_core<'b>(
client: &mut MqttClient<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>,
wait: u64,
topic: &str,
message: &str,
err: bool,
) -> Result<(), ReasonCode> {
info!(
"[Publisher] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
assert_ok!(result);
info!("[Publisher] Waiting {} seconds before sending", wait);
sleep(Duration::from_secs(wait)).await;
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
result = { client.send_message(topic, MSG).await };
info!(
"[Publisher] Sending new message {} to topic {}",
message, topic
);
result = client.send_message(topic, message).await;
info!("[PUBLISHER] sent");
if err == true {
assert_err!(result);
} else {
assert_ok!(result);
}
info!("[Publisher] Disconnecting!");
result = { client.disconnect().await };
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
async fn publish(
ip: [u8; 4],
wait: u64,
qos: QualityOfService,
topic: &str,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -95,7 +110,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
80,
@ -103,88 +118,123 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
80,
config,
);
publish_core(&mut client, wait, topic).await
publish_core(&mut client, wait, topic, MSG, false).await
}
async fn publish_spec(
ip: [u8; 4],
wait: u64,
qos: QualityOfService,
topic: &str,
message: &str,
err: bool,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 100;
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
80,
&mut recv_buffer,
80,
config,
);
publish_core(&mut client, wait, topic, message, err).await
}
async fn receive_core<'b>(
client: &mut MqttClient<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>,
topic: &str,
) -> Result<(), ReasonCode> {
info!(
"[Receiver] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
let mut result = client.connect_to_broker().await;
assert_ok!(result);
info!("[Receiver] Subscribing to topic {}", topic);
result = { client.subscribe_to_topic(topic).await };
result = client.subscribe_to_topic(topic).await;
assert_ok!(result);
info!("[Receiver] Waiting for new message!");
let msg = { client.receive_message().await };
let msg = client.receive_message().await;
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MSG);
info!("[Receiver] Disconnecting");
result = { client.disconnect().await };
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
async fn receive_core_multiple<'b, const TOPICS: usize>(
client: &mut MqttClient<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
info!(
"[Receiver] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
let mut result = client.connect_to_broker().await;
assert_ok!(result);
info!("[Receiver] Subscribing to topics {}, {}", topic_names.get(0).unwrap(), topic_names.get(1).unwrap());
result = { client.subscribe_to_topics(topic_names).await };
info!(
"[Receiver] Subscribing to topics {}, {}",
topic_names.get(0).unwrap(),
topic_names.get(1).unwrap()
);
result = client.subscribe_to_topics(topic_names).await;
assert_ok!(result);
info!("[Receiver] Waiting for new message!");
{
let msg = { client.receive_message().await };
let msg = client.receive_message().await;
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MSG);
}
{
let msg_sec = { client.receive_message().await };
let msg_sec = client.receive_message().await;
assert_ok!(msg_sec);
let act_message_second = String::from_utf8_lossy(msg_sec?);
info!("[Receiver] Got new message: {}", act_message_second);
assert_eq!(act_message_second, MSG);
}
info!("[Receiver] Disconnecting");
result = { client.disconnect().await };
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> {
async fn receive_multiple<const TOPICS: usize>(
qos: QualityOfService,
topic_names: &Vec<&str, TOPICS>,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
assert_ok!(config.properties.push(Property::ReceiveMaximum(20)));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
100,
@ -198,17 +248,17 @@ async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_name
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 6000;
config.properties.push(Property::ReceiveMaximum(20));
assert_ok!(config.properties.push(Property::ReceiveMaximum(20)));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
100,
@ -222,17 +272,17 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username("xyz");
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
assert_ok!(config.properties.push(Property::ReceiveMaximum(20)));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
100,
@ -243,17 +293,98 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
info!(
"[Receiver] Connection to broker with username {} and password {}",
"xyz",
PASSWORD
"xyz", PASSWORD
);
let result = { client.connect_to_broker().await };
let result = client.connect_to_broker().await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), NotAuthorized);
Ok(())
}
async fn receive_multiple_second_unsub<const TOPICS: usize>(
qos: QualityOfService,
topic_names: &Vec<&str, TOPICS>,
msg_t1: &str,
msg_t2: &str,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(20000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 60;
assert_ok!(config.properties.push(Property::ReceiveMaximum(20)));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
100,
config,
);
info!(
"[Receiver] Connection to broker with username {} and password {}",
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
assert_ok!(result);
info!(
"[Receiver] Subscribing to topics {}, {}",
topic_names.get(0).unwrap(),
topic_names.get(1).unwrap()
);
result = client.subscribe_to_topics(topic_names).await;
assert_ok!(result);
info!("[Receiver] Waiting for new message!");
{
let msg = { client.receive_message().await };
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, msg_t1);
}
{
let msg_sec = { client.receive_message().await };
assert_ok!(msg_sec);
let act_message_second = String::from_utf8_lossy(msg_sec?);
info!("[Receiver] Got new message: {}", act_message_second);
assert_eq!(act_message_second, msg_t2);
}
{
let res = client
.unsubscribe_from_topic(topic_names.get(1).unwrap())
.await;
assert_ok!(res);
}
{
let msg = { client.receive_message().await };
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, msg_t1);
}
let res =
tokio::time::timeout(std::time::Duration::from_secs(10), client.receive_message()).await;
assert_err!(res);
info!("[Receiver] Disconnecting");
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_simple_publish_recv() {
async fn integration_publish_recv() {
setup();
info!("Running simple tests test");
@ -261,7 +392,9 @@ async fn integration_simple_publish_recv() {
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/simple").await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -269,17 +402,17 @@ async fn integration_simple_publish_recv() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_simple_publish_recv_multiple() {
async fn integration_publish_recv_multiple() {
setup();
info!("Running simple tests test");
let mut topic_names = Vec::<&str, 2>::new();
topic_names.push("test/topic1");
topic_names.push("test/topic2");
assert_ok!(topic_names.push("test/topic1"));
assert_ok!(topic_names.push("test/topic2"));
let recv =
task::spawn(async move { receive_multiple(QualityOfService::QoS0, &topic_names).await });
let publ =
task::spawn(async move { publish(IP, 5,QualityOfService::QoS0, "test/topic1").await });
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/topic1").await });
let publ2 =
task::spawn(async move { publish(IP, 10, QualityOfService::QoS0, "test/topic2").await });
@ -291,12 +424,12 @@ async fn integration_simple_publish_recv_multiple() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_simple_publish_recv_multiple_qos() {
async fn integration_publish_recv_multiple_qos() {
setup();
info!("Running simple tests test");
let mut topic_names = Vec::<&str, 2>::new();
topic_names.push("test/topic3");
topic_names.push("test/topic4");
assert_ok!(topic_names.push("test/topic3"));
assert_ok!(topic_names.push("test/topic4"));
let recv =
task::spawn(async move { receive_multiple(QualityOfService::QoS1, &topic_names).await });
@ -306,38 +439,72 @@ async fn integration_simple_publish_recv_multiple_qos() {
let publ2 =
task::spawn(async move { publish(IP, 10, QualityOfService::QoS1, "test/topic4").await });
let ( r, p, p2) = join3(recv, publ, publ2).await;
let (r, p, p2) = join3(recv, publ, publ2).await;
assert_ok!(r.unwrap());
assert_ok!(p.unwrap());
assert_ok!(p2.unwrap());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_simple_publish_recv_qos() {
async fn integration_publish_recv_qos() {
setup();
info!("Running simple tests test with Quality of Service 1");
info!("Running tests test with Quality of Service 1");
let recv = task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await });
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await });
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await });
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
assert_ok!(p.unwrap());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_simple_publish_recv_wrong_cred() {
async fn integration_publish_recv_wrong_cred() {
setup();
info!("Running simple tests test wrong credentials");
info!("Running tests test wrong credentials");
let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await });
let recv_right =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/wrong").await });
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await });
let (r, rv, p) = join3(recv, recv_right, publ).await;
assert_ok!(r.unwrap());
assert_ok!(rv.unwrap());
assert_ok!(p.unwrap());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn integration_sub_unsub() {
setup();
info!("Running tests with sub and unsub");
let mut topic_names = Vec::<&str, 2>::new();
assert_ok!(topic_names.push("unsub/topic1"));
assert_ok!(topic_names.push("unsub/topic2"));
let msg_t1 = "First topic message";
let msg_t2 = "Second topic message";
let recv = task::spawn(async move {
receive_multiple_second_unsub(QualityOfService::QoS1, &topic_names, msg_t1, msg_t2).await
});
let publ = task::spawn(async move {
assert_ok!(publish_spec(IP, 5, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await);
publish_spec(IP, 2, QualityOfService::QoS1, "unsub/topic1", msg_t1, false).await
});
let publ2 = task::spawn(async move {
assert_ok!(publish_spec(IP, 6, QualityOfService::QoS1, "unsub/topic2", msg_t2, false).await);
publish_spec(IP, 3, QualityOfService::QoS1, "unsub/topic2", msg_t2, true).await
});
let (r, p1, p2) = join3(recv, publ, publ2).await;
assert_ok!(r.unwrap());
assert_ok!(p1.unwrap());
assert_ok!(p2.unwrap());
}

View File

@ -22,30 +22,28 @@
* SOFTWARE.
*/
extern crate alloc;
use alloc::string::String;
use core::time::Duration;
use std::future::Future;
use log::{info, LevelFilter};
use tokio::time::sleep;
use std::sync::Once;
use futures::future::{join};
use log::{info};
use serial_test::serial;
use tokio::task;
use tokio_test::{assert_err, assert_ok};
use heapless::Vec;
use rust_mqtt::client::client_config::ClientConfig;
use tokio::time::sleep;
use tokio_test::{assert_ok};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
use rust_mqtt::packet::v5::property::Property;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::network::{NetworkConnectionFactory};
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::types::BufferError;
use std::sync::Once;
use futures::future::{join, join3};
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
use rust_mqtt::utils::rng_generator::CountingRng;
static IP: [u8; 4] = [127, 0, 0, 1];
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
@ -60,17 +58,16 @@ fn setup() {
}
async fn publish_core<'b>(
client: &mut MqttClient<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>,
wait: u64,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
info!(
"[Publisher] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
let mut result = client.connect_to_broker().await;
assert_ok!(result);
info!("[Publisher] Waiting {} seconds before sending", wait);
sleep(Duration::from_secs(wait)).await;
@ -78,7 +75,7 @@ async fn publish_core<'b>(
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
let mut count = 0;
loop {
result = { client.send_message(topic, MSG).await };
result = client.send_message(topic, MSG).await;
info!("[PUBLISHER] sent {}", count);
assert_ok!(result);
count = count + 1;
@ -88,18 +85,22 @@ async fn publish_core<'b>(
//sleep(Duration::from_millis(5)).await;
}
info!("[Publisher] Disconnecting!");
result = { client.disconnect().await };
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> {
async fn publish(
ip: [u8; 4],
wait: u64,
qos: QualityOfService,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(50000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -107,7 +108,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amo
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
80,
@ -119,25 +120,24 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amo
}
async fn receive_core<'b>(
client: &mut MqttClient<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5, CountingRng>,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
info!(
"[Receiver] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
USERNAME, PASSWORD
);
let mut result = { client.connect_to_broker().await };
let mut result = client.connect_to_broker().await;
assert_ok!(result);
info!("[Receiver] Subscribing to topic {}", topic);
result = { client.subscribe_to_topic(topic).await };
result = client.subscribe_to_topic(topic).await;
assert_ok!(result);
info!("[Receiver] Waiting for new message!");
let mut count = 0;
loop {
let msg = { client.receive_message().await };
let msg = client.receive_message().await;
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
info!("[Receiver] Got new {}. message: {}", count, act_message);
@ -148,15 +148,20 @@ async fn receive_core<'b>(
}
}
info!("[Receiver] Disconnecting");
result = { client.disconnect().await };
result = client.disconnect().await;
assert_ok!(result);
Ok(())
}
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> {
async fn receive(
ip: [u8; 4],
qos: QualityOfService,
topic: &str,
amount: u16,
) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5);
let tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new(MQTTv5, CountingRng(50000));
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -166,7 +171,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -
let mut recv_buffer = [0; 500];
let mut write_buffer = [0; 500];
let mut client = MqttClient::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5, CountingRng>::new(
tokio_network,
&mut write_buffer,
500,
@ -188,7 +193,9 @@ async fn load_test_ten() {
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/ten", 10).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -201,10 +208,13 @@ async fn load_test_ten_qos() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -218,10 +228,14 @@ async fn load_test_fifty() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -235,10 +249,13 @@ async fn load_test_fifty_qos() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -252,10 +269,13 @@ async fn load_test_hundred() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -272,7 +292,9 @@ async fn load_test_hundred_qos() {
task::spawn(async move { receive(IP, QualityOfService::QoS1, "hundred/qos", 100).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -289,7 +311,9 @@ async fn load_test_five_hundred() {
task::spawn(async move { receive(IP, QualityOfService::QoS0, "five/hundred", 500).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -303,10 +327,13 @@ async fn load_test_five_hundred_qos() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -340,7 +367,9 @@ async fn load_test_thousand_qos() {
task::spawn(async move { receive(IP, QualityOfService::QoS1, "thousand/qos", 1000).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -355,10 +384,13 @@ async fn load_test_ten_thousand_qos() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -372,10 +404,14 @@ async fn load_test_ten_thousand() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await });
task::spawn(
async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await },
);
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -389,11 +425,13 @@ async fn load_test_twenty_thousand_qos() {
setup();
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await });
let recv = task::spawn(async move {
receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await
});
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
@ -407,10 +445,13 @@ async fn load_test_twenty_thousand() {
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await });
task::spawn(
async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await },
);
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await });
let publ = task::spawn(async move {
publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await
});
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());