Rust MQTT

This commit is contained in:
Ondrej Babec 2022-03-12 15:56:37 +01:00
parent c8ee05821a
commit d993457add
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
77 changed files with 1010 additions and 1661 deletions

2
.ci/mosquitto.conf Normal file
View File

@ -0,0 +1,2 @@
allow_anonymous false
password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt

1
.ci/mqtt_pass.txt Normal file
View File

@ -0,0 +1 @@
test:$7$101$XGspXBoC6refncib$u5t0Adz5h8Xn9XfYtKfa5kWrPNMGd+H7u2vbl0S8qmr/HCREZjjEyqU88QybSV0SsgmyFrXMIkCozEmnPeTm+g==

View File

@ -0,0 +1,25 @@
on: [pull_request]
name: IntegrationTests
jobs:
unit_tests:
name: Integration tests
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v2
- name: Build
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: cargo build
- name: Start Mosquitto
run: |
sudo apt-get install mosquitto
mosquitto -c .ci/mosquitto.conf -d
- name: Run integration tests
run: cargo test integration

14
.github/workflows/unit_tests.yaml vendored Normal file
View File

@ -0,0 +1,14 @@
on: [pull_request]
name: UnitTests
jobs:
unit_tests:
name: Unit tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: cargo test unit

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/target /target
.idea .idea
.vscode .vscode
Cargo.lock

1056
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,21 +1,12 @@
[package] [workspace]
name = "rust-mqtt" members = [
version = "0.0.1" "examples/drogue",
authors = ["Ondrej Babec <ond.babec@gmail.com>"] "mqtt",
edition = "2021" ]
resolver = "2" resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
embassy = { version = "0.1.0", default-features = false, features = ["std"] }
drogue-device = { version = "0.1.0", default-features = false, features = ["log", "std"] }
env_logger = "0.9.0"
log = "0.4.14"
heapless = "0.7.10"
tokio = { version = "1", features = ["full"] }
rand_core = "0.6.0"
[patch.crates-io] [patch.crates-io]
embassy = { git = "https://github.com/embassy-rs/embassy.git", rev = "d76cd5ceaf5140c48ef97180beae156c0c0e07c8" } embassy = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }
drogue-device = { git = "https://github.com/drogue-iot/drogue-device.git", rev = "ce915ad027880992789a73bfc53094e8b155c66b" } embassy-traits = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }
embassy-net = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }
drogue-device = { git = "https://github.com/drogue-iot/drogue-device.git", rev = "62ff20e278a6a705056173171714b1bbdc078df5" }

View File

@ -1,20 +1,37 @@
# Rust-mqtt # Rust-mqtt
## About
Rust-mqtt is native MQTT client for both std and no_std environments. Rust-mqtt is native MQTT client for both std and no_std environments.
Client library provides async API which can be used with various executors. Client library provides async API which can be used with various executors.
Currently, supporting only MQTTv5 but everything is prepared to extend support also
for MQTTv3 which is planned during year 2022.
## Async executors ## Async executors
For desktop usage I reccomend using Tokio async executor and for embedded For desktop usage I recommend using Tokio async executor and for embedded there is prepared wrapper for Drogue device
you should use Drogue-device framework in crate [examples](examples/drogue) crate.
## Restrains
Client supports following:
- QoS 0 & QoS 1 (All QoS 2 packets are mapped for future client extension)
- Only clean session
- Retain not supported
- Auth packet not supported
- Packet size is not limited, it is totally up to user (packet size and buffer sizes have to align)
## Building ## Building
``` ```
cargo build cargo build
``` ```
## Running (on std) ## Running tests
Integration tests are written using tokio network tcp stack and can be find under tokio_net.
``` ```
cargo run cargo test unit
cargo test integration
``` ```
## Acknowledgment
This project could not be in state in which currently is without Ulf Lilleengen and rest of the community
from [Drogue IoT](https://github.com/drogue-iot).
## Contact ## Contact
For any information contact me on email <ond.babec@gmail.com> For any information contact me on email <ond.babec@gmail.com>

View File

@ -0,0 +1,20 @@
[package]
name = "embassy-network"
version = "0.0.1"
authors = ["Ondrej Babec <ond.babec@gmail.com>"]
edition = "2021"
resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
env_logger = "0.9.0"
rust-mqtt = { path = "../../mqtt"}
embassy = { version = "0.1.0", features = ["std"] }
log = "0.4.14"
embedded-hal = { version = "0.2", features = ["unproven"] }
embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.6", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy" }
embedded-hal-async = { version = "0.0.1", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy"}
drogue-device = { version = "0.1.0", features = ["std"] }

View File

@ -0,0 +1,155 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publishistribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIMAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use core::future::Future;
use core::ops::Range;
use drogue_device::actors::net::ConnectionFactory;
use drogue_device::actors::socket::Socket;
use drogue_device::actors::tcp::TcpActor;
use drogue_device::traits::ip::{IpAddress, IpAddressV4, IpProtocol, SocketAddress};
use drogue_device::Address;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use drogue_device::traits::tcp;
use drogue_device::traits::tcp::TcpStack;
use embassy::io::{AsyncBufReadExt, AsyncWriteExt};
use embassy::time::Delay;
use embedded_hal_async::delay::DelayUs;
use rust_mqtt::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
pub struct DrogueNetwork<A>
where
A: TcpActor + 'static,
{
socket: Socket<A>,
}
impl<A> DrogueNetwork<A>
where
A: TcpActor + 'static,
{
fn new(socket: Socket<A>) -> Self {
Self { socket }
}
}
impl<A> NetworkConnection for DrogueNetwork<A>
where
A: TcpActor + 'static,
{
type WriteFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
type ReadFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<usize, ReasonCode>> + 'm;
type CloseFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move {
self.socket
.write(&buffer[0..len])
.await
.map_err(|_| ReasonCode::NetworkError)
.map(|_| ())
}
}
fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
async move {
self.socket
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
fn close(mut self) -> Self::CloseFuture<'m> {
async move {
self.socket
.close()
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
}
pub struct DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
network: Address<A>,
}
impl<A> DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
pub fn new(network: Address<A>) -> Self {
Self { network }
}
}
impl<A> NetworkConnectionFactory for DrogueConnectionFactory<A>
where
A: TcpActor + 'static,
{
type Connection = DrogueNetwork<A>;
type ConnectionFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<Self::Connection, ReasonCode>> + 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> {
async move {
let mut socket = Socket::new(self.network.clone(), self.network.open().await.unwrap());
match socket
.connect(
IpProtocol::Tcp,
SocketAddress::new(IpAddress::new_v4(ip[0], ip[1], ip[2], ip[3]), port),
)
.await
{
Ok(_) => {
log::trace!("Connection established");
Ok(DrogueNetwork::new(socket))
}
Err(e) => {
log::warn!("Error creating connection: {:?}", e);
socket.close().await.map_err(|e| ReasonCode::NetworkError)?;
Err(ReasonCode::NetworkError)
}
}
}
}
}

View File

@ -0,0 +1,31 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#![no_std]
#![feature(in_band_lifetimes)]
#![macro_use]
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
pub mod drogue_network;

26
mqtt/Cargo.toml Normal file
View File

@ -0,0 +1,26 @@
[package]
name = "rust-mqtt"
version = "0.0.1"
authors = ["Ondrej Babec <ond.babec@gmail.com>"]
edition = "2021"
resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
env_logger = "0.9.0"
log = "0.4.14"
heapless = "0.7.10"
rand_core = "0.6.0"
tokio = { version = "1", features = ["full"], optional = true }
tokio-test = { version = "0.4.2", optional = true}
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio-test = "0.4.2"
[features]
default = ["tokio", "std", "tokio-test"]
std = []
no_std = []

View File

