This commit is contained in:
Ondrej Babec 2022-03-14 11:11:50 +01:00
parent 7e10c2b0a4
commit 4baceade72
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
47 changed files with 712 additions and 197 deletions

View File

@ -1,2 +1,3 @@
allow_anonymous false
password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt

View File

@ -3,18 +3,19 @@ on: [pull_request]
name: IntegrationTests
jobs:
unit_tests:
integration_tests:
name: Integration tests
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v2
- name: Build
uses: actions-rs/toolchain@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: cargo build
- name: Build embedded
run: cargo build --target thumbv7em-none-eabihf --features "no_std" --no-default-features
- name: Start Mosquitto
run: |
@ -22,4 +23,4 @@ jobs:
mosquitto -c .ci/mosquitto.conf -d
- name: Run integration tests
run: cargo test integration
run: RUST_LOG=trace cargo test integration --features "testing"

View File

@ -8,7 +8,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: cargo test unit
- name: Run Unit tests
run: RUST_LOG=trace cargo test unit --features "testing"

View File

@ -1,6 +1,6 @@
[workspace]
members = [
"examples/drogue",
#"examples/drogue",
"mqtt",
]
resolver = "2"
@ -10,3 +10,4 @@ embassy = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd983
embassy-traits = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }
embassy-net = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }
drogue-device = { git = "https://github.com/drogue-iot/drogue-device.git", rev = "62ff20e278a6a705056173171714b1bbdc078df5" }
embassy-nrf = { git = "https://github.com/embassy-rs/embassy.git", rev = "3d6b8bd9832d5a29cab4aa21434663e6ea6f4488" }

View File

@ -1,20 +1,11 @@
[package]
name = "embassy-network"
name = "drogue-network"
version = "0.0.1"
authors = ["Ondrej Babec <ond.babec@gmail.com>"]
edition = "2021"
resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
env_logger = "0.9.0"
rust-mqtt = { path = "../../mqtt"}
embassy = { version = "0.1.0", features = ["std"] }
log = "0.4.14"
embedded-hal = { version = "0.2", features = ["unproven"] }
embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.6", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy" }
embedded-hal-async = { version = "0.0.1", git = "https://github.com/embassy-rs/embedded-hal", branch = "embassy"}
drogue-device = { version = "0.1.0", features = ["std"] }
defmt = { version = "0.3" }
rust-mqtt = { path = "../../mqtt", features = ["no_std"], default-features = false }
drogue-device = { path = "/drogue-device/device", features = ["time"], default-features = false }

View File

@ -33,14 +33,11 @@ use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use drogue_device::traits::tcp;
use drogue_device::traits::tcp::TcpStack;
use embassy::io::{AsyncBufReadExt, AsyncWriteExt};
use embassy::time::Delay;
use embedded_hal_async::delay::DelayUs;
use rust_mqtt::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
pub struct DrogueNetwork<A>
where
A: TcpActor + 'static,
where
A: TcpActor + 'static,
{
socket: Socket<A>,
}
@ -92,10 +89,9 @@ where
}
}
fn close(mut self) -> Self::CloseFuture<'m> {
fn close<'m>(mut self) -> Self::CloseFuture<'m> {
async move {
self.socket
.close()
self.socket.close()
.await
.map_err(|_| ReasonCode::NetworkError)
}
@ -141,11 +137,11 @@ where
.await
{
Ok(_) => {
log::trace!("Connection established");
trace!("Connection established");
Ok(DrogueNetwork::new(socket))
}
Err(e) => {
log::warn!("Error creating connection: {:?}", e);
warn!("Error creating connection:");
socket.close().await.map_err(|e| ReasonCode::NetworkError)?;
Err(ReasonCode::NetworkError)
}

228
examples/drogue/src/fmt.rs Normal file
View File

@ -0,0 +1,228 @@
#![macro_use]
#![allow(unused_macros)]
#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");
macro_rules! assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert!($($x)*);
}
};
}
macro_rules! assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_eq!($($x)*);
}
};
}
macro_rules! assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_ne!($($x)*);
}
};
}
macro_rules! debug_assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert!($($x)*);
}
};
}
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_eq!($($x)*);
}
};
}
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_ne!($($x)*);
}
};
}
macro_rules! todo {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::todo!($($x)*);
#[cfg(feature = "defmt")]
::defmt::todo!($($x)*);
}
};
}
macro_rules! unreachable {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
};
}
macro_rules! panic {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::panic!($($x)*);
#[cfg(feature = "defmt")]
::defmt::panic!($($x)*);
}
};
}
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::trace!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::trace!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::debug!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::debug!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::info!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::info!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::warn!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::warn!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::error!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::error!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[cfg(feature = "defmt")]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
};
}
#[cfg(not(feature = "defmt"))]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
}
}
};
($arg:expr, $($msg:expr),+ $(,)? ) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
}
}
}
}
#[cfg(feature = "defmt-timestamp-uptime")]
defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct NoneError;
pub trait Try {
type Ok;
type Error;
fn into_result(self) -> Result<Self::Ok, Self::Error>;
}
impl<T> Try for Option<T> {
type Ok = T;
type Error = NoneError;
#[inline]
fn into_result(self) -> Result<T, NoneError> {
self.ok_or(NoneError)
}
}
impl<T, E> Try for Result<T, E> {
type Ok = T;
type Error = E;
#[inline]
fn into_result(self) -> Self {
self
}
}

