First integration

This commit is contained in:
Ondrej Babec 2022-03-09 16:27:04 +01:00
parent edb07d94d3
commit 21e929c7ed
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
10 changed files with 179 additions and 40 deletions

View File

@ -36,11 +36,12 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub password_flag: bool,
pub password: BinaryData<'a>,
pub properties: Vec<Property<'a>, MAX_PROPERTIES>,
pub max_packet_size: u32,
}
impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
pub fn new() -> Self {
Self {
Self {
qos: QualityOfService::QoS0,
keep_alive: 60,
client_id: EncodedString::new(),
@ -48,7 +49,8 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
username: EncodedString::new(),
password_flag: false,
password: BinaryData::new(),
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new()
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new(),
max_packet_size: 265_000,
}
}
@ -77,4 +79,13 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
self.properties.push(prop);
}
}
pub fn add_max_packet_size_as_prop(& mut self) -> u32 {
if self.properties.len() < MAX_PROPERTIES {
let prop = Property::MaximumPacketSize(self.max_packet_size);
self.properties.push(prop);
return 5;
}
return 0;
}
}

View File

@ -1,3 +1,4 @@
use drogue_device::drogue::config;
use crate::client::client_config::ClientConfig;
use crate::network::network_trait::Network;
use crate::packet::v5::connack_packet::ConnackPacket;
@ -17,6 +18,7 @@ use crate::utils::rng_generator::CountingRng;
use crate::utils::types::BufferError;
use heapless::Vec;
use rand_core::RngCore;
use crate::packet::v5::property::Property;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
network_driver: &'a mut T,
@ -53,8 +55,10 @@ where
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
let len = {
let mut connect = ConnectPacket::<'b, 3, 0>::clean();
let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new();
connect.keep_alive = self.config.keep_alive;
self.config.add_max_packet_size_as_prop();
connect.property_len = connect.add_properties(&self.config.properties);
if self.config.username_flag {
connect.add_username(&self.config.username);
}

View File

@ -16,7 +16,7 @@ use rust_mqtt::packet::v5::subscription_packet::SubscriptionPacket;
use rust_mqtt::tokio_network::TokioNetwork;
async fn receive() {
let mut ip: [u8; 4] = [37, 205, 11, 180];
let mut ip: [u8; 4] = [127, 0, 0, 1];
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
tokio_network.create_connection().await;

View File

@ -136,8 +136,8 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'
let mut rm_ln = self.property_len;
let property_len_enc: [u8; 4] = VariableByteIntegerEncoder::encode(self.property_len)?;
let property_len_len = VariableByteIntegerEncoder::len(property_len_enc);
// Number 12 => protocol_name_len + protocol_name + protocol_version + connect_flags + keep_alive + client_id_len
rm_ln = rm_ln + property_len_len as u32 + 12;
// Number 12 => protocol_name_len + protocol_name (6) + protocol_version (1)+ connect_flags (1) + keep_alive (2) + client_id_len (2)
rm_ln = rm_ln + property_len_len as u32 + 10 + self.client_id.len as u32 + 2;
if self.connect_flags & 0x04 != 0 {
let wil_prop_len_enc = VariableByteIntegerEncoder::encode(self.will_property_len)?;

View File

@ -0,0 +1,149 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
use alloc::string::String;
use core::time::Duration;
use heapless::Vec;
use tokio::{join, task};
use tokio::time::sleep;
use crate::client::client_config::ClientConfig;
use crate::client::client_v5::MqttClientV5;
use crate::network::network_trait::Network;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService;
use crate::tokio_network::TokioNetwork;
static IP: [u8; 4] = [127, 0, 0, 1];
static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
static TOPIC: &str = "test/topic";
static MESSAGE: &str = "testMessage";
async fn publish() {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS0);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 80;
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
&mut write_buffer,
80,
&mut recv_buffer,
80,
config,
);
log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD);
let mut result = {
client.connect_to_broker().await
};
assert!(result.is_ok());
log::info!("[Publisher] Waiting {} seconds before sending", 5);
sleep(Duration::from_secs(5)).await;
log::info!("[Publisher] Sending new message {}", MESSAGE);
result = {
client.send_message(TOPIC, MESSAGE).await
};
assert!(result.is_ok());
log::info!("[Publisher] Disconnecting!");
result = {
client.disconnect().await
};
assert!(result.is_ok());
}
async fn receive() {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS0);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 2>::new(
&mut tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
100,
config,
);
log::info!("[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD);
let mut result = {client.connect_to_broker().await};
assert!(result.is_ok());
log::info!("[Receiver] Subscribing to topic {}", TOPIC);
result = {
client.subscribe_to_topic(TOPIC).await
};
assert!(result.is_ok());
log::info!("[Receiver] Waiting for new message!");
let msg = {
client.receive_message().await
};
assert!(msg.is_ok());
let act_message = String::from_utf8_lossy(msg.unwrap());
log::info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MESSAGE);
log::info!("[Receiver] Disconnecting");
result = {
client.disconnect().await
};
assert!(result.is_ok());
}
#[tokio::test]
async fn simple_publish_recv() {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.init();
log::info!("Running simple integration test");
let recv = task::spawn(async move {
receive().await;
});
let publ = task::spawn(async move {
publish().await;
});
join!(recv, publ);
}

View File

@ -22,4 +22,4 @@
* SOFTWARE.
*/
pub mod client_v5_unit;
pub mod integration_test_single;

View File

@ -24,3 +24,4 @@
#[cfg(test)]
pub mod unit;
pub mod integration;

View File

@ -1,28 +0,0 @@
/*
* MIT License
*
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}

View File

@ -25,4 +25,3 @@
pub mod utils;
pub mod encoding;
pub mod packet;
pub mod client;

View File

@ -87,8 +87,11 @@ impl<'a> BuffWriter<'a> {
pub fn write_string_ref(&mut self, str: &EncodedString<'a>) -> Result<(), BufferError> {
self.write_u16(str.len)?;
let bytes = str.string.as_bytes();
return self.insert_ref(str.len as usize, bytes);
if str.len != 0 {
let bytes = str.string.as_bytes();
return self.insert_ref(str.len as usize, bytes);
}
return Ok(())
}
pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) -> Result<(), BufferError> {