@ -22,10 +22,10 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService; use crate::packet::v5::publish_packet::QualityOfService;
use crate::utils::types::{BinaryData, EncodedString}; use crate::utils::types::{BinaryData, EncodedString};
use heapless::Vec;
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub qos: QualityOfService, pub qos: QualityOfService,
@ -41,7 +41,7 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
qos: QualityOfService::QoS0, qos: QualityOfService::QoS0,
keep_alive: 60, keep_alive: 60,
client_id: EncodedString::new(), client_id: EncodedString::new(),
@ -80,7 +80,7 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
} }
} }
pub fn add_max_packet_size_as_prop(& mut self) -> u32 { pub fn add_max_packet_size_as_prop(&mut self) -> u32 {
if self.properties.len() < MAX_PROPERTIES { if self.properties.len() < MAX_PROPERTIES {
let prop = Property::MaximumPacketSize(self.max_packet_size); let prop = Property::MaximumPacketSize(self.max_packet_size);
self.properties.push(prop); self.properties.push(prop);

View File

@ -1,5 +1,29 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use crate::client::client_config::ClientConfig; use crate::client::client_config::ClientConfig;
use crate::network::network_trait::Network; use crate::network::network_trait::NetworkConnection;
use crate::packet::v5::connack_packet::ConnackPacket; use crate::packet::v5::connack_packet::ConnackPacket;
use crate::packet::v5::connect_packet::ConnectPacket; use crate::packet::v5::connect_packet::ConnectPacket;
use crate::packet::v5::disconnect_packet::DisconnectPacket; use crate::packet::v5::disconnect_packet::DisconnectPacket;
@ -17,8 +41,6 @@ use crate::utils::rng_generator::CountingRng;
use crate::utils::types::BufferError; use crate::utils::types::BufferError;
use heapless::Vec; use heapless::Vec;
use rand_core::RngCore; use rand_core::RngCore;
use crate::packet::v5::property::Property;
use crate::packet::v5::reason_codes::ReasonCode::BuffError;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
network_driver: &'a mut T, network_driver: &'a mut T,
@ -32,7 +54,7 @@ pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES> impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES>
where where
T: Network, T: NetworkConnection,
{ {
pub fn new( pub fn new(
network_driver: &'a mut T, network_driver: &'a mut T,
@ -82,7 +104,10 @@ where
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
if err == BufferError::PacketTypeMismatch { if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new(); let mut disc = DisconnectPacket::<'b, 5>::new();
if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() { if disc
.decode(&mut BuffReader::new(self.buffer, self.buffer_len))
.is_ok()
{
log::error!("Client was disconnected with reason: "); log::error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason)); return Err(ReasonCode::from(disc.disconnect_reason));
} }
@ -272,7 +297,10 @@ where
{ {
if err == BufferError::PacketTypeMismatch { if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new(); let mut disc = DisconnectPacket::<'b, 5>::new();
if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() { if disc
.decode(&mut BuffReader::new(self.buffer, self.buffer_len))
.is_ok()
{
log::error!("Client was disconnected with reason: "); log::error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason)); return Err(ReasonCode::from(disc.disconnect_reason));
} }

View File

@ -22,5 +22,6 @@
* SOFTWARE. * SOFTWARE.
*/ */
#[allow(unused_must_use)]
pub mod client_config; pub mod client_config;
pub mod client_v5; pub mod client_v5;

View File

@ -29,12 +29,10 @@
#![feature(type_alias_impl_trait)] #![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)] #![feature(generic_associated_types)]
extern crate alloc;
pub mod client; pub mod client;
pub mod encoding; pub mod encoding;
pub mod network; pub mod network;
pub mod packet; pub mod packet;
pub mod tokio_network;
pub mod utils;
pub mod tests; pub mod tests;
pub mod tokio_net;
pub mod utils;

View File

@ -0,0 +1,66 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use core::future::Future;
use crate::packet::v5::reason_codes::ReasonCode;
#[derive(Debug)]
pub enum NetworkError {
Connection,
Unknown,
QoSAck,
IDNotMatchedOnAck,
NoMatchingSubs,
}
pub trait NetworkConnectionFactory: Sized {
type Connection: NetworkConnection;
type ConnectionFuture<'m>: Future<Output = Result<Self::Connection, ReasonCode>>
where
Self: 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m>;
}
pub trait NetworkConnection {
type WriteFuture<'m>: Future<Output = Result<(), ReasonCode>>
where
Self: 'm;
type ReadFuture<'m>: Future<Output = Result<usize, ReasonCode>>
where
Self: 'm;
type CloseFuture<'m>: Future<Output = Result<(), ReasonCode>>
where
Self: 'm;
fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m>;
fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m>;
fn close(self) -> Self::CloseFuture<'m>;
}

View File

@ -22,4 +22,5 @@
* SOFTWARE. * SOFTWARE.
*/ */
pub mod variable_byte_integer_unit; #[allow(unused_must_use)]
pub mod v5;

View File

@ -42,8 +42,7 @@ pub struct ConnackPacket<'a, const MAX_PROPERTIES: usize> {
pub properties: Vec<Property<'a>, MAX_PROPERTIES>, pub properties: Vec<Property<'a>, MAX_PROPERTIES>,
} }
impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> { impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> {}
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> { impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> {
fn new() -> Self { fn new() -> Self {

View File

@ -99,7 +99,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize>
self.connect_flags = self.connect_flags | 0x40; self.connect_flags = self.connect_flags | 0x40;
} }
pub fn add_client_id(& mut self, id: &EncodedString<'a>) { pub fn add_client_id(&mut self, id: &EncodedString<'a>) {
self.client_id = (*id).clone(); self.client_id = (*id).clone();
} }
} }

View File

@ -22,10 +22,10 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::BufferError; use crate::utils::types::BufferError;
use heapless::Vec;
use super::property::Property; use super::property::Property;
@ -45,9 +45,12 @@ pub trait Packet<'a> {
/// Method enables pushing new property into packet properties /// Method enables pushing new property into packet properties
fn push_to_properties(&mut self, property: Property<'a>); fn push_to_properties(&mut self, property: Property<'a>);
/// Returns if property is allowed for packet /// Returns if property is allowed for packet
fn property_allowed(& mut self, property: & Property<'a>) -> bool; fn property_allowed(&mut self, property: &Property<'a>) -> bool;
/// Method enables adding properties from client config - each packet decides if property can be used with that or not /// Method enables adding properties from client config - each packet decides if property can be used with that or not
fn add_properties<const MAX_PROPERTIES: usize>(& mut self, properties: &Vec<Property<'a>, MAX_PROPERTIES>) -> u32 { fn add_properties<const MAX_PROPERTIES: usize>(
&mut self,
properties: &Vec<Property<'a>, MAX_PROPERTIES>,
) -> u32 {
let mut i = 0; let mut i = 0;
let max = properties.len(); let max = properties.len();
let mut res: u32 = 0; let mut res: u32 = 0;

View File

@ -27,8 +27,7 @@ use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter; use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair}; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair};
#[derive(Debug)] #[derive(Debug, Clone)]
#[derive(Clone)]
pub enum Property<'a> { pub enum Property<'a> {
PayloadFormat(u8), PayloadFormat(u8),
MessageExpiryInterval(u32), MessageExpiryInterval(u32),
@ -214,7 +213,10 @@ impl<'a> Property<'a> {
Property::ContentType(u) => u.len(), Property::ContentType(u) => u.len(),
Property::ResponseTopic(u) => u.len(), Property::ResponseTopic(u) => u.len(),
Property::CorrelationData(u) => u.len(), Property::CorrelationData(u) => u.len(),
Property::SubscriptionIdentifier(u) => VariableByteIntegerEncoder::len(VariableByteIntegerEncoder::encode(*u).unwrap()) as u16, Property::SubscriptionIdentifier(u) => {
VariableByteIntegerEncoder::len(VariableByteIntegerEncoder::encode(*u).unwrap())
as u16
}
Property::SessionExpiryInterval(_u) => 4, Property::SessionExpiryInterval(_u) => 4,
Property::AssignedClientIdentifier(u) => u.len(), Property::AssignedClientIdentifier(u) => u.len(),
Property::ServerKeepAlive(_u) => 2, Property::ServerKeepAlive(_u) => 2,

View File

@ -1,7 +1,30 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
#[derive(Debug)] #[derive(Debug, PartialEq)]
#[derive(PartialEq)]
pub enum ReasonCode { pub enum ReasonCode {
Success, Success,
GrantedQoS1, GrantedQoS1,
@ -46,6 +69,7 @@ pub enum ReasonCode {
MaximumConnectTime, MaximumConnectTime,
SubscriptionIdentifiersNotSupported, SubscriptionIdentifiersNotSupported,
WildcardSubscriptionNotSupported, WildcardSubscriptionNotSupported,
TimerNotSupported,
BuffError, BuffError,
NetworkError, NetworkError,
} }
@ -96,6 +120,7 @@ impl Into<u8> for ReasonCode {
ReasonCode::MaximumConnectTime => 0xA0, ReasonCode::MaximumConnectTime => 0xA0,
ReasonCode::SubscriptionIdentifiersNotSupported => 0xA1, ReasonCode::SubscriptionIdentifiersNotSupported => 0xA1,
ReasonCode::WildcardSubscriptionNotSupported => 0xA2, ReasonCode::WildcardSubscriptionNotSupported => 0xA2,
ReasonCode::TimerNotSupported => 0xFD,
ReasonCode::BuffError => 0xFE, ReasonCode::BuffError => 0xFE,
ReasonCode::NetworkError => 0xFF, ReasonCode::NetworkError => 0xFF,
}; };
@ -147,6 +172,7 @@ impl From<u8> for ReasonCode {
0xA0 => ReasonCode::MaximumConnectTime, 0xA0 => ReasonCode::MaximumConnectTime,
0xA1 => ReasonCode::SubscriptionIdentifiersNotSupported, 0xA1 => ReasonCode::SubscriptionIdentifiersNotSupported,
0xA2 => ReasonCode::WildcardSubscriptionNotSupported, 0xA2 => ReasonCode::WildcardSubscriptionNotSupported,
0xFD => ReasonCode::TimerNotSupported,
0xFE => ReasonCode::BuffError, 0xFE => ReasonCode::BuffError,
_ => ReasonCode::NetworkError, _ => ReasonCode::NetworkError,
}; };
@ -207,6 +233,7 @@ impl Display for ReasonCode {
ReasonCode::WildcardSubscriptionNotSupported => { ReasonCode::WildcardSubscriptionNotSupported => {
write!(f, "Wildcard subscription not supported!") write!(f, "Wildcard subscription not supported!")
} }
ReasonCode::TimerNotSupported => write!(f, "Timer implementation is not provided"),
ReasonCode::BuffError => write!(f, "Error encountered during write / read from packet"), ReasonCode::BuffError => write!(f, "Error encountered during write / read from packet"),
ReasonCode::NetworkError => write!(f, "Unknown error!"), ReasonCode::NetworkError => write!(f, "Unknown error!"),
} }

View File

@ -22,8 +22,8 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder; use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
@ -48,10 +48,12 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize>
&mut self, &mut self,
buff_reader: &mut BuffReader<'a>, buff_reader: &mut BuffReader<'a>,
) -> Result<(), BufferError> { ) -> Result<(), BufferError> {
let rm_ln_ln = VariableByteIntegerEncoder::len(VariableByteIntegerEncoder::encode(self.remain_len).unwrap()); let rm_ln_ln = VariableByteIntegerEncoder::len(
VariableByteIntegerEncoder::encode(self.remain_len).unwrap(),
);
let max = self.remain_len as usize + rm_ln_ln + 1; let max = self.remain_len as usize + rm_ln_ln + 1;
if buff_reader.position >= max { if buff_reader.position >= max {
return Ok(()) return Ok(());
} }
loop { loop {
self.reason_codes.push(buff_reader.read_u8()?); self.reason_codes.push(buff_reader.read_u8()?);

View File

@ -21,25 +21,28 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE. * SOFTWARE.
*/ */
extern crate alloc;
use alloc::string::String; use alloc::string::String;
use core::time::Duration; use core::time::Duration;
use heapless::Vec; use std::future::Future;
use tokio::{join, task};
use tokio::time::sleep; use tokio::time::sleep;
use tokio::{join, task};
use tokio_test::assert_ok;
use crate::client::client_config::ClientConfig; use crate::client::client_config::ClientConfig;
use crate::client::client_v5::MqttClientV5; use crate::client::client_v5::MqttClientV5;
use crate::network::network_trait::Network; use crate::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService; use crate::packet::v5::publish_packet::QualityOfService;
use crate::packet::v5::reason_codes::ReasonCode;
use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized; use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use crate::tokio_network::TokioNetwork; use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use crate::utils::types::BufferError;
static IP: [u8; 4] = [127, 0, 0, 1]; static IP: [u8; 4] = [127, 0, 0, 1];
static PORT: u16 = 1883; static PORT: u16 = 1883;
static USERNAME: &str = "test"; static USERNAME: &str = "test";
static PASSWORD: &str = "testPass"; static PASSWORD: &str = "testPass";
static TOPIC: &str = "test/topic";
static MSG: &str = "testMessage"; static MSG: &str = "testMessage";
fn init() { fn init() {
@ -49,32 +52,33 @@ fn init() {
.try_init(); .try_init();
} }
async fn publish_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) { async fn publish_core<'b>(
log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD); client: &mut MqttClientV5<'b, TokioNetwork, 5>,
let mut result = { topic: &str,
client.connect_to_broker().await ) -> Result<(), ReasonCode> {
}; log::info!(
assert!(result.is_ok()); "[Publisher] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
);
let mut result = { client.connect_to_broker().await };
assert_ok!(result);
log::info!("[Publisher] Waiting {} seconds before sending", 5); log::info!("[Publisher] Waiting {} seconds before sending", 5);
sleep(Duration::from_secs(5)).await; sleep(Duration::from_secs(5)).await;
log::info!("[Publisher] Sending new message {}", MSG); log::info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
result = { result = { client.send_message(topic, MSG).await };
client.send_message(TOPIC, MSG).await assert_ok!(result);
};
assert!(result.is_ok());
log::info!("[Publisher] Disconnecting!"); log::info!("[Publisher] Disconnecting!");
result = { result = { client.disconnect().await };
client.disconnect().await assert_ok!(result);
}; Ok(())
assert!(result.is_ok());
} }
async fn publish(qos: QualityOfService) { async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
tokio_network.create_connection().await; let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(); let mut config = ClientConfig::new();
config.add_qos(qos); config.add_qos(qos);
config.add_username(USERNAME); config.add_username(USERNAME);
@ -91,40 +95,40 @@ async fn publish(qos: QualityOfService) {
80, 80,
config, config,
); );
publish_core(& mut client) publish_core(&mut client, topic).await
.await;
} }
async fn receive_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) { async fn receive_core<'b>(
log::info!("[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD); client: &mut MqttClientV5<'b, TokioNetwork, 5>,
let mut result = {client.connect_to_broker().await}; topic: &str,
assert!(result.is_ok()); ) -> Result<(), ReasonCode> {
log::info!(
log::info!("[Receiver] Subscribing to topic {}", TOPIC); "[Receiver] Connection to broker with username {} and password {}",
result = { USERNAME,
client.subscribe_to_topic(TOPIC).await PASSWORD
}; );
assert!(result.is_ok()); let mut result = { client.connect_to_broker().await };
assert_ok!(result);
log::info!("[Receiver] Subscribing to topic {}", topic);
result = { client.subscribe_to_topic(topic).await };
assert_ok!(result);
log::info!("[Receiver] Waiting for new message!"); log::info!("[Receiver] Waiting for new message!");
let msg = { let msg = { client.receive_message().await };
client.receive_message().await assert_ok!(msg);
}; let act_message = String::from_utf8_lossy(msg?);
assert!(msg.is_ok());
let act_message = String::from_utf8_lossy(msg.unwrap());
log::info!("[Receiver] Got new message: {}", act_message); log::info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MSG); assert_eq!(act_message, MSG);
log::info!("[Receiver] Disconnecting"); log::info!("[Receiver] Disconnecting");
result = { result = { client.disconnect().await };
client.disconnect().await assert_ok!(result);
}; Ok(())
assert!(result.is_ok());
} }
async fn receive(qos: QualityOfService) { async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
tokio_network.create_connection().await; let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new(); let mut config = ClientConfig::new();
config.add_qos(qos); config.add_qos(qos);
config.add_username(USERNAME); config.add_username(USERNAME);
@ -142,14 +146,13 @@ async fn receive(qos: QualityOfService) {
100, 100,
config, config,
); );
receive_core(& mut client)
.await; receive_core(&mut client, topic).await
} }
async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> {
async fn receive_with_wrong_cred(qos: QualityOfService) { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
tokio_network.create_connection().await;
let mut config = ClientConfig::new(); let mut config = ClientConfig::new();
config.add_qos(qos); config.add_qos(qos);
config.add_username("xyz"); config.add_username("xyz");
@ -168,10 +171,15 @@ async fn receive_with_wrong_cred(qos: QualityOfService) {
config, config,
); );
log::info!("[Receiver] Connection to broker with username {} and password {}", "xyz", PASSWORD); log::info!(
let mut result = {client.connect_to_broker().await}; "[Receiver] Connection to broker with username {} and password {}",
"xyz",
PASSWORD
);
let result = { client.connect_to_broker().await };
assert!(result.is_err()); assert!(result.is_err());
assert_eq!(result.unwrap_err(), NotAuthorized); assert_eq!(result.unwrap_err(), NotAuthorized);
Ok(())
} }
#[tokio::test] #[tokio::test]
@ -179,17 +187,15 @@ async fn simple_publish_recv() {
init(); init();
log::info!("Running simple integration test"); log::info!("Running simple integration test");
let recv = task::spawn(async move { let recv =
receive(QualityOfService::QoS0) task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/simple").await });
.await;
});
let publ = task::spawn(async move { let publ =
publish(QualityOfService::QoS0) task::spawn(async move { publish(QualityOfService::QoS0, "test/recv/simple").await });
.await;
});
join!(recv, publ); let (r, p) = join!(recv, publ);
assert_ok!(r.unwrap());
assert_ok!(p.unwrap());
} }
#[tokio::test] #[tokio::test]
@ -197,36 +203,26 @@ async fn simple_publish_recv_qos() {
init(); init();
log::info!("Running simple integration test with Quality of Service 1"); log::info!("Running simple integration test with Quality of Service 1");
let recv = task::spawn(async move { let recv = task::spawn(async move { receive(QualityOfService::QoS1, "test/recv/qos").await });
receive(QualityOfService::QoS1)
.await;
});
let publ = task::spawn(async move { let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/qos").await });
publish(QualityOfService::QoS1) let (r, p) = join!(recv, publ);
.await; assert_ok!(r.unwrap());
}); assert_ok!(p.unwrap());
join!(recv, publ);
} }
#[tokio::test] #[tokio::test]
async fn simple_publish_recv_wrong_cred() { async fn simple_publish_recv_wrong_cred() {
init(); init();
log::info!("Running simple integration test with Quality of Service 1"); log::info!("Running simple integration test wrong credentials");
let recv = task::spawn(async move { let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await });
receive_with_wrong_cred(QualityOfService::QoS1)
.await;
});
let recv_right = task::spawn(async move { let recv_right =
receive(QualityOfService::QoS0) task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/wrong").await });
.await;
});
let publ = task::spawn(async move { let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/wrong").await });
publish(QualityOfService::QoS1) let (r, rv, p) = join!(recv, recv_right, publ);
.await; assert_ok!(rv.unwrap());
}); assert_ok!(p.unwrap());
join!(recv, recv_right, publ);
} }

