From 9109672b8561ea4b279773a8de9c4ad3db4c46b7 Mon Sep 17 00:00:00 2001 From: obabec Date: Sat, 9 Apr 2022 15:37:44 +0200 Subject: [PATCH] Load (#12) * Load testing --- .ci/mqtt_pass.txt | 2 +- .github/workflows/integration_tests.yaml | 2 + mqtt/Cargo.toml | 1 + mqtt/src/client/client_config.rs | 2 - mqtt/tests/integration_test_single.rs | 3 +- mqtt/tests/load_test.rs | 383 +++++++++++++++++++++++ 6 files changed, 389 insertions(+), 4 deletions(-) create mode 100644 mqtt/tests/load_test.rs diff --git a/.ci/mqtt_pass.txt b/.ci/mqtt_pass.txt index 2182348..35a0f7d 100644 --- a/.ci/mqtt_pass.txt +++ b/.ci/mqtt_pass.txt @@ -1 +1 @@ -test:$7$101$XGspXBoC6refncib$u5t0Adz5h8Xn9XfYtKfa5kWrPNMGd+H7u2vbl0S8qmr/HCREZjjEyqU88QybSV0SsgmyFrXMIkCozEmnPeTm+g== +test:testPass diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml index c68c24f..f2c3149 100644 --- a/.github/workflows/integration_tests.yaml +++ b/.github/workflows/integration_tests.yaml @@ -21,6 +21,8 @@ jobs: - name: Start Mosquitto run: | sudo apt-get install mosquitto + sudo service mosquitto stop + mosquitto_passwd -U .ci/mqtt_pass.txt mosquitto -c .ci/mosquitto.conf -d - name: Run integration-tests tests diff --git a/mqtt/Cargo.toml b/mqtt/Cargo.toml index 18be351..2a86a45 100644 --- a/mqtt/Cargo.toml +++ b/mqtt/Cargo.toml @@ -20,6 +20,7 @@ tokio-test = { version = "0.4.2"} env_logger = "0.9.0" futures = { version = "0.3.21" } log = { version = "0.4.14"} +serial_test = "0.6.0" [features] default = ["std"] diff --git a/mqtt/src/client/client_config.rs b/mqtt/src/client/client_config.rs index 238c7e8..aebd2fc 100644 --- a/mqtt/src/client/client_config.rs +++ b/mqtt/src/client/client_config.rs @@ -38,7 +38,6 @@ pub enum MqttVersion { pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> { pub qos: QualityOfService, pub keep_alive: u16, - pub client_id: EncodedString<'a>, pub username_flag: bool, pub username: EncodedString<'a>, pub password_flag: bool, @@ -53,7 +52,6 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> { Self { qos: QualityOfService::QoS0, keep_alive: 60, - client_id: EncodedString::new(), username_flag: false, username: EncodedString::new(), password_flag: false, diff --git a/mqtt/tests/integration_test_single.rs b/mqtt/tests/integration_test_single.rs index 6e2caec..c51fa82 100644 --- a/mqtt/tests/integration_test_single.rs +++ b/mqtt/tests/integration_test_single.rs @@ -203,7 +203,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), config.add_qos(qos); config.add_username(USERNAME); config.add_password(PASSWORD); - config.max_packet_size = 60; + config.max_packet_size = 6000; config.properties.push(Property::ReceiveMaximum(20)); let mut recv_buffer = [0; 100]; let mut write_buffer = [0; 100]; @@ -337,6 +337,7 @@ async fn integration_simple_publish_recv_wrong_cred() { let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await }); let (r, rv, p) = join3(recv, recv_right, publ).await; + assert_ok!(r.unwrap()); assert_ok!(rv.unwrap()); assert_ok!(p.unwrap()); } diff --git a/mqtt/tests/load_test.rs b/mqtt/tests/load_test.rs new file mode 100644 index 0000000..eed9703 --- /dev/null +++ b/mqtt/tests/load_test.rs @@ -0,0 +1,383 @@ +/* + * MIT License + * + * Copyright (c) [2022] [Ondrej Babec ] + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +extern crate alloc; +use alloc::string::String; +use core::time::Duration; +use std::future::Future; +use log::{info, LevelFilter}; +use tokio::time::sleep; +use serial_test::serial; +use tokio::task; +use tokio_test::{assert_err, assert_ok}; +use heapless::Vec; +use rust_mqtt::client::client_config::ClientConfig; +use rust_mqtt::client::client::MqttClient; +use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory}; +use rust_mqtt::packet::v5::property::Property; +use rust_mqtt::packet::v5::publish_packet::QualityOfService; +use rust_mqtt::packet::v5::reason_codes::ReasonCode; +use rust_mqtt::packet::v5::reason_codes::ReasonCode::NotAuthorized; +use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; +use rust_mqtt::utils::types::BufferError; +use std::sync::Once; +use futures::future::{join, join3}; +use rust_mqtt::client::client_config::MqttVersion::MQTTv5; + +static IP: [u8; 4] = [127, 0, 0, 1]; +static WRONG_IP: [u8; 4] = [192, 168, 1, 1]; +static PORT: u16 = 1883; +static USERNAME: &str = "test"; +static PASSWORD: &str = "testPass"; +static MSG: &str = "testMessage"; + +static INIT: Once = Once::new(); + +fn setup() { + INIT.call_once(|| { + env_logger::init(); + }); +} + +async fn publish_core<'b>( + client: &mut MqttClient<'b, TokioNetwork, 5>, + wait: u64, + topic: &str, + amount: u16, +) -> Result<(), ReasonCode> { + info!( + "[Publisher] Connection to broker with username {} and password {}", + USERNAME, + PASSWORD + ); + let mut result = { client.connect_to_broker().await }; + assert_ok!(result); + info!("[Publisher] Waiting {} seconds before sending", wait); + sleep(Duration::from_secs(wait)).await; + + info!("[Publisher] Sending new message {} to topic {}", MSG, topic); + let mut count = 0; + loop { + result = { client.send_message(topic, MSG).await }; + info!("[PUBLISHER] sent {}", count); + assert_ok!(result); + count = count + 1; + if count == amount { + break; + } + sleep(Duration::from_millis(5)).await; + } + + + + info!("[Publisher] Disconnecting!"); + result = { client.disconnect().await }; + assert_ok!(result); + Ok(()) +} + +async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> { + let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); + let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 100; + let mut recv_buffer = [0; 80]; + let mut write_buffer = [0; 80]; + + let mut client = MqttClient::::new( + tokio_network, + &mut write_buffer, + 80, + &mut recv_buffer, + 80, + config, + ); + publish_core(&mut client, wait, topic, amount).await +} + +async fn receive_core<'b>( + client: &mut MqttClient<'b, TokioNetwork, 5>, + topic: &str, + amount: u16, +) -> Result<(), ReasonCode> { + info!( + "[Receiver] Connection to broker with username {} and password {}", + USERNAME, + PASSWORD + ); + let mut result = { client.connect_to_broker().await }; + assert_ok!(result); + + info!("[Receiver] Subscribing to topic {}", topic); + result = { client.subscribe_to_topic(topic).await }; + assert_ok!(result); + info!("[Receiver] Waiting for new message!"); + let mut count = 0; + loop { + let msg = { client.receive_message().await }; + assert_ok!(msg); + let act_message = String::from_utf8_lossy(msg?); + info!("[Receiver] Got new {}. message: {}", count, act_message); + assert_eq!(act_message, MSG); + count = count + 1; + if count == amount { + break; + } + } + info!("[Receiver] Disconnecting"); + result = { client.disconnect().await }; + assert_ok!(result); + Ok(()) +} + +async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -> Result<(), ReasonCode> { + let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); + let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(MQTTv5); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 6000; + config.keep_alive = 60000; + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClient::::new( + tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + + receive_core(&mut client, topic, amount).await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_ten() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/ten", 10).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/ten", 10).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn load_test_ten_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/ten/qos", 10).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/ten/qos", 10).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_fifty() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/fifty", 50).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/fifty", 50).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_fifty_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/fifty/qos", 50).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_hundred() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/hundred", 100).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/hundred", 100).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_hundred_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "hundred/qos", 100).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "hundred/qos", 100).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_five_hundred() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "five/hundred", 500).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "five/hundred", 500).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_five_hundred_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "five/hundred/qos", 500).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "five/hundred/qos", 500).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_thousand() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "thousand", 1000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "thousand", 1000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_thousand_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "thousand/qos", 1000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "thousand/qos", 1000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +// 72s +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_ten_thousand_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "ten/thousand/qos", 10000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "ten/thousand/qos", 10000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + +// 72s +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_twenty_thousand_qos() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "twenty/thousand/qos", 20000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +}