Integration tests

This commit is contained in:
Ondrej Babec 2022-03-10 10:13:40 +01:00
parent 21e929c7ed
commit c8ee05821a
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
4 changed files with 155 additions and 56 deletions

View File

@ -1,4 +1,3 @@
use drogue_device::drogue::config;
use crate::client::client_config::ClientConfig;
use crate::network::network_trait::Network;
use crate::packet::v5::connack_packet::ConnackPacket;
@ -19,6 +18,7 @@ use crate::utils::types::BufferError;
use heapless::Vec;
use rand_core::RngCore;
use crate::packet::v5::property::Property;
use crate::packet::v5::reason_codes::ReasonCode::BuffError;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
network_driver: &'a mut T,
@ -53,6 +53,7 @@ where
}
}
// Muze prijit disconnect kvuli male velikosti packetu
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
let len = {
let mut connect = ConnectPacket::<'b, MAX_PROPERTIES, 0>::new();
@ -79,6 +80,13 @@ where
self.network_driver.receive(self.buffer).await?;
let mut packet = ConnackPacket::<'b, 5>::new();
if let Err(err) = packet.decode(&mut BuffReader::new(self.buffer, self.buffer_len)) {
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new();
if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() {
log::error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
}
Err(err)
} else {
Ok(packet.connect_reason_code)
@ -207,7 +215,7 @@ where
if i == TOPICS {
break;
}
if *reasons.get(i).unwrap() != self.config.qos.into() {
if *reasons.get(i).unwrap() != QualityOfService::into(self.config.qos) {
return Err(ReasonCode::from(*reasons.get(i).unwrap()));
}
i = i + 1;
@ -262,6 +270,13 @@ where
if let Err(err) =
packet.decode(&mut BuffReader::new(self.recv_buffer, self.recv_buffer_len))
{
if err == BufferError::PacketTypeMismatch {
let mut disc = DisconnectPacket::<'b, 5>::new();
if disc.decode(&mut BuffReader::new(self.buffer, self.buffer_len)).is_ok() {
log::error!("Client was disconnected with reason: ");
return Err(ReasonCode::from(disc.disconnect_reason));
}
}
log::error!("[DECODE ERR]: {}", err);
return Err(ReasonCode::BuffError);
}

View File

@ -34,7 +34,7 @@ use crate::utils::types::{BufferError, EncodedString};
use super::packet_type::PacketType;
use super::property::Property;
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum QualityOfService {
QoS0,
QoS1,

View File

@ -1,6 +1,7 @@
use core::fmt::{Display, Formatter};
#[derive(Debug)]
#[derive(PartialEq)]
pub enum ReasonCode {
Success,
GrantedQoS1,

View File

@ -32,6 +32,7 @@ use crate::client::client_v5::MqttClientV5;
use crate::network::network_trait::Network;
use crate::packet::v5::property::Property;
use crate::packet::v5::publish_packet::QualityOfService;
use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized;
use crate::tokio_network::TokioNetwork;
static IP: [u8; 4] = [127, 0, 0, 1];
@ -39,16 +40,46 @@ static PORT: u16 = 1883;
static USERNAME: &str = "test";
static PASSWORD: &str = "testPass";
static TOPIC: &str = "test/topic";
static MESSAGE: &str = "testMessage";
static MSG: &str = "testMessage";
async fn publish() {
fn init() {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.try_init();
}
async fn publish_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) {
log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD);
let mut result = {
client.connect_to_broker().await
};
assert!(result.is_ok());
log::info!("[Publisher] Waiting {} seconds before sending", 5);
sleep(Duration::from_secs(5)).await;
log::info!("[Publisher] Sending new message {}", MSG);
result = {
client.send_message(TOPIC, MSG).await
};
assert!(result.is_ok());
log::info!("[Publisher] Disconnecting!");
result = {
client.disconnect().await
};
assert!(result.is_ok());
}
async fn publish(qos: QualityOfService) {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS0);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 80;
config.max_packet_size = 100;
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
@ -60,49 +91,11 @@ async fn publish() {
80,
config,
);
log::info!("[Publisher] Connection to broker with username {} and password {}", USERNAME, PASSWORD);
let mut result = {
client.connect_to_broker().await
};
assert!(result.is_ok());
log::info!("[Publisher] Waiting {} seconds before sending", 5);
sleep(Duration::from_secs(5)).await;
log::info!("[Publisher] Sending new message {}", MESSAGE);
result = {
client.send_message(TOPIC, MESSAGE).await
};
assert!(result.is_ok());
log::info!("[Publisher] Disconnecting!");
result = {
client.disconnect().await
};
assert!(result.is_ok());
publish_core(& mut client)
.await;
}
async fn receive() {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(QualityOfService::QoS0);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 2>::new(
&mut tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
100,
config,
);
async fn receive_core<'b>(client: & mut MqttClientV5<'b, TokioNetwork, 5>) {
log::info!("[Receiver] Connection to broker with username {} and password {}", USERNAME, PASSWORD);
let mut result = {client.connect_to_broker().await};
assert!(result.is_ok());
@ -120,7 +113,7 @@ async fn receive() {
assert!(msg.is_ok());
let act_message = String::from_utf8_lossy(msg.unwrap());
log::info!("[Receiver] Got new message: {}", act_message);
assert_eq!(act_message, MESSAGE);
assert_eq!(act_message, MSG);
log::info!("[Receiver] Disconnecting");
result = {
@ -129,21 +122,111 @@ async fn receive() {
assert!(result.is_ok());
}
async fn receive(qos: QualityOfService) {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
100,
config,
);
receive_core(& mut client)
.await;
}
async fn receive_with_wrong_cred(qos: QualityOfService) {
let mut tokio_network: TokioNetwork = TokioNetwork::new(IP, PORT);
tokio_network.create_connection().await;
let mut config = ClientConfig::new();
config.add_qos(qos);
config.add_username("xyz");
config.add_password(PASSWORD);
config.max_packet_size = 60;
config.properties.push(Property::ReceiveMaximum(20));
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
&mut tokio_network,
&mut write_buffer,
100,
&mut recv_buffer,
100,
config,
);
log::info!("[Receiver] Connection to broker with username {} and password {}", "xyz", PASSWORD);
let mut result = {client.connect_to_broker().await};
assert!(result.is_err());
assert_eq!(result.unwrap_err(), NotAuthorized);
}
#[tokio::test]
async fn simple_publish_recv() {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.format_timestamp_nanos()
.init();
init();
log::info!("Running simple integration test");
let recv = task::spawn(async move {
receive().await;
receive(QualityOfService::QoS0)
.await;
});
let publ = task::spawn(async move {
publish().await;
publish(QualityOfService::QoS0)
.await;
});
join!(recv, publ);
}
#[tokio::test]
async fn simple_publish_recv_qos() {
init();
log::info!("Running simple integration test with Quality of Service 1");
let recv = task::spawn(async move {
receive(QualityOfService::QoS1)
.await;
});
let publ = task::spawn(async move {
publish(QualityOfService::QoS1)
.await;
});
join!(recv, publ);
}
#[tokio::test]
async fn simple_publish_recv_wrong_cred() {
init();
log::info!("Running simple integration test with Quality of Service 1");
let recv = task::spawn(async move {
receive_with_wrong_cred(QualityOfService::QoS1)
.await;
});
let recv_right = task::spawn(async move {
receive(QualityOfService::QoS0)
.await;
});
let publ = task::spawn(async move {
publish(QualityOfService::QoS1)
.await;
});
join!(recv, recv_right, publ);
}