View File

@ -21,5 +21,4 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE. * SOFTWARE.
*/ */
pub mod integration_test_single; pub mod integration_test_single;

View File

@ -23,5 +23,11 @@
*/ */
#[cfg(test)] #[cfg(test)]
#[allow(unused_must_use)]
pub mod unit; pub mod unit;
#[allow(dead_code)]
#[allow(unused_must_use)]
#[allow(unused_imports)]
#[cfg(feature = "tokio")]
pub mod integration; pub mod integration;

View File

@ -22,4 +22,4 @@
* SOFTWARE. * SOFTWARE.
*/ */
pub mod v5; pub mod variable_byte_integer_unit;

View File

@ -22,7 +22,9 @@
* SOFTWARE. * SOFTWARE.
*/ */
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder}; use crate::encoding::variable_byte_integer::{
VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder,
};
use crate::utils::types::BufferError; use crate::utils::types::BufferError;
#[test] #[test]

View File

@ -22,6 +22,6 @@
* SOFTWARE. * SOFTWARE.
*/ */
pub mod utils;
pub mod encoding; pub mod encoding;
pub mod packet; pub mod packet;
pub mod utils;

View File

@ -38,26 +38,49 @@ fn test_encode() {
connack.connect_reason_code = ReasonCode::ServerMoved.into(); connack.connect_reason_code = ReasonCode::ServerMoved.into();
connack.ack_flags = 0x45; connack.ack_flags = 0x45;
let res = connack.encode(& mut buffer, 100); let res = connack.encode(&mut buffer, 100);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(buffer[0..res.unwrap()], [0x20, 0x06, 0x45, ReasonCode::ServerMoved.into(), 0x03, 0x21, 0x00, 0x15]) assert_eq!(
buffer[0..res.unwrap()],
[
0x20,
0x06,
0x45,
ReasonCode::ServerMoved.into(),
0x03,
0x21,
0x00,
0x15
]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 8] = [0x20, 0x06, 0x45, ReasonCode::ServerMoved.into(), 0x03, 0x21, 0x00, 0x15]; let mut buffer: [u8; 8] = [
0x20,
0x06,
0x45,
ReasonCode::ServerMoved.into(),
0x03,
0x21,
0x00,
0x15,
];
let mut connack_res = ConnackPacket::<2>::new(); let mut connack_res = ConnackPacket::<2>::new();
let res = connack_res.decode(& mut BuffReader::new(& mut buffer, 8)); let res = connack_res.decode(&mut BuffReader::new(&mut buffer, 8));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(connack_res.property_len, 3); assert_eq!(connack_res.property_len, 3);
assert_eq!(connack_res.ack_flags, 0x45); assert_eq!(connack_res.ack_flags, 0x45);
assert_eq!(connack_res.connect_reason_code, ReasonCode::ServerMoved.into()); assert_eq!(
connack_res.connect_reason_code,
ReasonCode::ServerMoved.into()
);
assert_eq!(connack_res.property_len, 3); assert_eq!(connack_res.property_len, 3);
let prop = connack_res.properties.get(0).unwrap(); let prop = connack_res.properties.get(0).unwrap();
assert_eq!(<&Property as Into<u8>>::into(prop), 0x21); assert_eq!(<&Property as Into<u8>>::into(prop), 0x21);
if let Property::ReceiveMaximum(u) = *prop { if let Property::ReceiveMaximum(u) = *prop {
assert_eq!(u, 21); assert_eq!(u, 21);
} }
} }