View File

@ -28,4 +28,6 @@
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
pub mod fmt;
pub mod drogue_network;

View File

@ -8,19 +8,21 @@ resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
env_logger = "0.9.0"
log = "0.4.14"
heapless = "0.7.10"
rand_core = "0.6.0"
defmt = { version = "0.3", optional = true }
tokio = { version = "1", features = ["full"], optional = true }
tokio-test = { version = "0.4.2", optional = true}
log = { version = "0.4.14", optional = true }
tokio = { version = "1", features = ["full"], optional = true, default-features = false }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio-test = "0.4.2"
tokio-test = { version = "0.4.2"}
env_logger = "0.9.0"
log = { version = "0.4.14"}
[features]
default = ["tokio", "std", "tokio-test"]
default = ["testing"]
testing = ["tokio", "std", "log"]
std = []
no_std = []
no_std = ["defmt"]

View File

@ -25,8 +25,11 @@
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService;
use crate::utils::types::{BinaryData, EncodedString};
use heapless::Vec;
#[derive(Clone)]
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub qos: QualityOfService,
pub keep_alive: u16,

View File

@ -39,11 +39,13 @@ use crate::packet::v5::subscription_packet::SubscriptionPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::rng_generator::CountingRng;
use crate::utils::types::BufferError;
use heapless::Vec;
use rand_core::RngCore;
use crate::network::network_trait::NetworkError::Connection;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
network_driver: &'a mut T,
connection: Option<T>,
buffer: &'a mut [u8],
buffer_len: usize,
recv_buffer: &'a mut [u8],
@ -57,7 +59,7 @@ where
T: NetworkConnection,
{
pub fn new(
network_driver: &'a mut T,
network_driver: T,
buffer: &'a mut [u8],
buffer_len: usize,
recv_buffer: &'a mut [u8],
@ -65,7 +67,7 @@ where
config: ClientConfig<'a, MAX_PROPERTIES>,
) -> Self {
Self {
network_driver,
connection: Some(network_driver),
buffer,
buffer_len,
recv_buffer,
@ -75,8 +77,10 @@ where
}
}
// Muze prijit disconnect kvuli male velikosti packetu
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let len = {
let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new();
connect.keep_alive = self.config.keep_alive;
@ -92,23 +96,26 @@ where
};
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
let mut conn = self.connection.as_mut().unwrap();
trace!("Sending connect");
conn.send(self.buffer, len.unwrap()).await?;
//connack
let reason: Result<u8, BufferError> = {
self.network_driver.receive(self.buffer).await?;
let mut packet = ConnackPacket::<'b, 5>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
trace!("Waiting for connack");
conn.receive(self.recv_buffer).await?;
let mut packet = ConnackPacket::<'b, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) {
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new();
let mut disc = DisconnectPacket::<'b, MAX_PROPERTIES>::new();
if disc
.decode(&mut BuffReader::new(self.buffer, self.buffer_len))
.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len))
.is_ok()
{
log::error!("Client was disconnected with reason: ");
error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
}
@ -119,7 +126,7 @@ where
};
if let Err(err) = reason {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
let res = reason.unwrap();
@ -131,13 +138,29 @@ where
}
pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
let mut disconnect = DisconnectPacket::<'b, 5>::new();
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let conn = self.connection.as_mut().unwrap();
trace!("Creating disconnect packet!");
let mut disconnect = DisconnectPacket::<'b, MAX_PROPERTIES>::new();
let len = disconnect.encode(self.buffer, self.buffer_len);
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
warn!("[DECODE ERR]: {}", err);
self.connection.take().unwrap().close().await?;
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
if let Err(e) = conn.send(self.buffer, len.unwrap()).await {
warn!("Could not send DISCONNECT packet");
}
if let Err(e) = self.connection.take().unwrap().close().await {
warn!("Could not close the TCP handle");
return Err(e);
} else {
trace!("Closed TCP handle");
}
Ok(())
}
@ -146,9 +169,13 @@ where
topic_name: &'b str,
message: &'b str,
) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let identifier: u16 = self.rng.next_u32() as u16;
let len = {
let mut packet = PublishPacket::<'b, 5>::new();
let mut packet = PublishPacket::<'b, MAX_PROPERTIES>::new();
packet.add_topic_name(topic_name);
packet.add_qos(self.config.qos);
packet.add_identifier(identifier);
@ -157,20 +184,21 @@ where
};
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
trace!("Sending message");
conn.send(self.buffer, len.unwrap()).await?;
self.network_driver.send(self.buffer, len.unwrap()).await?;
//QoS1
// QoS1
if <QualityOfService as Into<u8>>::into(self.config.qos)
== <QualityOfService as Into<u8>>::into(QoS1)
{
let reason: Result<[u16; 2], BufferError> = {
self.network_driver.receive(self.buffer).await?;
let mut packet = PubackPacket::<'b, 5>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len))
trace!("Waiting for ack");
conn.receive(self.recv_buffer).await?;
let mut packet = PubackPacket::<'b, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len))
{
Err(err)
} else {
@ -179,7 +207,7 @@ where
};
if let Err(err) = reason {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
@ -199,8 +227,12 @@ where
&'b mut self,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let len = {
let mut subs = SubscriptionPacket::<'b, TOPICS, 1>::new();
let mut subs = SubscriptionPacket::<'b, TOPICS, MAX_PROPERTIES>::new();
let mut i = 0;
loop {
if i == TOPICS {
@ -213,17 +245,17 @@ where
};
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
conn.send(self.buffer, len.unwrap()).await?;
let reason: Result<Vec<u8, TOPICS>, BufferError> = {
self.network_driver.receive(self.buffer).await?;
conn.receive(self.recv_buffer).await?;
let mut packet = SubackPacket::<'b, TOPICS, 5>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
let mut packet = SubackPacket::<'b, TOPICS, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) {
Err(err)
} else {
Ok(packet.reason_codes)
@ -231,7 +263,7 @@ where
};
if let Err(err) = reason {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
let reasons = reason.unwrap();
@ -252,24 +284,28 @@ where
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let len = {
let mut subs = SubscriptionPacket::<'b, 1, 1>::new();
let mut subs = SubscriptionPacket::<'b, 1, MAX_PROPERTIES>::new();
subs.add_new_filter(topic_name, self.config.qos);
subs.encode(self.buffer, self.buffer_len)
};
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
conn.send(self.buffer, len.unwrap()).await?;
let reason: Result<u8, BufferError> = {
self.network_driver.receive(self.buffer).await?;
conn.receive(self.recv_buffer).await?;
let mut packet = SubackPacket::<'b, 5, 5>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
let mut packet = SubackPacket::<'b, 5, MAX_PROPERTIES>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) {
Err(err)
} else {
Ok(*packet.reason_codes.get(0).unwrap())
@ -277,7 +313,7 @@ where
};
if let Err(err) = reason {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
@ -290,7 +326,11 @@ where
}
pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
self.network_driver.receive(self.recv_buffer).await?;
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
conn.receive(self.recv_buffer).await?;
let mut packet = PublishPacket::<'b, 5>::new();
if let Err(err) =
packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len))
@ -298,30 +338,30 @@ where
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new();
if disc
.decode(&mut BuffReader::new(self.buffer, self.buffer_len))
.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len))
.is_ok()
{
log::error!("Client was disconnected with reason: ");
error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
}
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
if (packet.fixed_header & 0x06)
== <QualityOfService as Into<u8>>::into(QualityOfService::QoS1)
{
let mut puback = PubackPacket::<'b, 5>::new();
let mut puback = PubackPacket::<'b, MAX_PROPERTIES>::new();
puback.packet_identifier = packet.packet_identifier;
puback.reason_code = 0x00;
{
let len = puback.encode(self.buffer, self.buffer_len);
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
conn.send(self.buffer, len.unwrap()).await?;
}
}
@ -329,22 +369,26 @@ where
}
pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
let mut conn = self.connection.as_mut().unwrap();
let len = {
let mut packet = PingreqPacket::new();
packet.encode(self.buffer, self.buffer_len)
};
if let Err(err) = len {
log::error!("[DECODE ERR]: {}", err);
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}
self.network_driver.send(self.buffer, len.unwrap()).await?;
conn.send(self.buffer, len.unwrap()).await?;
self.network_driver.receive(self.buffer).await?;
conn.receive(self.recv_buffer).await?;
let mut packet = PingrespPacket::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
log::error!("[DECODE ERR]: {}", err);
if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) {
error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
} else {
Ok(())

View File

@ -1,27 +1,28 @@
/*
MIT License
Copyright (c) [2022] [Ondrej Babec <ond.babec@gmailc.com>]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use crate::utils::types::BufferError;
/// VariableByteIntegerEncoder and VariableByteIntegerDecoder are implemented based on
@ -48,7 +49,7 @@ impl VariableByteIntegerEncoder {
const MAX_ENCODABLE: u32 = 268435455;
const MOD: u32 = 128;
if target > MAX_ENCODABLE {
log::error!("Maximal value of integer for encoding was exceeded");
error!("Maximal value of integer for encoding was exceeded");
return Err(BufferError::EncodingError);
}

228
mqtt/src/fmt.rs Normal file
View File

@ -0,0 +1,228 @@
#![macro_use]
#![allow(unused_macros)]
#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");
macro_rules! assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert!($($x)*);
}
};
}
macro_rules! assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_eq!($($x)*);
}
};
}
macro_rules! assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::assert_ne!($($x)*);
}
};
}
macro_rules! debug_assert {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert!($($x)*);
}
};
}
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_eq!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_eq!($($x)*);
}
};
}
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::debug_assert_ne!($($x)*);
#[cfg(feature = "defmt")]
::defmt::debug_assert_ne!($($x)*);
}
};
}
macro_rules! todo {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::todo!($($x)*);
#[cfg(feature = "defmt")]
::defmt::todo!($($x)*);
}
};
}
macro_rules! unreachable {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
};
}
macro_rules! panic {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::panic!($($x)*);
#[cfg(feature = "defmt")]
::defmt::panic!($($x)*);
}
};
}
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::trace!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::trace!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::debug!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::debug!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::info!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::info!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::warn!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::warn!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
#[cfg(feature = "log")]
::log::error!($s $(, $x)*);
#[cfg(feature = "defmt")]
::defmt::error!($s $(, $x)*);
#[cfg(not(any(feature = "log", feature="defmt")))]
let _ = ($( & $x ),*);
}
};
}
#[cfg(feature = "defmt")]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
};
}
#[cfg(not(feature = "defmt"))]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e);
}
}
};
($arg:expr, $($msg:expr),+ $(,)? ) => {
match $crate::fmt::Try::into_result($arg) {
::core::result::Result::Ok(t) => t,
::core::result::Result::Err(e) => {
::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e);
}
}
}
}
#[cfg(feature = "defmt-timestamp-uptime")]
defmt::timestamp! {"{=u64:us}", crate::time::Instant::now().as_micros() }
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct NoneError;
pub trait Try {
type Ok;
type Error;
fn into_result(self) -> Result<Self::Ok, Self::Error>;
}
impl<T> Try for Option<T> {
type Ok = T;
type Error = NoneError;
#[inline]
fn into_result(self) -> Result<T, NoneError> {
self.ok_or(NoneError)
}
}
impl<T, E> Try for Result<T, E> {
type Ok = T;
type Error = E;
#[inline]
fn into_result(self) -> Self {
self
}
}

View File

@ -29,6 +29,7 @@
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
pub(crate) mod fmt;
pub mod client;
pub mod encoding;
pub mod network;
@ -36,3 +37,4 @@ pub mod packet;
pub mod tests;
pub mod tokio_net;
pub mod utils;

View File

@ -46,7 +46,7 @@ pub struct AuthPacket<'a, const MAX_PROPERTIES: usize> {
impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> {
pub fn add_reason_code(&mut self, code: u8) {
if code != 0 && code != 24 && code != 25 {
log::error!("Provided reason code is not supported!");
error!("Provided reason code is not supported!");
return;
}
self.auth_reason = code;
@ -56,7 +56,7 @@ impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> {
if p.auth_property() {
self.push_to_properties(p);
} else {
log::error!("Provided property is not correct AUTH packet property!");
error!("Provided property is not correct AUTH packet property!");
}
}
}

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -73,7 +74,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPE
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Connack).into() {
log::error!("Packet you are trying to decode is not CONNACK packet!");
error!("Packet you are trying to decode is not CONNACK packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.ack_flags = buff_reader.read_u8()?;

View File

@ -187,7 +187,7 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'
}
fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
log::error!("Decode function is not available for control packet!");
error!("Decode function is not available for control packet!");
Err(BufferError::WrongPacketToDecode)
}

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -79,7 +80,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PR
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Disconnect).into() {
log::error!("Packet you are trying to decode is not DISCONNECT packet!");
error!("Packet you are trying to decode is not DISCONNECT packet!");
return Err(BufferError::WrongPacketToDecode);
}
self.disconnect_reason = buff_reader.read_u8()?;

View File

@ -82,7 +82,7 @@ pub trait Packet<'a> {
if self.get_property_len() != 0 {
loop {
prop = Property::decode(buff_reader)?;
log::debug!("Parsed property {:?}", prop);
//debug!("Parsed property {:?}", prop);
x = x + prop.len() as u32 + 1;
self.push_to_properties(prop);
@ -100,6 +100,7 @@ pub trait Packet<'a> {
buff_reader: &mut BuffReader,
) -> Result<PacketType, BufferError> {
let first_byte: u8 = buff_reader.read_u8()?;
trace!("First byte of accepted packet: {:02X}", first_byte);
self.set_fixed_header(first_byte);
self.set_remaining_len(buff_reader.read_variable_byte_int()?);
return Ok(PacketType::from(first_byte));

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
@ -53,21 +54,21 @@ impl<'a> Packet<'a> for PingreqPacket {
}
fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
log::error!("Pingreq Packet packet does not support decode funtion on client!");
error!("Pingreq Packet packet does not support decode funtion on client!");
Err(BufferError::WrongPacketToDecode)
}
fn set_property_len(&mut self, _value: u32) {
log::error!("PINGREQ packet does not contain any properties!");
error!("PINGREQ packet does not contain any properties!");
}
fn get_property_len(&mut self) -> u32 {
log::error!("PINGREQ packet does not contain any properties!");
error!("PINGREQ packet does not contain any properties!");
return 0;
}
fn push_to_properties(&mut self, _property: Property<'a>) {
log::error!("PINGREQ packet does not contain any properties!");
error!("PINGREQ packet does not contain any properties!");
}
fn property_allowed(&mut self, property: &Property<'a>) -> bool {

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use crate::packet::v5::mqtt_packet::Packet;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_writer::BuffWriter;
@ -55,27 +56,27 @@ impl<'a> Packet<'a> for PingrespPacket {
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
let x = self.decode_fixed_header(buff_reader)?;
if x != (PacketType::Pingresp).into() {
log::error!("Packet you are trying to decode is not PINGRESP packet!");
error!("Packet you are trying to decode is not PINGRESP packet!");
return Err(BufferError::PacketTypeMismatch);
}
if self.remain_len != 0 {
log::error!("PINGRESP packet does not have 0 lenght!");
error!("PINGRESP packet does not have 0 lenght!");
return Err(BufferError::PacketTypeMismatch);
}
Ok(())
}
fn set_property_len(&mut self, _value: u32) {
log::error!("PINGRESP packet does not contain any properties!");
error!("PINGRESP packet does not contain any properties!");
}
fn get_property_len(&mut self) -> u32 {
log::error!("PINGRESP packet does not contain any properties!");
error!("PINGRESP packet does not contain any properties!");
return 0;
}
fn push_to_properties(&mut self, _property: Property<'a>) {
log::error!("PINGRESP packet does not contain any properties!");
error!("PINGRESP packet does not contain any properties!");
}
fn property_allowed(&mut self, property: &Property<'a>) -> bool {

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPER
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Puback).into() {
log::error!("Packet you are trying to decode is not PUBACK packet!");
error!("Packet you are trying to decode is not PUBACK packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPE
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Pubcomp).into() {
log::error!("Packet you are trying to decode is not PUBCOMP packet!");
error!("Packet you are trying to decode is not PUBCOMP packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -136,7 +137,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPE
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Publish).into() {
log::error!("Packet you are trying to decode is not PUBLISH packet!");
error!("Packet you are trying to decode is not PUBLISH packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.topic_name = buff_reader.read_string()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPER
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrec).into() {
log::error!("Packet you are trying to decode is not PUBREC packet!");
error!("Packet you are trying to decode is not PUBREC packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -75,7 +76,7 @@ impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPER
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Pubrel).into() {
log::error!("Packet you are trying to decode is not PUBREL packet!");
error!("Packet you are trying to decode is not PUBREL packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -25,6 +25,7 @@
use core::fmt::{Display, Formatter};
#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum ReasonCode {
Success,
GrantedQoS1,

View File

@ -23,6 +23,7 @@
*/
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
@ -80,13 +81,13 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a>
}
fn encode(&mut self, _buffer: &mut [u8], _buffer_len: usize) -> Result<usize, BufferError> {
log::error!("SUBACK packet does not support encoding!");
error!("SUBACK packet does not support encoding!");
return Err(BufferError::WrongPacketToEncode);
}
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Suback).into() {
log::error!("Packet you are trying to decode is not SUBACK packet!");
error!("Packet you are trying to decode is not SUBACK packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use super::packet_type::PacketType;
@ -106,7 +107,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
}
fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
log::error!("Subscribe packet does not support decode funtion on client!");
error!("Subscribe packet does not support decode funtion on client!");
Err(BufferError::WrongPacketToDecode)
}
fn set_property_len(&mut self, value: u32) {

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::packet::v5::mqtt_packet::Packet;
@ -74,13 +75,13 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a>
}
fn encode(&mut self, _buffer: &mut [u8], _buffer_len: usize) -> Result<usize, BufferError> {
log::error!("UNSUBACK packet does not support encoding!");
error!("UNSUBACK packet does not support encoding!");
Err(BufferError::WrongPacketToEncode)
}
fn decode(&mut self, buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
if self.decode_fixed_header(buff_reader)? != (PacketType::Unsuback).into() {
log::error!("Packet you are trying to decode is not UNSUBACK packet!");
error!("Packet you are trying to decode is not UNSUBACK packet!");
return Err(BufferError::PacketTypeMismatch);
}
self.packet_identifier = buff_reader.read_u16()?;

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
@ -105,7 +106,7 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
}
fn decode(&mut self, _buff_reader: &mut BuffReader<'a>) -> Result<(), BufferError> {
log::error!("Unsubscribe packet does not support decode funtion on client!");
error!("Unsubscribe packet does not support decode funtion on client!");
Err(BufferError::WrongPacketToDecode)
}

