From c8ee05821a12e84abc52ed666bc66b0f1a1b71d5 Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Thu, 10 Mar 2022 10:13:40 +0100 Subject: [PATCH] Integration tests --- src/client/client_v5.rs | 19 +- src/packet/v5/publish_packet.rs | 2 +- src/packet/v5/reason_codes.rs | 1 + .../integration/integration_test_single.rs | 189 +++++++++++++----- 4 files changed, 155 insertions(+), 56 deletions(-) diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index f1d9498..eadffb7 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,4 +1,3 @@ -use drogue_device::drogue::config; use crate::client::client_config::ClientConfig; use crate::network::network_trait::Network; use crate::packet::v5::connack_packet::ConnackPacket; @@ -19,6 +18,7 @@ use crate::utils::types::BufferError; use heapless::Vec; use rand_core::RngCore; use crate::packet::v5::property::Property; +use crate::packet::v5::reason_codes::ReasonCode::BuffError; pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> { network_driver: &'a mut T, @@ -53,6 +53,7 @@ where } } + // Muze prijit disconnect kvuli male velikosti packetu pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> { let len = { let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new(); @@ -79,6 +80,13 @@ where self.network_driver.receive(self.buffer).await?; let mut packet = ConnackPacket::<'b, 5>::new(); if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) { + if err == BufferError::PacketTypeMismatch { + let mut disc = DisconnectPacket::<'b, 5>::new(); + if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() { + log::error!("Client was disconnected with reason: "); + return Err(ReasonCode::from(disc.disconnect_reason)); + } + } Err(err) } else { Ok(packet.connect_reason_code) @@ -207,7 +215,7 @@ where if i == TOPICS { break; } - if *reasons.get(i).unwrap() != self.config.qos.into() { + if *reasons.get(i).unwrap() != QualityOfService::into(self.config.qos) { return Err(ReasonCode::from(*reasons.get(i).unwrap())); } i = i + 1; @@ -262,6 +270,13 @@ where if let Err(err) = packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len)) { + if err == BufferError::PacketTypeMismatch { + let mut disc = DisconnectPacket::<'b, 5>::new(); + if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() { + log::error!("Client was disconnected with reason: "); + return Err(ReasonCode::from(disc.disconnect_reason)); + } + } log::error!("[DECODE ERR]: {}", err); return Err(ReasonCode::BuffError); } diff --git a/src/packet/v5/publish_packet.rs b/src/packet/v5/publish_packet.rs index 9f36bbe..90baf9a 100644 --- a/src/packet/v5/publish_packet.rs +++ b/src/packet/v5/publish_packet.rs @@ -34,7 +34,7 @@ use crate::utils::types::{BufferError, EncodedString}; use super::packet_type::PacketType; use super::property::Property; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, Debug)] pub enum QualityOfService { QoS0, QoS1, diff --git a/src/packet/v5/reason_codes.rs b/src/packet/v5/reason_codes.rs index 6a72e98..808423f 100644 --- a/src/packet/v5/reason_codes.rs +++ b/src/packet/v5/reason_codes.rs @@ -1,6 +1,7 @@ use core::fmt::{Display, Formatter}; #[derive(Debug)] +#[derive(PartialEq)] pub enum ReasonCode { Success, GrantedQoS1, diff --git a/src/tests/integration/integration_test_single.rs b/src/tests/integration/integration_test_single.rs index 998b60f..71e4fa5 100644 --- a/src/tests/integration/integration_test_single.rs +++ b/src/tests/integration/integration_test_single.rs @@ -32,6 +32,7 @@ use crate::client::client_v5::MqttClientV5; use crate::network::network_trait::Network; use crate::packet::v5::property::Property; use crate::packet::v5::publish_packet::QualityOfService; +use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized; use crate::tokio_network::TokioNetwork; static IP: [u8; 4] = [127, 0, 0, 1]; @@ -39,16 +40,46 @@ static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; static TOPIC: &str = "test/topic"; -static MESSAGE: &str = "testMessage"; +static MSG: &str = "testMessage"; -async fn publish() { +fn init() { + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Info) + .format_timestamp_nanos() + .try_init(); +} + +async fn publish_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) { + log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD); + let mut result = { + client.connect_to_broker().await + }; + assert!(result.is_ok()); + + log::info!("[Publisher] Waiting {} seconds before sending", 5); + sleep(Duration::from_secs(5)).await; + + log::info!("[Publisher] Sending new message {}", MSG); + result = { + client.send_message(TOPIC, MSG).await + }; + assert!(result.is_ok()); + + log::info!("[Publisher] Disconnecting!"); + result = { + client.disconnect().await + }; + assert!(result.is_ok()); +} + +async fn publish(qos: QualityOfService) { let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); tokio_network.create_connection().await; let mut config = ClientConfig::new(); - config.add_qos(QualityOfService::QoS0); + config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); - config.max_packet_size = 80; + config.max_packet_size = 100; let mut recv_buffer = [0; 80]; let mut write_buffer = [0; 80]; @@ -60,49 +91,11 @@ async fn publish() { 80, config, ); - - log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD); - let mut result = { - client.connect_to_broker().await - }; - assert!(result.is_ok()); - - log::info!("[Publisher] Waiting {} seconds before sending", 5); - sleep(Duration::from_secs(5)).await; - - log::info!("[Publisher] Sending new message {}", MESSAGE); - result = { - client.send_message(TOPIC, MESSAGE).await - }; - assert!(result.is_ok()); - - log::info!("[Publisher] Disconnecting!"); - result = { - client.disconnect().await - }; - assert!(result.is_ok()); + publish_core(& mut client) + .await; } -async fn receive() { - let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); - tokio_network.create_connection().await; - let mut config = ClientConfig::new(); - config.add_qos(QualityOfService::QoS0); - config.add_username(USERNAME); - config.add_password(PASSWORD); - config.max_packet_size = 60; - config.properties.push(Property::ReceiveMaximum(20)); - let mut recv_buffer = [0; 100]; - let mut write_buffer = [0; 100]; - - let mut client = MqttClientV5::::new( - &mut tokio_network, - &mut write_buffer, - 100, - &mut recv_buffer, - 100, - config, - ); +async fn receive_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) { log::info!("[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD); let mut result = {client.connect_to_broker().await}; assert!(result.is_ok()); @@ -120,7 +113,7 @@ async fn receive() { assert!(msg.is_ok()); let act_message = String::from_utf8_lossy(msg.unwrap()); log::info!("[Receiver] Got new message: {}", act_message); - assert_eq!(act_message, MESSAGE); + assert_eq!(act_message, MSG); log::info!("[Receiver] Disconnecting"); result = { @@ -129,21 +122,111 @@ async fn receive() { assert!(result.is_ok()); } +async fn receive(qos: QualityOfService) { + let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); + tokio_network.create_connection().await; + let mut config = ClientConfig::new(); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 60; + config.properties.push(Property::ReceiveMaximum(20)); + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClientV5::::new( + &mut tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + receive_core(& mut client) + .await; +} + + +async fn receive_with_wrong_cred(qos: QualityOfService) { + let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT); + tokio_network.create_connection().await; + let mut config = ClientConfig::new(); + config.add_qos(qos); + config.add_username("xyz"); + config.add_password(PASSWORD); + config.max_packet_size = 60; + config.properties.push(Property::ReceiveMaximum(20)); + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClientV5::::new( + &mut tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + + log::info!("[Receiver] Connection to broker with username {} and password {}", "xyz", PASSWORD); + let mut result = {client.connect_to_broker().await}; + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), NotAuthorized); +} + #[tokio::test] async fn simple_publish_recv() { - env_logger::builder() - .filter_level(log::LevelFilter::Info) - .format_timestamp_nanos() - .init(); - + init(); log::info!("Running simple integration test"); let recv = task::spawn(async move { - receive().await; + receive(QualityOfService::QoS0) + .await; }); let publ = task::spawn(async move { - publish().await; + publish(QualityOfService::QoS0) + .await; + }); + + join!(recv, publ); +} + +#[tokio::test] +async fn simple_publish_recv_qos() { + init(); + log::info!("Running simple integration test with Quality of Service 1"); + + let recv = task::spawn(async move { + receive(QualityOfService::QoS1) + .await; + }); + + let publ = task::spawn(async move { + publish(QualityOfService::QoS1) + .await; }); join!(recv, publ); +} + +#[tokio::test] +async fn simple_publish_recv_wrong_cred() { + init(); + log::info!("Running simple integration test with Quality of Service 1"); + + let recv = task::spawn(async move { + receive_with_wrong_cred(QualityOfService::QoS1) + .await; + }); + + let recv_right = task::spawn(async move { + receive(QualityOfService::QoS0) + .await; + }); + + let publ = task::spawn(async move { + publish(QualityOfService::QoS1) + .await; + }); + join!(recv, recv_right, publ); } \ No newline at end of file