View File

@ -29,10 +29,14 @@ use crate::packet::v5::mqtt_packet::Packet;
fn test_encode() { fn test_encode() {
let mut buffer: [u8; 100] = [0; 100]; let mut buffer: [u8; 100] = [0; 100];
let mut connect = ConnectPacket::<1, 0>::clean(); let mut connect = ConnectPacket::<1, 0>::clean();
let res = connect.encode(& mut buffer, 100); let res = connect.encode(&mut buffer, 100);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(buffer[0..res.unwrap()], [0x10, 0x10, 0x00, 0x04, 0x4d, 0x51, assert_eq!(
0x54, 0x54, 0x05, 0x02, 0x00, 0x3c, buffer[0..res.unwrap()],
0x03, 0x21, 0x00, 0x14, 0x00, 0x00]) [
0x10, 0x10, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05, 0x02, 0x00, 0x3c, 0x03, 0x21,
0x00, 0x14, 0x00, 0x00
]
)
} }

View File

@ -22,13 +22,12 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::disconnect_packet::DisconnectPacket; use crate::packet::v5::disconnect_packet::DisconnectPacket;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -38,16 +37,19 @@ fn test_encode() {
let mut props = Vec::<Property, 1>::new(); let mut props = Vec::<Property, 1>::new();
props.push(prop); props.push(prop);
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
let res = packet.encode(& mut buffer, 100); let res = packet.encode(&mut buffer, 100);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(buffer[0..res.unwrap()], [0xE0, 0x07, 0x00, 0x05, 0x11, 0x00, 0x00, 0x02, 0x00]) assert_eq!(
buffer[0..res.unwrap()],
[0xE0, 0x07, 0x00, 0x05, 0x11, 0x00, 0x00, 0x02, 0x00]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 10] = [0xE0, 0x07, 0x00, 0x05, 0x11, 0x00, 0x00, 0x04, 0x00, 0x00]; let buffer: [u8; 10] = [0xE0, 0x07, 0x00, 0x05, 0x11, 0x00, 0x00, 0x04, 0x00, 0x00];
let mut packet = DisconnectPacket::<1>::new(); let mut packet = DisconnectPacket::<1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 10)); let res = packet.decode(&mut BuffReader::new(&buffer, 10));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Disconnect.into()); assert_eq!(packet.fixed_header, PacketType::Disconnect.into());
assert_eq!(packet.remain_len, 7); assert_eq!(packet.remain_len, 7);

View File

@ -32,7 +32,7 @@ fn test_encode() {
let mut packet = PingreqPacket::new(); let mut packet = PingreqPacket::new();
packet.fixed_header = PacketType::Pingreq.into(); packet.fixed_header = PacketType::Pingreq.into();
packet.remain_len = 0; packet.remain_len = 0;
let res = packet.encode(& mut buffer, 3); let res = packet.encode(&mut buffer, 3);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(buffer, [0xC0, 0x00, 0x45]) assert_eq!(buffer, [0xC0, 0x00, 0x45])
} }

View File

@ -33,16 +33,16 @@ fn test_encode() {
let mut packet = PingrespPacket::new(); let mut packet = PingrespPacket::new();
packet.fixed_header = PacketType::Pingresp.into(); packet.fixed_header = PacketType::Pingresp.into();
packet.remain_len = 0; packet.remain_len = 0;
let res = packet.encode(& mut buffer, 3); let res = packet.encode(&mut buffer, 3);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(buffer, [0xD0, 0x00, 0x45]) assert_eq!(buffer, [0xD0, 0x00, 0x45])
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 3] = [0xD0, 0x00, 0x51]; let buffer: [u8; 3] = [0xD0, 0x00, 0x51];
let mut packet = PingrespPacket::new(); let mut packet = PingrespPacket::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 3)); let res = packet.decode(&mut BuffReader::new(&buffer, 3));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Pingresp.into()); assert_eq!(packet.fixed_header, PacketType::Pingresp.into());
assert_eq!(packet.remain_len, 0); assert_eq!(packet.remain_len, 0);

View File

@ -22,13 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket; use crate::packet::v5::puback_packet::PubackPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString; use crate::utils::types::EncodedString;
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -42,17 +42,22 @@ fn test_encode() {
let mut props = Vec::<Property, 1>::new(); let mut props = Vec::<Property, 1>::new();
props.push(Property::ReasonString(str)); props.push(Property::ReasonString(str));
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
let res = packet.encode(& mut buffer, 14); let res = packet.encode(&mut buffer, 14);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 14); assert_eq!(res.unwrap(), 14);
assert_eq!(buffer, [0x40, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]) assert_eq!(
buffer,
[0x40, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 14] = [0x40, 0x0C, 0x8A, 0x5E, 0x15, 0x08, 0x1F, 0x00, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]; let buffer: [u8; 14] = [
0x40, 0x0C, 0x8A, 0x5E, 0x15, 0x08, 0x1F, 0x00, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f,
];
let mut packet = PubackPacket::<1>::new(); let mut packet = PubackPacket::<1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 14)); let res = packet.decode(&mut BuffReader::new(&buffer, 14));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Puback.into()); assert_eq!(packet.fixed_header, PacketType::Puback.into());
assert_eq!(packet.remain_len, 12); assert_eq!(packet.remain_len, 12);

View File

@ -22,13 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::pubcomp_packet::PubcompPacket; use crate::packet::v5::pubcomp_packet::PubcompPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString; use crate::utils::types::EncodedString;
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -42,18 +42,23 @@ fn test_encode() {
str.len = 5; str.len = 5;
let mut props = Vec::<Property, 1>::new(); let mut props = Vec::<Property, 1>::new();
props.push(Property::ReasonString(str)); props.push(Property::ReasonString(str));
packet.property_len = packet.add_properties(& props); packet.property_len = packet.add_properties(&props);
let res = packet.encode(& mut buffer, 14); let res = packet.encode(&mut buffer, 14);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 14); assert_eq!(res.unwrap(), 14);
assert_eq!(buffer, [0x70, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x57, 0x68, 0x65, 0x65, 0x6c]) assert_eq!(
buffer,
[0x70, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x57, 0x68, 0x65, 0x65, 0x6c]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 14] = [0x70, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x57, 0x68, 0x65, 0x65, 0x6c]; let buffer: [u8; 14] = [
0x70, 0x0C, 0x8A, 0x5C, 0x00, 0x08, 0x1F, 0x00, 0x05, 0x57, 0x68, 0x65, 0x65, 0x6c,
];
let mut packet = PubcompPacket::<1>::new(); let mut packet = PubcompPacket::<1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 14)); let res = packet.decode(&mut BuffReader::new(&buffer, 14));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Pubcomp.into()); assert_eq!(packet.fixed_header, PacketType::Pubcomp.into());
assert_eq!(packet.packet_identifier, 35420); assert_eq!(packet.packet_identifier, 35420);

