parent
96be69c520
commit
9109672b85
|
@ -1 +1 @@
|
||||||
test:$7$101$XGspXBoC6refncib$u5t0Adz5h8Xn9XfYtKfa5kWrPNMGd+H7u2vbl0S8qmr/HCREZjjEyqU88QybSV0SsgmyFrXMIkCozEmnPeTm+g==
|
test:testPass
|
||||||
|
|
2
.github/workflows/integration_tests.yaml
vendored
2
.github/workflows/integration_tests.yaml
vendored
|
@ -21,6 +21,8 @@ jobs:
|
||||||
- name: Start Mosquitto
|
- name: Start Mosquitto
|
||||||
run: |
|
run: |
|
||||||
sudo apt-get install mosquitto
|
sudo apt-get install mosquitto
|
||||||
|
sudo service mosquitto stop
|
||||||
|
mosquitto_passwd -U .ci/mqtt_pass.txt
|
||||||
mosquitto -c .ci/mosquitto.conf -d
|
mosquitto -c .ci/mosquitto.conf -d
|
||||||
|
|
||||||
- name: Run integration-tests tests
|
- name: Run integration-tests tests
|
||||||
|
|
|
@ -20,6 +20,7 @@ tokio-test = { version = "0.4.2"}
|
||||||
env_logger = "0.9.0"
|
env_logger = "0.9.0"
|
||||||
futures = { version = "0.3.21" }
|
futures = { version = "0.3.21" }
|
||||||
log = { version = "0.4.14"}
|
log = { version = "0.4.14"}
|
||||||
|
serial_test = "0.6.0"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["std"]
|
default = ["std"]
|
||||||
|
|
|
@ -38,7 +38,6 @@ pub enum MqttVersion {
|
||||||
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
|
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
|
||||||
pub qos: QualityOfService,
|
pub qos: QualityOfService,
|
||||||
pub keep_alive: u16,
|
pub keep_alive: u16,
|
||||||
pub client_id: EncodedString<'a>,
|
|
||||||
pub username_flag: bool,
|
pub username_flag: bool,
|
||||||
pub username: EncodedString<'a>,
|
pub username: EncodedString<'a>,
|
||||||
pub password_flag: bool,
|
pub password_flag: bool,
|
||||||
|
@ -53,7 +52,6 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
|
||||||
Self {
|
Self {
|
||||||
qos: QualityOfService::QoS0,
|
qos: QualityOfService::QoS0,
|
||||||
keep_alive: 60,
|
keep_alive: 60,
|
||||||
client_id: EncodedString::new(),
|
|
||||||
username_flag: false,
|
username_flag: false,
|
||||||
username: EncodedString::new(),
|
username: EncodedString::new(),
|
||||||
password_flag: false,
|
password_flag: false,
|
||||||
|
|
|
@ -203,7 +203,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
|
||||||
config.add_qos(qos);
|
config.add_qos(qos);
|
||||||
config.add_username(USERNAME);
|
config.add_username(USERNAME);
|
||||||
config.add_password(PASSWORD);
|
config.add_password(PASSWORD);
|
||||||
config.max_packet_size = 60;
|
config.max_packet_size = 6000;
|
||||||
config.properties.push(Property::ReceiveMaximum(20));
|
config.properties.push(Property::ReceiveMaximum(20));
|
||||||
let mut recv_buffer = [0; 100];
|
let mut recv_buffer = [0; 100];
|
||||||
let mut write_buffer = [0; 100];
|
let mut write_buffer = [0; 100];
|
||||||
|
@ -337,6 +337,7 @@ async fn integration_simple_publish_recv_wrong_cred() {
|
||||||
|
|
||||||
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await });
|
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await });
|
||||||
let (r, rv, p) = join3(recv, recv_right, publ).await;
|
let (r, rv, p) = join3(recv, recv_right, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
assert_ok!(rv.unwrap());
|
assert_ok!(rv.unwrap());
|
||||||
assert_ok!(p.unwrap());
|
assert_ok!(p.unwrap());
|
||||||
}
|
}
|
||||||
|
|
383
mqtt/tests/load_test.rs
Normal file
383
mqtt/tests/load_test.rs
Normal file
|
@ -0,0 +1,383 @@
|
||||||
|
/*
|
||||||
|
* MIT License
|
||||||
|
*
|
||||||
|
* Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to deal
|
||||||
|
* in the Software without restriction, including without limitation the rights
|
||||||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in all
|
||||||
|
* copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
* SOFTWARE.
|
||||||
|
*/
|
||||||
|
extern crate alloc;
|
||||||
|
use alloc::string::String;
|
||||||
|
use core::time::Duration;
|
||||||
|
use std::future::Future;
|
||||||
|
use log::{info, LevelFilter};
|
||||||
|
use tokio::time::sleep;
|
||||||
|
use serial_test::serial;
|
||||||
|
use tokio::task;
|
||||||
|
use tokio_test::{assert_err, assert_ok};
|
||||||
|
use heapless::Vec;
|
||||||
|
use rust_mqtt::client::client_config::ClientConfig;
|
||||||
|
use rust_mqtt::client::client::MqttClient;
|
||||||
|
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
|
||||||
|
use rust_mqtt::packet::v5::property::Property;
|
||||||
|
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
||||||
|
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
|
||||||
|
use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized;
|
||||||
|
use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
|
||||||
|
use rust_mqtt::utils::types::BufferError;
|
||||||
|
use std::sync::Once;
|
||||||
|
use futures::future::{join, join3};
|
||||||
|
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
|
||||||
|
|
||||||
|
static IP: [u8; 4] = [127, 0, 0, 1];
|
||||||
|
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
|
||||||
|
static PORT: u16 = 1883;
|
||||||
|
static USERNAME: &str = "test";
|
||||||
|
static PASSWORD: &str = "testPass";
|
||||||
|
static MSG: &str = "testMessage";
|
||||||
|
|
||||||
|
static INIT: Once = Once::new();
|
||||||
|
|
||||||
|
fn setup() {
|
||||||
|
INIT.call_once(|| {
|
||||||
|
env_logger::init();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn publish_core<'b>(
|
||||||
|
client: &mut MqttClient<'b, TokioNetwork, 5>,
|
||||||
|
wait: u64,
|
||||||
|
topic: &str,
|
||||||
|
amount: u16,
|
||||||
|
) -> Result<(), ReasonCode> {
|
||||||
|
info!(
|
||||||
|
"[Publisher] Connection to broker with username {} and password {}",
|
||||||
|
USERNAME,
|
||||||
|
PASSWORD
|
||||||
|
);
|
||||||
|
let mut result = { client.connect_to_broker().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
info!("[Publisher] Waiting {} seconds before sending", wait);
|
||||||
|
sleep(Duration::from_secs(wait)).await;
|
||||||
|
|
||||||
|
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
|
||||||
|
let mut count = 0;
|
||||||
|
loop {
|
||||||
|
result = { client.send_message(topic, MSG).await };
|
||||||
|
info!("[PUBLISHER] sent {}", count);
|
||||||
|
assert_ok!(result);
|
||||||
|
count = count + 1;
|
||||||
|
if count == amount {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_millis(5)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
info!("[Publisher] Disconnecting!");
|
||||||
|
result = { client.disconnect().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> {
|
||||||
|
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
||||||
|
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
|
||||||
|
let mut config = ClientConfig::new(MQTTv5);
|
||||||
|
config.add_qos(qos);
|
||||||
|
config.add_username(USERNAME);
|
||||||
|
config.add_password(PASSWORD);
|
||||||
|
config.max_packet_size = 100;
|
||||||
|
let mut recv_buffer = [0; 80];
|
||||||
|
let mut write_buffer = [0; 80];
|
||||||
|
|
||||||
|
let mut client = MqttClient::<TokioNetwork, 5>::new(
|
||||||
|
tokio_network,
|
||||||
|
&mut write_buffer,
|
||||||
|
80,
|
||||||
|
&mut recv_buffer,
|
||||||
|
80,
|
||||||
|
config,
|
||||||
|
);
|
||||||
|
publish_core(&mut client, wait, topic, amount).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn receive_core<'b>(
|
||||||
|
client: &mut MqttClient<'b, TokioNetwork, 5>,
|
||||||
|
topic: &str,
|
||||||
|
amount: u16,
|
||||||
|
) -> Result<(), ReasonCode> {
|
||||||
|
info!(
|
||||||
|
"[Receiver] Connection to broker with username {} and password {}",
|
||||||
|
USERNAME,
|
||||||
|
PASSWORD
|
||||||
|
);
|
||||||
|
let mut result = { client.connect_to_broker().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
|
||||||
|
info!("[Receiver] Subscribing to topic {}", topic);
|
||||||
|
result = { client.subscribe_to_topic(topic).await };
|
||||||
|
assert_ok!(result);
|
||||||
|
info!("[Receiver] Waiting for new message!");
|
||||||
|
let mut count = 0;
|
||||||
|
loop {
|
||||||
|
let msg = { client.receive_message().await };
|
||||||
|
assert_ok!(msg);
|
||||||
|
let act_message = String::from_utf8_lossy(msg?);
|
||||||
|
info!("[Receiver] Got new {}. message: {}", count, act_message);
|
||||||
|
assert_eq!(act_message, MSG);
|
||||||
|
count = count + 1;
|
||||||
|
if count == amount {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("[Receiver] Disconnecting");
|
||||||
|
result = { client.disconnect().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> {
|
||||||
|
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
||||||
|
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
|
||||||
|
let mut config = ClientConfig::new(MQTTv5);
|
||||||
|
config.add_qos(qos);
|
||||||
|
config.add_username(USERNAME);
|
||||||
|
config.add_password(PASSWORD);
|
||||||
|
config.max_packet_size = 6000;
|
||||||
|
config.keep_alive = 60000;
|
||||||
|
let mut recv_buffer = [0; 100];
|
||||||
|
let mut write_buffer = [0; 100];
|
||||||
|
|
||||||
|
let mut client = MqttClient::<TokioNetwork, 5>::new(
|
||||||
|
tokio_network,
|
||||||
|
&mut write_buffer,
|
||||||
|
100,
|
||||||
|
&mut recv_buffer,
|
||||||
|
100,
|
||||||
|
config,
|
||||||
|
);
|
||||||
|
|
||||||
|
receive_core(&mut client, topic, amount).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_ten() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/ten", 10).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
async fn load_test_ten_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_fifty() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_fifty_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_hundred() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_hundred_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "hundred/qos", 100).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_five_hundred() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "five/hundred", 500).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_five_hundred_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_thousand() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "thousand", 1000).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "thousand", 1000).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_thousand_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "thousand/qos", 1000).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 72s
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_ten_thousand_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 72s
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
#[serial]
|
||||||
|
async fn load_test_twenty_thousand_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple tests test");
|
||||||
|
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await });
|
||||||
|
|
||||||
|
let (r, p) = join(recv, publ).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user