From 21e929c7ed9e80b8e535e30c3b36c68a20e8e737 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Wed, 9 Mar 2022 16:27:04 +0100 Subject: [PATCH] First integration --- src/client/client_config.rs | 15 +- src/client/client_v5.rs | 6 +- src/main.rs | 2 +- src/packet/v5/connect_packet.rs | 4 +- .../integration/integration_test_single.rs | 149 ++++++++++++++++++ src/tests/{unit/client => integration}/mod.rs | 2 +- src/tests/mod.rs | 3 +- src/tests/unit/client/client_v5_unit.rs | 28 ---- src/tests/unit/mod.rs | 3 +- src/utils/buffer_writer.rs | 7 +- 10 files changed, 179 insertions(+), 40 deletions(-) create mode 100644 src/tests/integration/integration_test_single.rs rename src/tests/{unit/client => integration}/mod.rs (97%) delete mode 100644 src/tests/unit/client/client_v5_unit.rs diff --git a/src/client/client_config.rs b/src/client/client_config.rs index a1d1fd9..546def9 100644 --- a/src/client/client_config.rs +++ b/src/client/client_config.rs @@ -36,11 +36,12 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { pub password_flag: bool, pub password: BinaryData<'a>, pub properties: Vec, MAX_PROPERTIES>, + pub max_packet_size: u32, } impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { pub fn new() -> Self { - Self { + Self { qos: QualityOfService::QoS0, keep_alive: 60, client_id: EncodedString::new(), @@ -48,7 +49,8 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { username: EncodedString::new(), password_flag: false, password: BinaryData::new(), - properties: Vec::, MAX_PROPERTIES>::new() + properties: Vec::, MAX_PROPERTIES>::new(), + max_packet_size: 265_000, } } @@ -77,4 +79,13 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { self.properties.push(prop); } } + + pub fn add_max_packet_size_as_prop(& mut self) -> u32 { + if self.properties.len() < MAX_PROPERTIES { + let prop = Property::MaximumPacketSize(self.max_packet_size); + self.properties.push(prop); + return 5; + } + return 0; + } } diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index 58fb2fa..f1d9498 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,3 +1,4 @@ +use drogue_device::drogue::config; use crate::client::client_config::ClientConfig; use crate::network::network_trait::Network; use crate::packet::v5::connack_packet::ConnackPacket; @@ -17,6 +18,7 @@ use crate::utils::rng_generator::CountingRng; use crate::utils::types::BufferError; use heapless::Vec; use rand_core::RngCore; +use crate::packet::v5::property::Property; pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { network_driver: &'a mut T, @@ -53,8 +55,10 @@ where pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { let len = { - let mut connect = ConnectPacket::<'b, 3, 0>::clean(); + let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new(); connect.keep_alive = self.config.keep_alive; + self.config.add_max_packet_size_as_prop(); + connect.property_len = connect.add_properties(&self.config.properties); if self.config.username_flag { connect.add_username(&self.config.username); } diff --git a/src/main.rs b/src/main.rs index de8bf2b..e5d4bdb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use rust_mqtt::packet::v5::subscription_packet::SubscriptionPacket; use rust_mqtt::tokio_network::TokioNetwork; async fn receive() { - let mut ip: [u8; 4] = [37, 205, 11, 180]; + let mut ip: [u8; 4] = [127, 0, 0, 1]; let mut port: u16 = 1883; let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); tokio_network.create_connection().await; diff --git a/src/packet/v5/connect_packet.rs b/src/packet/v5/connect_packet.rs index 555c629..7a5b44d 100644 --- a/src/packet/v5/connect_packet.rs +++ b/src/packet/v5/connect_packet.rs @@ -136,8 +136,8 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<' let mut rm_ln = self.property_len; let property_len_enc: [u8; 4] = VariableByteIntegerEncoder::encode(self.property_len)?; let property_len_len = VariableByteIntegerEncoder::len(property_len_enc); - // Number 12 => protocol_name_len + protocol_name + protocol_version + connect_flags + keep_alive + client_id_len - rm_ln = rm_ln + property_len_len as u32 + 12; + // Number 12 => protocol_name_len + protocol_name (6) + protocol_version (1)+ connect_flags (1) + keep_alive (2) + client_id_len (2) + rm_ln = rm_ln + property_len_len as u32 + 10 + self.client_id.len as u32 + 2; if self.connect_flags & 0x04 != 0 { let wil_prop_len_enc = VariableByteIntegerEncoder::encode(self.will_property_len)?; diff --git a/src/tests/integration/integration_test_single.rs b/src/tests/integration/integration_test_single.rs new file mode 100644 index 0000000..998b60f --- /dev/null +++ b/src/tests/integration/integration_test_single.rs @@ -0,0 +1,149 @@ +/* + * MIT License + * + * Copyright (c) [2022] [Ondrej Babec ] + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +use alloc::string::String; +use core::time::Duration; +use heapless::Vec; +use tokio::{join, task}; +use tokio::time::sleep; +use crate::client::client_config::ClientConfig; +use crate::client::client_v5::MqttClientV5; +use crate::network::network_trait::Network; +use crate::packet::v5::property::Property; +use crate::packet::v5::publish_packet::QualityOfService; +use crate::tokio_network::TokioNetwork; + +static IP: [u8; 4] = [127, 0, 0, 1]; +static PORT: u16 = 1883; +static USERNAME: &str = "test"; +static PASSWORD: &str = "testPass"; +static TOPIC: &str = "test/topic"; +static MESSAGE: &str = "testMessage"; + +async fn publish() { + let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); + tokio_network.create_connection().await; + let mut config = ClientConfig::new(); + config.add_qos(QualityOfService::QoS0); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 80; + let mut recv_buffer = [0; 80]; + let mut write_buffer = [0; 80]; + + let mut client = MqttClientV5::::new( + &mut tokio_network, + &mut write_buffer, + 80, + &mut recv_buffer, + 80, + config, + ); + + log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD); + let mut result = { + client.connect_to_broker().await + }; + assert!(result.is_ok()); + + log::info!("[Publisher] Waiting {} seconds before sending", 5); + sleep(Duration::from_secs(5)).await; + + log::info!("[Publisher] Sending new message {}", MESSAGE); + result = { + client.send_message(TOPIC, MESSAGE).await + }; + assert!(result.is_ok()); + + log::info!("[Publisher] Disconnecting!"); + result = { + client.disconnect().await + }; + assert!(result.is_ok()); +} + +async fn receive() { + let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); + tokio_network.create_connection().await; + let mut config = ClientConfig::new(); + config.add_qos(QualityOfService::QoS0); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 60; + config.properties.push(Property::ReceiveMaximum(20)); + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClientV5::::new( + &mut tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + log::info!("[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD); + let mut result = {client.connect_to_broker().await}; + assert!(result.is_ok()); + + log::info!("[Receiver] Subscribing to topic {}", TOPIC); + result = { + client.subscribe_to_topic(TOPIC).await + }; + assert!(result.is_ok()); + + log::info!("[Receiver] Waiting for new message!"); + let msg = { + client.receive_message().await + }; + assert!(msg.is_ok()); + let act_message = String::from_utf8_lossy(msg.unwrap()); + log::info!("[Receiver] Got new message: {}", act_message); + assert_eq!(act_message, MESSAGE); + + log::info!("[Receiver] Disconnecting"); + result = { + client.disconnect().await + }; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn simple_publish_recv() { + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .format_timestamp_nanos() + .init(); + + log::info!("Running simple integration test"); + + let recv = task::spawn(async move { + receive().await; + }); + + let publ = task::spawn(async move { + publish().await; + }); + join!(recv, publ); +} \ No newline at end of file diff --git a/src/tests/unit/client/mod.rs b/src/tests/integration/mod.rs similarity index 97% rename from src/tests/unit/client/mod.rs rename to src/tests/integration/mod.rs index 0c9e02d..c47f092 100644 --- a/src/tests/unit/client/mod.rs +++ b/src/tests/integration/mod.rs @@ -22,4 +22,4 @@ * SOFTWARE. */ -pub mod client_v5_unit; \ No newline at end of file +pub mod integration_test_single; \ No newline at end of file diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 0b21c1a..6c30d3c 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -23,4 +23,5 @@ */ #[cfg(test)] -pub mod unit; \ No newline at end of file +pub mod unit; +pub mod integration; \ No newline at end of file diff --git a/src/tests/unit/client/client_v5_unit.rs b/src/tests/unit/client/client_v5_unit.rs deleted file mode 100644 index e518c59..0000000 --- a/src/tests/unit/client/client_v5_unit.rs +++ /dev/null @@ -1,28 +0,0 @@ -/* - * MIT License - * - * Copyright (c) [2022] [Ondrej Babec ] - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -#[test] -fn it_works() { - assert_eq!(2 + 2, 4); -} \ No newline at end of file diff --git a/src/tests/unit/mod.rs b/src/tests/unit/mod.rs index baded47..ee493a5 100644 --- a/src/tests/unit/mod.rs +++ b/src/tests/unit/mod.rs @@ -24,5 +24,4 @@ pub mod utils; pub mod encoding; -pub mod packet; -pub mod client; \ No newline at end of file +pub mod packet; \ No newline at end of file diff --git a/src/utils/buffer_writer.rs b/src/utils/buffer_writer.rs index df8f16c..6c85c80 100644 --- a/src/utils/buffer_writer.rs +++ b/src/utils/buffer_writer.rs @@ -87,8 +87,11 @@ impl<'a> BuffWriter<'a> { pub fn write_string_ref(&mut self, str: &EncodedString<'a>) -> Result<(), BufferError> { self.write_u16(str.len)?; - let bytes = str.string.as_bytes(); - return self.insert_ref(str.len as usize, bytes); + if str.len != 0 { + let bytes = str.string.as_bytes(); + return self.insert_ref(str.len as usize, bytes); + } + return Ok(()) } pub fn write_binary_ref(&mut self, bin: &BinaryData<'a>) -> Result<(), BufferError> {