View File

@ -22,13 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService}; use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString; use crate::utils::types::EncodedString;
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -45,23 +45,31 @@ fn test_encode() {
props.push(Property::PayloadFormat(0x01)); props.push(Property::PayloadFormat(0x01));
props.push(Property::MessageExpiryInterval(45678)); props.push(Property::MessageExpiryInterval(45678));
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
static MESSAGE: [u8; 11] = [0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]; static MESSAGE: [u8; 11] = [
0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64,
];
packet.add_message(&MESSAGE); packet.add_message(&MESSAGE);
let res = packet.encode(& mut buffer, 100); let res = packet.encode(&mut buffer, 100);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 29); assert_eq!(res.unwrap(), 29);
assert_eq!(buffer, [0x32, 0x1B, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x5B, 0x88, assert_eq!(
0x07, 0x01, 0x01, 0x02, 0x00, 0x00, 0xB2, 0x6E, buffer,
0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]) [
0x32, 0x1B, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x5B, 0x88, 0x07, 0x01, 0x01, 0x02,
0x00, 0x00, 0xB2, 0x6E, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c,
0x64
]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer:[u8; 29] = [0x32, 0x1B, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x5B, 0x88, let buffer: [u8; 29] = [
0x07, 0x01, 0x01, 0x02, 0x00, 0x00, 0xB2, 0x6E, 0x32, 0x1B, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x5B, 0x88, 0x07, 0x01, 0x01, 0x02, 0x00,
0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]; 0x00, 0xB2, 0x6E, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64,
];
let mut packet = PublishPacket::<2>::new(); let mut packet = PublishPacket::<2>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 29)); let res = packet.decode(&mut BuffReader::new(&buffer, 29));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, 0x32); assert_eq!(packet.fixed_header, 0x32);
assert_eq!(packet.topic_name.len, 4); assert_eq!(packet.topic_name.len, 4);
@ -81,6 +89,9 @@ fn test_decode() {
assert_eq!(u, 45678); assert_eq!(u, 45678);
} }
if let Some(message) = packet.message { if let Some(message) = packet.message {
assert_eq!(*message, [0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]); assert_eq!(
*message,
[0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x77, 0x6f, 0x72, 0x6c, 0x64]
);
} }
} }

View File

@ -22,14 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::pubrec_packet::PubrecPacket; use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair}; use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -49,17 +48,26 @@ fn test_encode() {
let mut props = Vec::<Property, 1>::new(); let mut props = Vec::<Property, 1>::new();
props.push(Property::UserProperty(pair)); props.push(Property::UserProperty(pair));
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
let res = packet.encode(& mut buffer, 20); let res = packet.encode(&mut buffer, 20);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 20); assert_eq!(res.unwrap(), 20);
assert_eq!(buffer, [0x50, 0x12, 0x8A, 0x5C, 0x12, 0x0E, 0x26, 0x00, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x31, 0x00, 0x04, 0x76, 0x61, 0x6c, 0x31]) assert_eq!(
buffer,
[
0x50, 0x12, 0x8A, 0x5C, 0x12, 0x0E, 0x26, 0x00, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x31,
0x00, 0x04, 0x76, 0x61, 0x6c, 0x31
]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 20] = [0x50, 0x12, 0x8A, 0x5C, 0x12, 0x0E, 0x26, 0x00, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x31, 0x00, 0x04, 0x76, 0x61, 0x6c, 0x31]; let buffer: [u8; 20] = [
0x50, 0x12, 0x8A, 0x5C, 0x12, 0x0E, 0x26, 0x00, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x31, 0x00,
0x04, 0x76, 0x61, 0x6c, 0x31,
];
let mut packet = PubrecPacket::<1>::new(); let mut packet = PubrecPacket::<1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 20)); let res = packet.decode(&mut BuffReader::new(&buffer, 20));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Pubrec.into()); assert_eq!(packet.fixed_header, PacketType::Pubrec.into());
assert_eq!(packet.remain_len, 18); assert_eq!(packet.remain_len, 18);

View File

@ -22,15 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::packet::v5::pubrel_packet::PubrelPacket; use crate::packet::v5::pubrel_packet::PubrelPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair}; use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -51,19 +49,26 @@ fn test_encode() {
let mut props = Vec::<Property, 1>::new(); let mut props = Vec::<Property, 1>::new();
props.push(Property::UserProperty(pair)); props.push(Property::UserProperty(pair));
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
let res = packet.encode(& mut buffer, 21); let res = packet.encode(&mut buffer, 21);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 21); assert_eq!(res.unwrap(), 21);
assert_eq!(buffer, [0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, assert_eq!(
0x68, 0x61, 0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39]) buffer,
[
0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00,
0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39
]
)
} }
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 21] = [0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, let buffer: [u8; 21] = [
0x68, 0x61, 0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39]; 0x60, 0x13, 0x30, 0x39, 0x86, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0x68, 0x65, 0x68, 0x65, 0x38, 0x39,
];
let mut packet = PubrelPacket::<1>::new(); let mut packet = PubrelPacket::<1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 21)); let res = packet.decode(&mut BuffReader::new(&buffer, 21));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Pubrel.into()); assert_eq!(packet.fixed_header, PacketType::Pubrel.into());
assert_eq!(packet.remain_len, 19); assert_eq!(packet.remain_len, 19);

View File

@ -22,24 +22,20 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::packet::v5::pubrel_packet::PubrelPacket;
use crate::packet::v5::suback_packet::SubackPacket; use crate::packet::v5::suback_packet::SubackPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 23] = [0x90, 0x15, 0xCC, 0x08, 0x0F, 0x1F, 0x00, 0x0C, let buffer: [u8; 23] = [
0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x90, 0x15, 0xCC, 0x08, 0x0F, 0x1F, 0x00, 0x0C, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x53,
0x12, 0x34, 0x56]; 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x34, 0x56,
];
let mut packet = SubackPacket::<3, 1>::new(); let mut packet = SubackPacket::<3, 1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 23)); let res = packet.decode(&mut BuffReader::new(&buffer, 23));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Suback.into()); assert_eq!(packet.fixed_header, PacketType::Suback.into());
assert_eq!(packet.remain_len, 21); assert_eq!(packet.remain_len, 21);

View File

@ -22,12 +22,12 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::subscription_packet::SubscriptionPacket; use crate::packet::v5::subscription_packet::SubscriptionPacket;
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -40,10 +40,15 @@ fn test_encode() {
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
packet.add_new_filter("test/topic", QoS0); packet.add_new_filter("test/topic", QoS0);
packet.add_new_filter("hehe/#", QoS1); packet.add_new_filter("hehe/#", QoS1);
let res = packet.encode(& mut buffer, 30); let res = packet.encode(&mut buffer, 30);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 30); assert_eq!(res.unwrap(), 30);
assert_eq!(buffer, [0x82, 0x1C, 0x15, 0x38, 0x03, 0x0B, 0x80, 0x13, 0x00, 0x0A, assert_eq!(
0x74, 0x65, 0x73, 0x74, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, buffer,
0x00, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F, 0x23, 0x01]); [
0x82, 0x1C, 0x15, 0x38, 0x03, 0x0B, 0x80, 0x13, 0x00, 0x0A, 0x74, 0x65, 0x73, 0x74,
0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x00, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F,
0x23, 0x01
]
);
} }

View File

@ -22,25 +22,20 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::packet::v5::pubrel_packet::PubrelPacket;
use crate::packet::v5::suback_packet::SubackPacket;
use crate::packet::v5::unsuback_packet::UnsubackPacket; use crate::packet::v5::unsuback_packet::UnsubackPacket;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
#[test] #[test]
fn test_decode() { fn test_decode() {
let mut buffer: [u8; 22] = [0xB0, 0x14, 0xCC, 0x08, 0x0F, 0x1F, 0x00, 0x0C, let buffer: [u8; 22] = [
0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0xB0, 0x14, 0xCC, 0x08, 0x0F, 0x1F, 0x00, 0x0C, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x53,
0x77, 0x55]; 0x74, 0x72, 0x69, 0x6e, 0x67, 0x77, 0x55,
];
let mut packet = UnsubackPacket::<2, 1>::new(); let mut packet = UnsubackPacket::<2, 1>::new();
let res = packet.decode(& mut BuffReader::new(&buffer, 22)); let res = packet.decode(&mut BuffReader::new(&buffer, 22));
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(packet.fixed_header, PacketType::Unsuback.into()); assert_eq!(packet.fixed_header, PacketType::Unsuback.into());
assert_eq!(packet.remain_len, 20); assert_eq!(packet.remain_len, 20);

View File

@ -22,14 +22,13 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet; use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType; use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1}; use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::subscription_packet::SubscriptionPacket;
use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket; use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket;
use crate::utils::types::{EncodedString, StringPair}; use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test] #[test]
fn test_encode() { fn test_encode() {
@ -51,11 +50,15 @@ fn test_encode() {
packet.property_len = packet.add_properties(&props); packet.property_len = packet.add_properties(&props);
packet.add_new_filter("test/topic", QoS0); packet.add_new_filter("test/topic", QoS0);
packet.add_new_filter("hehe/#", QoS1); packet.add_new_filter("hehe/#", QoS1);
let res = packet.encode(& mut buffer, 40); let res = packet.encode(&mut buffer, 40);
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), 40); assert_eq!(res.unwrap(), 40);
assert_eq!(buffer, [0xA0, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, assert_eq!(
0x68, 0x61, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x38, 0x39, buffer,
0x00, 0x0A, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74, 0x6F, 0x70, [
0x69, 0x63, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F, 0x23]); 0xA0, 0x26, 0x15, 0x38, 0x0F, 0x26, 0x00, 0x04, 0x68, 0x61, 0x68, 0x61, 0x00, 0x06,
0x68, 0x65, 0x68, 0x65, 0x38, 0x39, 0x00, 0x0A, 0x74, 0x65, 0x73, 0x74, 0x2F, 0x74,
0x6F, 0x70, 0x69, 0x63, 0x00, 0x06, 0x68, 0x65, 0x68, 0x65, 0x2F, 0x23
]
);
} }