View File

@ -25,6 +25,7 @@ extern crate alloc;
use alloc::string::String;
use core::time::Duration;
use std::future::Future;
use log::LevelFilter;
use tokio::time::sleep;
use tokio::{join, task};
use tokio_test::assert_ok;
@ -38,6 +39,7 @@ use crate::packet::v5::reason_codes::ReasonCode;
use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use crate::utils::types::BufferError;
use std::sync::Once;
static IP: [u8; 4] = [127, 0, 0, 1];
static PORT: u16 = 1883;
@ -45,32 +47,33 @@ static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
static MSG: &str = "testMessage";
fn init() {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.try_init();
static INIT: Once = Once::new();
fn setup() {
INIT.call_once(|| {
env_logger::init();
});
}
async fn publish_core<'b>(
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
topic: &str,
) -> Result<(), ReasonCode> {
log::info!(
info!(
"[Publisher] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
);
let mut result = { client.connect_to_broker().await };
assert_ok!(result);
log::info!("[Publisher] Waiting {} seconds before sending", 5);
info!("[Publisher] Waiting {} seconds before sending", 5);
sleep(Duration::from_secs(5)).await;
log::info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
result = { client.send_message(topic, MSG).await };
assert_ok!(result);
log::info!("[Publisher] Disconnecting!");
info!("[Publisher] Disconnecting!");
result = { client.disconnect().await };
assert_ok!(result);
Ok(())
@ -88,7 +91,7 @@ async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut write_buffer = [0; 80];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
tokio_network,
&mut write_buffer,
80,
&mut recv_buffer,
@ -102,7 +105,7 @@ async fn receive_core<'b>(
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
topic: &str,
) -> Result<(), ReasonCode> {
log::info!(
info!(
"[Receiver] Connection to broker with username {} and password {}",
USERNAME,
PASSWORD
@ -110,17 +113,17 @@ async fn receive_core<'b>(
let mut result = { client.connect_to_broker().await };
assert_ok!(result);
log::info!("[Receiver] Subscribing to topic {}", topic);
info!("[Receiver] Subscribing to topic {}", topic);
result = { client.subscribe_to_topic(topic).await };
assert_ok!(result);
log::info!("[Receiver] Waiting for new message!");
info!("[Receiver] Waiting for new message!");
let msg = { client.receive_message().await };
assert_ok!(msg);
let act_message = String::from_utf8_lossy(msg?);
log::info!("[Receiver] Got new message: {}", act_message);
info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MSG);
log::info!("[Receiver] Disconnecting");
info!("[Receiver] Disconnecting");
result = { client.disconnect().await };
assert_ok!(result);
Ok(())
@ -139,7 +142,7 @@ async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
@ -163,7 +166,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
@ -171,7 +174,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
config,
);
log::info!(
info!(
"[Receiver] Connection to broker with username {} and password {}",
"xyz",
PASSWORD
@ -184,8 +187,8 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
#[tokio::test]
async fn simple_publish_recv() {
init();
log::info!("Running simple integration test");
setup();
info!("Running simple integration test");
let recv =
task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/simple").await });
@ -200,8 +203,8 @@ async fn simple_publish_recv() {
#[tokio::test]
async fn simple_publish_recv_qos() {
init();
log::info!("Running simple integration test with Quality of Service 1");
setup();
info!("Running simple integration test with Quality of Service 1");
let recv = task::spawn(async move { receive(QualityOfService::QoS1, "test/recv/qos").await });
@ -213,8 +216,8 @@ async fn simple_publish_recv_qos() {
#[tokio::test]
async fn simple_publish_recv_wrong_cred() {
init();
log::info!("Running simple integration test wrong credentials");
setup();
info!("Running simple integration test wrong credentials");
let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await });

View File

@ -29,5 +29,5 @@ pub mod unit;
#[allow(dead_code)]
#[allow(unused_must_use)]
#[allow(unused_imports)]
#[cfg(feature = "tokio")]
#[cfg(feature = "testing")]
pub mod integration;

View File

@ -27,6 +27,7 @@ use crate::packet::v5::mqtt_packet::Packet;
use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
use crate::utils::buffer_reader::BuffReader;
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::puback_packet::PubackPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::pubcomp_packet::PubcompPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::{PublishPacket, QualityOfService};
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::EncodedString;
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::pubrec_packet::PubrecPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::pubrel_packet::PubrelPacket;
use crate::utils::buffer_reader::BuffReader;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]

View File

@ -27,6 +27,7 @@ use crate::packet::v5::packet_type::PacketType;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::subscription_packet::SubscriptionPacket;
use heapless::Vec;
#[test]

View File

@ -28,6 +28,7 @@ use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService::{QoS0, QoS1};
use crate::packet::v5::unsubscription_packet::UnsubscriptionPacket;
use crate::utils::types::{EncodedString, StringPair};
use heapless::Vec;
#[test]

View File

@ -25,6 +25,7 @@
use crate::packet::v5::property::Property;
use crate::utils::buffer_writer::BuffWriter;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair, TopicFilter};
use heapless::Vec;
#[test]

View File

@ -34,14 +34,15 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
pub struct TokioNetwork {
stream: Option<TcpStream>,
stream: TcpStream,
}
impl TokioNetwork {
pub fn new(stream: TcpStream) -> Self {
Self {
stream: Some(stream),
stream,
}
}
@ -73,49 +74,30 @@ impl NetworkConnection for TokioNetwork {
fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.write_all(&buffer[0..len])
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
self.stream
.write_all(&buffer[0..len])
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
self.stream
.read(buffer)
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
fn close<'m>(mut self) -> Self::CloseFuture<'m> {
async move {
return if let Some(ref mut stream) = self.stream {
stream
.shutdown()
.await
.map_err(|_| ReasonCode::NetworkError)
} else {
Err(ReasonCode::NetworkError)
};
self.stream
.shutdown()
.await
.map_err(|_| ReasonCode::NetworkError)
}
}
/*fn count_down(&'m mut self, time_in_secs: u64) -> Self::TimerFuture<'m> {
async move {
return sleep(Duration::from_secs(time_in_secs))
.await
}
}*/
}
pub struct TokioNetworkFactory {}

View File

@ -25,6 +25,7 @@
use core::mem;
use core::str;
use crate::encoding::variable_byte_integer::VariableByteIntegerDecoder;
use crate::utils::types::{BinaryData, BufferError, EncodedString, StringPair};
@ -129,7 +130,7 @@ impl<'a> BuffReader<'a> {
let res_str = str::from_utf8(&(self.buffer[self.position..(self.position + len)]));
if res_str.is_err() {
log::error!("Could not parse utf-8 string");
error!("Could not parse utf-8 string");
return Err(BufferError::Utf8Error);
}
self.increment_position(len);

View File

@ -22,6 +22,7 @@
* SOFTWARE.
*/
use heapless::Vec;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerEncoder};

View File

@ -25,6 +25,7 @@
use core::fmt::{Display, Formatter};
#[derive(core::fmt::Debug, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum BufferError {
Utf8Error,
InsufficientBufferSize,