diff --git a/mqtt/Cargo.toml b/mqtt/Cargo.toml index e109b18..7d4912c 100644 --- a/mqtt/Cargo.toml +++ b/mqtt/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" heapless = "0.7.10" rand_core = "0.6.0" defmt = { version = "0.3", optional = true } - +futures = { version = "0.3.21", optional = true, default-features = false} log = { version = "0.4.14", optional = true } tokio = { version = "1", features = ["full"], optional = true, default-features = false } @@ -19,10 +19,11 @@ tokio = { version = "1", features = ["full"], optional = true, default-features tokio = { version = "1", features = ["full"] } tokio-test = { version = "0.4.2"} env_logger = "0.9.0" +futures = { version = "0.3.21" } log = { version = "0.4.14"} [features] default = ["testing"] -testing = ["tokio", "std", "log"] +testing = ["tokio", "std", "log", "futures"] std = [] no_std = ["defmt"] \ No newline at end of file diff --git a/mqtt/src/client/client_v5.rs b/mqtt/src/client/client_v5.rs index e6bb010..30f2b0b 100644 --- a/mqtt/src/client/client_v5.rs +++ b/mqtt/src/client/client_v5.rs @@ -272,7 +272,7 @@ where if i == TOPICS { break; } - if *reasons.get(i).unwrap() != QualityOfService::into(self.config.qos) { + if *reasons.get(i).unwrap() != (>::into(self.config.qos) >> 1) { return Err(ReasonCode::from(*reasons.get(i).unwrap())); } i = i + 1; diff --git a/mqtt/src/tests/integration/integration_test_single.rs b/mqtt/src/tests/integration/integration_test_single.rs index 3027392..916b453 100644 --- a/mqtt/src/tests/integration/integration_test_single.rs +++ b/mqtt/src/tests/integration/integration_test_single.rs @@ -27,9 +27,9 @@ use core::time::Duration; use std::future::Future; use log::LevelFilter; use tokio::time::sleep; -use tokio::{join, task}; -use tokio_test::assert_ok; - +use tokio::task; +use tokio_test::{assert_err, assert_ok}; +use heapless::Vec; use crate::client::client_config::ClientConfig; use crate::client::client_v5::MqttClientV5; use crate::network::network_trait::{NetworkConnection, NetworkConnectionFactory}; @@ -40,8 +40,10 @@ use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized; use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory}; use crate::utils::types::BufferError; use std::sync::Once; +use futures::future::{join, join3}; static IP: [u8; 4] = [127, 0, 0, 1]; +static WRONG_IP: [u8; 4] = [192, 168, 1, 1]; static PORT: u16 = 1883; static USERNAME: &str = "test"; static PASSWORD: &str = "testPass"; @@ -57,6 +59,7 @@ fn setup() { async fn publish_core<'b>( client: &mut MqttClientV5<'b, TokioNetwork, 5>, + wait: u64, topic: &str, ) -> Result<(), ReasonCode> { info!( @@ -66,8 +69,8 @@ async fn publish_core<'b>( ); let mut result = { client.connect_to_broker().await }; assert_ok!(result); - info!("[Publisher] Waiting {} seconds before sending", 5); - sleep(Duration::from_secs(5)).await; + info!("[Publisher] Waiting {} seconds before sending", wait); + sleep(Duration::from_secs(wait)).await; info!("[Publisher] Sending new message {} to topic {}", MSG, topic); result = { client.send_message(topic, MSG).await }; @@ -79,9 +82,9 @@ async fn publish_core<'b>( Ok(()) } -async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { +async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); - let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; + let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; let mut config = ClientConfig::new(); config.add_qos(qos); config.add_username(USERNAME); @@ -98,7 +101,7 @@ async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { 80, config, ); - publish_core(&mut client, topic).await + publish_core(&mut client, wait, topic).await } async fn receive_core<'b>( @@ -129,7 +132,45 @@ async fn receive_core<'b>( Ok(()) } -async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { + + +async fn receive_core_multiple<'b, const TOPICS: usize>( + client: &mut MqttClientV5<'b, TokioNetwork, 5>, + topic_names: &'b Vec<&'b str, TOPICS>, +) -> Result<(), ReasonCode> { + info!( + "[Receiver] Connection to broker with username {} and password {}", + USERNAME, + PASSWORD + ); + let mut result = { client.connect_to_broker().await }; + assert_ok!(result); + + info!("[Receiver] Subscribing to topics {}, {}", topic_names.get(0).unwrap(), topic_names.get(1).unwrap()); + result = { client.subscribe_to_topics(topic_names).await }; + assert_ok!(result); + info!("[Receiver] Waiting for new message!"); + { + let msg = { client.receive_message().await }; + assert_ok!(msg); + let act_message = String::from_utf8_lossy(msg?); + info!("[Receiver] Got new message: {}", act_message); + assert_eq!(act_message, MSG); + } + { + let msg_sec = { client.receive_message().await }; + assert_ok!(msg_sec); + let act_message_second = String::from_utf8_lossy(msg_sec?); + info!("[Receiver] Got new message: {}", act_message_second); + assert_eq!(act_message_second, MSG); + } + info!("[Receiver] Disconnecting"); + result = { client.disconnect().await }; + assert_ok!(result); + Ok(()) +} + +async fn receive_multiple(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> { let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?; let mut config = ClientConfig::new(); @@ -150,6 +191,30 @@ async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { config, ); + receive_core_multiple(&mut client, topic_names).await +} + +async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> { + let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new(); + let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?; + let mut config = ClientConfig::new(); + config.add_qos(qos); + config.add_username(USERNAME); + config.add_password(PASSWORD); + config.max_packet_size = 60; + config.properties.push(Property::ReceiveMaximum(20)); + let mut recv_buffer = [0; 100]; + let mut write_buffer = [0; 100]; + + let mut client = MqttClientV5::::new( + tokio_network, + &mut write_buffer, + 100, + &mut recv_buffer, + 100, + config, + ); + receive_core(&mut client, topic).await } @@ -185,36 +250,80 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_publish_recv() { setup(); info!("Running simple integration test"); let recv = - task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/simple").await }); + task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/simple").await }); let publ = - task::spawn(async move { publish(QualityOfService::QoS0, "test/recv/simple").await }); + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await }); - let (r, p) = join!(recv, publ); + let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn simple_publish_recv_multiple() { + setup(); + info!("Running simple integration test"); + let mut topic_names = Vec::<&str, 2>::new(); + topic_names.push("test/topic1"); + topic_names.push("test/topic2"); + let recv = + task::spawn(async move { receive_multiple(QualityOfService::QoS0, &topic_names).await }); + + let publ = + task::spawn(async move { publish(IP, 5,QualityOfService::QoS0, "test/topic1").await }); + + let publ2 = + task::spawn(async move { publish(IP, 10, QualityOfService::QoS0, "test/topic2").await }); + + let (r, p, p2) = join3(recv, publ, publ2).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); + assert_ok!(p2.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn simple_publish_recv_multiple_qos() { + setup(); + info!("Running simple integration test"); + let mut topic_names = Vec::<&str, 2>::new(); + topic_names.push("test/topic3"); + topic_names.push("test/topic4"); + let recv = + task::spawn(async move { receive_multiple(QualityOfService::QoS1, &topic_names).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/topic3").await }); + + let publ2 = + task::spawn(async move { publish(IP, 10, QualityOfService::QoS1, "test/topic4").await }); + + let ( r, p, p2) = join3(recv, publ, publ2).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); + assert_ok!(p2.unwrap()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_publish_recv_qos() { setup(); info!("Running simple integration test with Quality of Service 1"); - let recv = task::spawn(async move { receive(QualityOfService::QoS1, "test/recv/qos").await }); + let recv = task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await }); - let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/qos").await }); - let (r, p) = join!(recv, publ); + let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await }); + let (r, p) = join(recv, publ).await; assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_publish_recv_wrong_cred() { setup(); info!("Running simple integration test wrong credentials"); @@ -222,10 +331,10 @@ async fn simple_publish_recv_wrong_cred() { let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await }); let recv_right = - task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/wrong").await }); + task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/wrong").await }); - let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/wrong").await }); - let (r, rv, p) = join!(recv, recv_right, publ); + let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await }); + let (r, rv, p) = join3(recv, recv_right, publ).await; assert_ok!(rv.unwrap()); assert_ok!(p.unwrap()); }