View File

@ -22,9 +22,6 @@
* SOFTWARE. * SOFTWARE.
*/ */
use core::str;
use log::info;
use crate::utils::buffer_reader::BuffReader; use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::BufferError; use crate::utils::types::BufferError;
@ -44,7 +41,10 @@ fn buffer_read_invalid_size() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 2); let mut reader: BuffReader = BuffReader::new(&BUFFER, 2);
let test_number = reader.read_variable_byte_int(); let test_number = reader.read_variable_byte_int();
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -73,7 +73,10 @@ fn test_var_empty_buffer() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 0); let mut reader: BuffReader = BuffReader::new(&BUFFER, 0);
let test_number = reader.read_variable_byte_int(); let test_number = reader.read_variable_byte_int();
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -91,7 +94,10 @@ fn test_read_u32_oob() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 3); let mut reader: BuffReader = BuffReader::new(&BUFFER, 3);
let test_number = reader.read_u32(); let test_number = reader.read_u32();
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -109,7 +115,10 @@ fn test_read_u16_oob() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 1); let mut reader: BuffReader = BuffReader::new(&BUFFER, 1);
let test_number = reader.read_u16(); let test_number = reader.read_u16();
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -127,7 +136,10 @@ fn test_read_u8_oob() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 0); let mut reader: BuffReader = BuffReader::new(&BUFFER, 0);
let test_number = reader.read_u8(); let test_number = reader.read_u8();
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -156,7 +168,10 @@ fn test_read_string_oob() {
let mut reader: BuffReader = BuffReader::new(&BUFFER, 5); let mut reader: BuffReader = BuffReader::new(&BUFFER, 5);
let test_string = reader.read_string(); let test_string = reader.read_string();
assert!(test_string.is_err()); assert!(test_string.is_err());
assert_eq!(test_string.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_string.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -181,7 +196,9 @@ fn test_read_binary_oob() {
#[test] #[test]
fn test_read_string_pair() { fn test_read_string_pair() {
static BUFFER: [u8; 11] = [0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E, 0x00, 0x03, 0xE2, 0x93, 0x87]; static BUFFER: [u8; 11] = [
0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E, 0x00, 0x03, 0xE2, 0x93, 0x87,
];
let mut reader: BuffReader = BuffReader::new(&BUFFER, 11); let mut reader: BuffReader = BuffReader::new(&BUFFER, 11);
let string_pair = reader.read_string_pair(); let string_pair = reader.read_string_pair();
assert!(string_pair.is_ok()); assert!(string_pair.is_ok());
@ -192,22 +209,27 @@ fn test_read_string_pair() {
assert_eq!(unw.value.len, 3); assert_eq!(unw.value.len, 3);
} }
#[test] #[test]
fn test_read_string_pair_wrong_utf8() { fn test_read_string_pair_wrong_utf8() {
static BUFFER: [u8; 11] = [0x00, 0x03, 0xF0, 0x9F, 0x92, 0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E]; static BUFFER: [u8; 11] = [
0x00, 0x03, 0xF0, 0x9F, 0x92, 0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E,
];
let mut reader: BuffReader = BuffReader::new(&BUFFER, 11); let mut reader: BuffReader = BuffReader::new(&BUFFER, 11);
let string_pair = reader.read_string_pair(); let string_pair = reader.read_string_pair();
assert!(string_pair.is_err()); assert!(string_pair.is_err());
assert_eq!(string_pair.unwrap_err(), BufferError::Utf8Error) assert_eq!(string_pair.unwrap_err(), BufferError::Utf8Error)
} }
#[test] #[test]
fn test_read_string_pair_oob() { fn test_read_string_pair_oob() {
static BUFFER: [u8; 11] = [0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E, 0x00, 0x04, 0xE2, 0x93, 0x87]; static BUFFER: [u8; 11] = [
0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E, 0x00, 0x04, 0xE2, 0x93, 0x87,
];
let mut reader: BuffReader = BuffReader::new(&BUFFER, 11); let mut reader: BuffReader = BuffReader::new(&BUFFER, 11);
let string_pair = reader.read_string_pair(); let string_pair = reader.read_string_pair();
assert!(string_pair.is_err()); assert!(string_pair.is_err());
assert_eq!(string_pair.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
string_pair.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }

View File

@ -22,17 +22,17 @@
* SOFTWARE. * SOFTWARE.
*/ */
use heapless::Vec;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::utils::buffer_writer::BuffWriter; use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter}; use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter};
use heapless::Vec;
#[test] #[test]
fn buffer_write_ref() { fn buffer_write_ref() {
static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84]; static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84];
let mut res_buffer: [u8; 5] = [0; 5]; let mut res_buffer: [u8; 5] = [0; 5];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 5); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 5);
let test_write = writer.insert_ref(5, &BUFFER); let test_write = writer.insert_ref(5, &BUFFER);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 5); assert_eq!(writer.position, 5);
@ -44,10 +44,13 @@ fn buffer_write_ref_oob() {
static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84]; static BUFFER: [u8; 5] = [0x82, 0x82, 0x03, 0x85, 0x84];
let mut res_buffer: [u8; 4] = [0; 4]; let mut res_buffer: [u8; 4] = [0; 4];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 4); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 4);
let test_number = writer.insert_ref(5, &BUFFER); let test_number = writer.insert_ref(5, &BUFFER);
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
assert_eq!(res_buffer, [0; 4]) assert_eq!(res_buffer, [0; 4])
} }
@ -55,7 +58,7 @@ fn buffer_write_ref_oob() {
fn buffer_write_u8() { fn buffer_write_u8() {
let mut res_buffer: [u8; 1] = [0; 1]; let mut res_buffer: [u8; 1] = [0; 1];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 1); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 1);
let test_write = writer.write_u8(0xFA); let test_write = writer.write_u8(0xFA);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 1); assert_eq!(writer.position, 1);
@ -66,17 +69,20 @@ fn buffer_write_u8() {
fn buffer_write_u8_oob() { fn buffer_write_u8_oob() {
let mut res_buffer: [u8; 0] = []; let mut res_buffer: [u8; 0] = [];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 0); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 0);
let test_number = writer.write_u8(0xFA); let test_number = writer.write_u8(0xFA);
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
fn buffer_write_u16() { fn buffer_write_u16() {
let mut res_buffer: [u8; 2] = [0; 2]; let mut res_buffer: [u8; 2] = [0; 2];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 2); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 2);
let test_write = writer.write_u16(0xFAED); let test_write = writer.write_u16(0xFAED);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 2); assert_eq!(writer.position, 2);
@ -87,17 +93,20 @@ fn buffer_write_u16() {
fn buffer_write_u16_oob() { fn buffer_write_u16_oob() {
let mut res_buffer: [u8; 1] = [0; 1]; let mut res_buffer: [u8; 1] = [0; 1];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 1); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 1);
let test_number = writer.write_u16(0xFAED); let test_number = writer.write_u16(0xFAED);
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
fn buffer_write_u32() { fn buffer_write_u32() {
let mut res_buffer: [u8; 4] = [0; 4]; let mut res_buffer: [u8; 4] = [0; 4];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 4); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 4);
let test_write = writer.write_u32(0xFAEDCC09); let test_write = writer.write_u32(0xFAEDCC09);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 4); assert_eq!(writer.position, 4);
@ -108,10 +117,13 @@ fn buffer_write_u32() {
fn buffer_write_u32_oob() { fn buffer_write_u32_oob() {
let mut res_buffer: [u8; 3] = [0; 3]; let mut res_buffer: [u8; 3] = [0; 3];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 3); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 3);
let test_number = writer.write_u32(0xFAEDCC08); let test_number = writer.write_u32(0xFAEDCC08);
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
#[test] #[test]
@ -120,21 +132,20 @@ fn buffer_write_string() {
let mut string = EncodedString::new(); let mut string = EncodedString::new();
string.string = "😎"; string.string = "😎";
string.len = 4; string.len = 4;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 6); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 6);
let test_write = writer.write_string_ref(&string); let test_write = writer.write_string_ref(&string);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 6); assert_eq!(writer.position, 6);
assert_eq!(res_buffer, [0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E]); assert_eq!(res_buffer, [0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E]);
} }
#[test] #[test]
fn buffer_write_string_oob() { fn buffer_write_string_oob() {
let mut res_buffer: [u8; 5] = [0; 5]; let mut res_buffer: [u8; 5] = [0; 5];
let mut string = EncodedString::new(); let mut string = EncodedString::new();
string.string = "😎"; string.string = "😎";
string.len = 4; string.len = 4;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 5); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 5);
let test_write = writer.write_string_ref(&string); let test_write = writer.write_string_ref(&string);
assert!(test_write.is_err()); assert!(test_write.is_err());
assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize);
@ -146,21 +157,20 @@ fn buffer_write_bin() {
let mut bin = BinaryData::new(); let mut bin = BinaryData::new();
bin.bin = &[0xAB, 0xEF, 0x88, 0x43]; bin.bin = &[0xAB, 0xEF, 0x88, 0x43];
bin.len = 4; bin.len = 4;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 6); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 6);
let test_write = writer.write_binary_ref(&bin); let test_write = writer.write_binary_ref(&bin);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 6); assert_eq!(writer.position, 6);
assert_eq!(res_buffer, [0x00, 0x04, 0xAB, 0xEF, 0x88, 0x43]); assert_eq!(res_buffer, [0x00, 0x04, 0xAB, 0xEF, 0x88, 0x43]);
} }
#[test] #[test]
fn buffer_write_bin_oob() { fn buffer_write_bin_oob() {
let mut res_buffer: [u8; 6] = [0; 6]; let mut res_buffer: [u8; 6] = [0; 6];
let mut bin = BinaryData::new(); let mut bin = BinaryData::new();
bin.bin = &[0xAB, 0xEF, 0x88, 0x43]; bin.bin = &[0xAB, 0xEF, 0x88, 0x43];
bin.len = 4; bin.len = 4;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 5); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 5);
let test_write = writer.write_binary_ref(&bin); let test_write = writer.write_binary_ref(&bin);
assert!(test_write.is_err()); assert!(test_write.is_err());
assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize);
@ -179,11 +189,14 @@ fn buffer_write_string_pair() {
let mut pair = StringPair::new(); let mut pair = StringPair::new();
pair.name = name; pair.name = name;
pair.value = value; pair.value = value;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 12); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 12);
let test_write = writer.write_string_pair_ref(&pair); let test_write = writer.write_string_pair_ref(&pair);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 12); assert_eq!(writer.position, 12);
assert_eq!(res_buffer, [0x00, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E]); assert_eq!(
res_buffer,
[0x00, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x00, 0x04, 0xF0, 0x9F, 0x98, 0x8E]
);
} }
#[test] #[test]
@ -199,7 +212,7 @@ fn buffer_write_string_pair_oob() {
let mut pair = StringPair::new(); let mut pair = StringPair::new();
pair.name = name; pair.name = name;
pair.value = value; pair.value = value;
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 10); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 10);
let test_write = writer.write_string_pair_ref(&pair); let test_write = writer.write_string_pair_ref(&pair);
assert!(test_write.is_err()); assert!(test_write.is_err());
assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize) assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize)
@ -209,7 +222,7 @@ fn buffer_write_string_pair_oob() {
fn buffer_write_var_byte() { fn buffer_write_var_byte() {
let mut res_buffer: [u8; 2] = [0; 2]; let mut res_buffer: [u8; 2] = [0; 2];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 2); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 2);
let test_write = writer.write_variable_byte_int(512); let test_write = writer.write_variable_byte_int(512);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 2); assert_eq!(writer.position, 2);
@ -220,10 +233,13 @@ fn buffer_write_var_byte() {
fn buffer_write_var_byte_oob() { fn buffer_write_var_byte_oob() {
let mut res_buffer: [u8; 2] = [0; 2]; let mut res_buffer: [u8; 2] = [0; 2];
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 2); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 2);
let test_number = writer.write_variable_byte_int(453123); let test_number = writer.write_variable_byte_int(453123);
assert!(test_number.is_err()); assert!(test_number.is_err());
assert_eq!(test_number.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(
test_number.unwrap_err(),
BufferError::InsufficientBufferSize
);
} }
/*#[test] /*#[test]
@ -267,12 +283,14 @@ fn buffer_write_properties() {
let mut properties = Vec::<Property, 2>::new(); let mut properties = Vec::<Property, 2>::new();
properties.push(prop); properties.push(prop);
properties.push(prop2); properties.push(prop2);
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 13); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 13);
let test_write = writer.write_properties(&properties); let test_write = writer.write_properties(&properties);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 13); assert_eq!(writer.position, 13);
assert_eq!(res_buffer, [0x08, 0x00, 0x04, 0x4e, 0x61, 0x6d, 0x65, assert_eq!(
0x09, 0x00, 0x03, 0x12, 0x34, 0x56]); res_buffer,
[0x08, 0x00, 0x04, 0x4e, 0x61, 0x6d, 0x65, 0x09, 0x00, 0x03, 0x12, 0x34, 0x56]
);
} }
#[test] #[test]
@ -289,7 +307,7 @@ fn buffer_write_properties_oob() {
let mut properties = Vec::<Property, 2>::new(); let mut properties = Vec::<Property, 2>::new();
properties.push(prop); properties.push(prop);
properties.push(prop2); properties.push(prop2);
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 10); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 10);
let test_write = writer.write_properties(&properties); let test_write = writer.write_properties(&properties);
assert!(test_write.is_err()); assert!(test_write.is_err());
assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize); assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize);
@ -316,16 +334,20 @@ fn buffer_write_filters() {
filter2.filter = topic2; filter2.filter = topic2;
filter2.sub_options = 0x22; filter2.sub_options = 0x22;
let mut filters = Vec::<TopicFilter, 2>::new(); let mut filters = Vec::<TopicFilter, 2>::new();
filters.push(filter1); filters.push(filter1);
filters.push(filter2); filters.push(filter2);
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 15); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 15);
let test_write = writer.write_topic_filters_ref(true, 2, &filters); let test_write = writer.write_topic_filters_ref(true, 2, &filters);
assert!(test_write.is_ok()); assert!(test_write.is_ok());
assert_eq!(writer.position, 15); assert_eq!(writer.position, 15);
assert_eq!(res_buffer, [0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0xAE, assert_eq!(
0x00, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22]); res_buffer,
[
0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0xAE, 0x00, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63,
0x22
]
);
} }
#[test] #[test]
@ -349,11 +371,10 @@ fn buffer_write_filters_oob() {
filter2.filter = topic2; filter2.filter = topic2;
filter2.sub_options = 0x22; filter2.sub_options = 0x22;
let mut filters = Vec::<TopicFilter, 2>::new(); let mut filters = Vec::<TopicFilter, 2>::new();
filters.push(filter1); filters.push(filter1);
filters.push(filter2); filters.push(filter2);
let mut writer: BuffWriter = BuffWriter::new(& mut res_buffer, 5); let mut writer: BuffWriter = BuffWriter::new(&mut res_buffer, 5);
let test_write = writer.write_topic_filters_ref(true, 2, &filters); let test_write = writer.write_topic_filters_ref(true, 2, &filters);
assert!(test_write.is_err()); assert!(test_write.is_err());
assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize) assert_eq!(test_write.unwrap_err(), BufferError::InsufficientBufferSize)

31
mqtt/src/tokio_net/mod.rs Normal file
View File

@ -0,0 +1,31 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#![feature(in_band_lifetimes)]
#![macro_use]
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
#[cfg(feature = "tokio")]
pub mod tokio_network;

View File

@ -0,0 +1,145 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
extern crate alloc;
use alloc::format;
use alloc::string::String;
use core::future::Future;
use core::time::Duration;
use crate::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
use crate::packet::v5::reason_codes::ReasonCode;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
pub struct TokioNetwork {
stream: Option<TcpStream>,
}
impl TokioNetwork {
pub fn new(stream: TcpStream) -> Self {
Self {
stream: Some(stream),
}
}
pub fn convert_ip(ip: [u8; 4], port: u16) -> String {
String::from(format!("{}.{}.{}.{}:{}", ip[0], ip[1], ip[2], ip[3], port))
}
}
impl NetworkConnection for TokioNetwork {
type WriteFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
type ReadFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<usize, ReasonCode>> + 'm;
type CloseFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
/*type TimerFuture<'m>
where
Self: 'm,
= impl Future<Output = ()>;*/
fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.write_all(&buffer[0..len])
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
}
}
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
}
}
fn close<'m>(mut self) -> Self::CloseFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.shutdown()
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
}
}
/*fn count_down(&'m mut self, time_in_secs: u64) -> Self::TimerFuture<'m> {
async move {
return sleep(Duration::from_secs(time_in_secs))
.await
}
}*/
}
pub struct TokioNetworkFactory {}
impl TokioNetworkFactory {
pub fn new() -> Self {
Self {}
}
}
impl NetworkConnectionFactory for TokioNetworkFactory {
type Connection = TokioNetwork;
type ConnectionFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<TokioNetwork, ReasonCode>> + 'm;
fn connect<'m>(&'m mut self, ip: [u8; 4], port: u16) -> Self::ConnectionFuture<'m> {
async move {
let stream = TcpStream::connect(TokioNetwork::convert_ip(ip, port))
.await
.map_err(|_| ReasonCode::NetworkError)?;
Ok(TokioNetwork::new(stream))
}
}
}

View File

@ -75,7 +75,7 @@ impl<'a> BuffReader<'a> {
variable_byte_integer[x] = 0; variable_byte_integer[x] = 0;
x = x + 1; x = x + 1;
if x == 4 { if x == 4 {
break break;
} }
} }
break; break;
@ -121,14 +121,13 @@ impl<'a> BuffReader<'a> {
/// Reading UTF-8 encoded string from buffer /// Reading UTF-8 encoded string from buffer
pub fn read_string(&mut self) -> Result<EncodedString<'a>, BufferError> { pub fn read_string(&mut self) -> Result<EncodedString<'a>, BufferError> {
let len = self.read_u16() ? as usize; let len = self.read_u16()? as usize;
if self.position + len - 1 >= self.len { if self.position + len - 1 >= self.len {
return Err(BufferError::InsufficientBufferSize); return Err(BufferError::InsufficientBufferSize);
} }
let res_str = let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len)]));
str::from_utf8(&(self.buffer[self.position..(self.position + len)]));
if res_str.is_err() { if res_str.is_err() {
log::error!("Could not parse utf-8 string"); log::error!("Could not parse utf-8 string");
return Err(BufferError::Utf8Error); return Err(BufferError::Utf8Error);
@ -142,27 +141,21 @@ impl<'a> BuffReader<'a> {
/// Read Binary data from buffer /// Read Binary data from buffer
pub fn read_binary(&mut self) -> Result<BinaryData<'a>, BufferError> { pub fn read_binary(&mut self) -> Result<BinaryData<'a>, BufferError> {
let len = self.read_u16() ?; let len = self.read_u16()?;
if self.position + len as usize - 1 >= self.len { if self.position + len as usize - 1 >= self.len {
return Err(BufferError::InsufficientBufferSize); return Err(BufferError::InsufficientBufferSize);
} }
let res_bin = &(self.buffer[self.position..(self.position + len as usize)]); let res_bin = &(self.buffer[self.position..(self.position + len as usize)]);
return Ok(BinaryData { return Ok(BinaryData { bin: res_bin, len });
bin: res_bin,
len,
});
} }
/// Read string pair from buffer /// Read string pair from buffer
pub fn read_string_pair(&mut self) -> Result<StringPair<'a>, BufferError> { pub fn read_string_pair(&mut self) -> Result<StringPair<'a>, BufferError> {
let name = self.read_string() ?; let name = self.read_string()?;
let value = self.read_string() ?; let value = self.read_string()?;
return Ok(StringPair { return Ok(StringPair { name, value });
name,
value,
});
} }
/// Read payload message from buffer /// Read payload message from buffer

View File

@ -91,7 +91,7 @@ impl<'a> BuffWriter<'a> {
let bytes = str.string.as_bytes(); let bytes = str.string.as_bytes();
return self.insert_ref(str.len as usize, bytes); return self.insert_ref(str.len as usize, bytes);
} }
return Ok(()) return Ok(());
} }
pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) -> Result<(), BufferError> { pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) -> Result<(), BufferError> {

View File

@ -24,8 +24,7 @@
use core::fmt::{Display, Formatter}; use core::fmt::{Display, Formatter};
#[derive(core::fmt::Debug, Clone)] #[derive(core::fmt::Debug, Clone, PartialEq)]
#[derive(PartialEq)]
pub enum BufferError { pub enum BufferError {
Utf8Error, Utf8Error,
InsufficientBufferSize, InsufficientBufferSize,
@ -101,7 +100,7 @@ impl StringPair<'_> {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
name: EncodedString::new(), name: EncodedString::new(),
value: EncodedString::new() value: EncodedString::new(),
} }
} }
/// Returns length which is equal to sum of the lenghts of UTF-8 encoded strings in pair /// Returns length which is equal to sum of the lenghts of UTF-8 encoded strings in pair

BIN
mqtt_control_example.bin Normal file

Binary file not shown.

View File

@ -1,141 +0,0 @@
use std::time::Duration;
use heapless::Vec;
use tokio::time::sleep;
use tokio::{join, task};
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_v5::MqttClientV5;
use rust_mqtt::network::network_trait::{Network, NetworkError};
use rust_mqtt::packet::v5::connect_packet::ConnectPacket;
use rust_mqtt::packet::v5::mqtt_packet::Packet;
use rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1;
use rust_mqtt::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use rust_mqtt::packet::v5::subscription_packet::SubscriptionPacket;
use rust_mqtt::tokio_network::TokioNetwork;
async fn receive() {
let mut ip: [u8; 4] = [127, 0, 0, 1];
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS1);
config.add_username("test");
config.add_password("testPass");
let mut res2 = vec![0; 260];
let mut res3 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
&mut res2,
260,
&mut res3,
260,
config,
);
let mut result = { client.connect_to_broker().await };
if let Err(r) = result {
log::error!("[ERROR]: {}", r);
return;
}
{
const TOPICS: usize = 2;
let t1 = "test/topic1";
let t2 = "test/topic2";
let mut names = Vec::<&str, TOPICS>::new();
names.push(&t1);
names.push(&t2);
client.subscribe_to_topics::<TOPICS>(&names).await;
//client.subscribe_to_topic("test/topic").await;
};
{
sleep(Duration::from_secs(10));
client.send_ping().await;
}
let mut o = 0;
loop {
if o == 2 {
break;
}
log::info!("Waiting for new message!");
let mes = client.receive_message().await.unwrap();
let x = String::from_utf8_lossy(mes);
log::info!("Got new message: {}", x);
o = o + 1;
}
{
client.disconnect().await;
}
}
async fn publish(message: &str) {
let mut ip: [u8; 4] = [37, 205, 11, 180];
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
tokio_network.create_connection().await;
let config = ClientConfig::new();
let mut res2 = vec![0; 260];
let mut res3 = vec![0; 260];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
&mut res2,
260,
&mut res3,
260,
config,
);
let mut result = { client.connect_to_broker().await };
log::info!("Waiting until send!");
sleep(Duration::from_secs(15));
result = {
log::info!("Sending new message!");
/*client
.send_message("test/topic", message)
.await*/
client.send_ping().await
};
if let Err(e) = result {
log::error!("Chyba!");
}
/*result = {
log::info!("Sending new message!");
client
.send_message("test/topic", "Dalsi zprava :)")
.await
};
if let Err(err) = result {
log::error!("Chyba!");
}*/
{
client.disconnect().await;
}
}
#[tokio::main]
async fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.init();
/*let recv = task::spawn(async move {
receive().await;
});
let publ = task::spawn(async move {
publish("hello world 123 !").await;
});
join!(recv, publ);*/
receive().await;
//publish("Ahoj 123").await;
log::info!("Done");
}

View File

@ -1,40 +0,0 @@
use core::future::Future;
use crate::packet::v5::reason_codes::ReasonCode;
#[derive(Debug)]
pub enum NetworkError {
Connection,
Unknown,
QoSAck,
IDNotMatchedOnAck,
NoMatchingSubs,
}
pub trait Network {
type ConnectionFuture<'m>: Future<Output = Result<(), ReasonCode>>
where
Self: 'm;
type WriteFuture<'m>: Future<Output = Result<(), ReasonCode>>
where
Self: 'm;
type ReadFuture<'m>: Future<Output = Result<usize, ReasonCode>>
where
Self: 'm;
type TimerFuture<'m>: Future<Output = ()>
where
Self: 'm;
fn new(ip: [u8; 4], port: u16) -> Self;
fn create_connection(&'m mut self) -> Self::ConnectionFuture<'m>;
fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m>;
fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m>;
fn count_down(&'m mut self, time_in_secs: u64) -> Self::TimerFuture<'m>;
}

View File

@ -1,100 +0,0 @@
use alloc::format;
use alloc::string::String;
use core::future::Future;
use core::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
use crate::network::network_trait::Network;
use crate::packet::v5::reason_codes::ReasonCode;
pub struct TokioNetwork {
ip: [u8; 4],
port: u16,
socket: Option<TcpStream>,
}
impl TokioNetwork {
fn convert_ip(&mut self) -> String {
String::from(format!(
"{}.{}.{}.{}:{}",
self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port
))
}
}
impl TokioNetwork {}
impl Network for TokioNetwork {
type ConnectionFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
type WriteFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<(), ReasonCode>> + 'm;
type ReadFuture<'m>
where
Self: 'm,
= impl Future<Output = Result<usize, ReasonCode>> + 'm;
type TimerFuture<'m>
where
Self: 'm,
= impl Future<Output = ()>;
fn new(ip: [u8; 4], port: u16) -> Self {
return Self {
ip,
port,
socket: Option::None,
};
}
fn create_connection<'m>(&'m mut self) -> Self::ConnectionFuture<'m> {
async move {
TcpStream::connect(self.convert_ip())
.await
.map(|socket| self.socket = Some(socket))
.map(|_| ())
.map_err(|_| ReasonCode::NetworkError)
}
}
fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move {
return if let Some(ref mut stream) = self.socket {
stream
.write_all(&buffer[0..len])
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
}
}
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
async move {
return if let Some(ref mut stream) = self.socket {
stream
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
}
}
fn count_down(&'m mut self, time_in_secs: u64) -> Self::TimerFuture<'m> {
async move {
return sleep(Duration::from_secs(time_in_secs))
.await
}
}
}