parent
ac85cf0158
commit
9058093065
|
@ -11,7 +11,7 @@ resolver = "2"
|
||||||
heapless = "0.7.10"
|
heapless = "0.7.10"
|
||||||
rand_core = "0.6.0"
|
rand_core = "0.6.0"
|
||||||
defmt = { version = "0.3", optional = true }
|
defmt = { version = "0.3", optional = true }
|
||||||
|
futures = { version = "0.3.21", optional = true, default-features = false}
|
||||||
log = { version = "0.4.14", optional = true }
|
log = { version = "0.4.14", optional = true }
|
||||||
tokio = { version = "1", features = ["full"], optional = true, default-features = false }
|
tokio = { version = "1", features = ["full"], optional = true, default-features = false }
|
||||||
|
|
||||||
|
@ -19,10 +19,11 @@ tokio = { version = "1", features = ["full"], optional = true, default-features
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-test = { version = "0.4.2"}
|
tokio-test = { version = "0.4.2"}
|
||||||
env_logger = "0.9.0"
|
env_logger = "0.9.0"
|
||||||
|
futures = { version = "0.3.21" }
|
||||||
log = { version = "0.4.14"}
|
log = { version = "0.4.14"}
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["testing"]
|
default = ["testing"]
|
||||||
testing = ["tokio", "std", "log"]
|
testing = ["tokio", "std", "log", "futures"]
|
||||||
std = []
|
std = []
|
||||||
no_std = ["defmt"]
|
no_std = ["defmt"]
|
|
@ -272,7 +272,7 @@ where
|
||||||
if i == TOPICS {
|
if i == TOPICS {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if *reasons.get(i).unwrap() != QualityOfService::into(self.config.qos) {
|
if *reasons.get(i).unwrap() != (<QualityOfService as Into<u8>>::into(self.config.qos) >> 1) {
|
||||||
return Err(ReasonCode::from(*reasons.get(i).unwrap()));
|
return Err(ReasonCode::from(*reasons.get(i).unwrap()));
|
||||||
}
|
}
|
||||||
i = i + 1;
|
i = i + 1;
|
||||||
|
|
|
@ -27,9 +27,9 @@ use core::time::Duration;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tokio::{join, task};
|
use tokio::task;
|
||||||
use tokio_test::assert_ok;
|
use tokio_test::{assert_err, assert_ok};
|
||||||
|
use heapless::Vec;
|
||||||
use crate::client::client_config::ClientConfig;
|
use crate::client::client_config::ClientConfig;
|
||||||
use crate::client::client_v5::MqttClientV5;
|
use crate::client::client_v5::MqttClientV5;
|
||||||
use crate::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
|
use crate::network::network_trait::{NetworkConnection, NetworkConnectionFactory};
|
||||||
|
@ -40,8 +40,10 @@ use crate::packet::v5::reason_codes::ReasonCode::NotAuthorized;
|
||||||
use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
|
use crate::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
|
||||||
use crate::utils::types::BufferError;
|
use crate::utils::types::BufferError;
|
||||||
use std::sync::Once;
|
use std::sync::Once;
|
||||||
|
use futures::future::{join, join3};
|
||||||
|
|
||||||
static IP: [u8; 4] = [127, 0, 0, 1];
|
static IP: [u8; 4] = [127, 0, 0, 1];
|
||||||
|
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
|
||||||
static PORT: u16 = 1883;
|
static PORT: u16 = 1883;
|
||||||
static USERNAME: &str = "test";
|
static USERNAME: &str = "test";
|
||||||
static PASSWORD: &str = "testPass";
|
static PASSWORD: &str = "testPass";
|
||||||
|
@ -57,6 +59,7 @@ fn setup() {
|
||||||
|
|
||||||
async fn publish_core<'b>(
|
async fn publish_core<'b>(
|
||||||
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
|
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
|
||||||
|
wait: u64,
|
||||||
topic: &str,
|
topic: &str,
|
||||||
) -> Result<(), ReasonCode> {
|
) -> Result<(), ReasonCode> {
|
||||||
info!(
|
info!(
|
||||||
|
@ -66,8 +69,8 @@ async fn publish_core<'b>(
|
||||||
);
|
);
|
||||||
let mut result = { client.connect_to_broker().await };
|
let mut result = { client.connect_to_broker().await };
|
||||||
assert_ok!(result);
|
assert_ok!(result);
|
||||||
info!("[Publisher] Waiting {} seconds before sending", 5);
|
info!("[Publisher] Waiting {} seconds before sending", wait);
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(wait)).await;
|
||||||
|
|
||||||
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
|
info!("[Publisher] Sending new message {} to topic {}", MSG, topic);
|
||||||
result = { client.send_message(topic, MSG).await };
|
result = { client.send_message(topic, MSG).await };
|
||||||
|
@ -79,9 +82,9 @@ async fn publish_core<'b>(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
||||||
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
||||||
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
|
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
|
||||||
let mut config = ClientConfig::new();
|
let mut config = ClientConfig::new();
|
||||||
config.add_qos(qos);
|
config.add_qos(qos);
|
||||||
config.add_username(USERNAME);
|
config.add_username(USERNAME);
|
||||||
|
@ -98,7 +101,7 @@ async fn publish(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
||||||
80,
|
80,
|
||||||
config,
|
config,
|
||||||
);
|
);
|
||||||
publish_core(&mut client, topic).await
|
publish_core(&mut client, wait, topic).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_core<'b>(
|
async fn receive_core<'b>(
|
||||||
|
@ -129,7 +132,45 @@ async fn receive_core<'b>(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
|
||||||
|
|
||||||
|
async fn receive_core_multiple<'b, const TOPICS: usize>(
|
||||||
|
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
|
||||||
|
topic_names: &'b Vec<&'b str, TOPICS>,
|
||||||
|
) -> Result<(), ReasonCode> {
|
||||||
|
info!(
|
||||||
|
"[Receiver] Connection to broker with username {} and password {}",
|
||||||
|
USERNAME,
|
||||||
|
PASSWORD
|
||||||
|
);
|
||||||
|
let mut result = { client.connect_to_broker().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
|
||||||
|
info!("[Receiver] Subscribing to topics {}, {}", topic_names.get(0).unwrap(), topic_names.get(1).unwrap());
|
||||||
|
result = { client.subscribe_to_topics(topic_names).await };
|
||||||
|
assert_ok!(result);
|
||||||
|
info!("[Receiver] Waiting for new message!");
|
||||||
|
{
|
||||||
|
let msg = { client.receive_message().await };
|
||||||
|
assert_ok!(msg);
|
||||||
|
let act_message = String::from_utf8_lossy(msg?);
|
||||||
|
info!("[Receiver] Got new message: {}", act_message);
|
||||||
|
assert_eq!(act_message, MSG);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let msg_sec = { client.receive_message().await };
|
||||||
|
assert_ok!(msg_sec);
|
||||||
|
let act_message_second = String::from_utf8_lossy(msg_sec?);
|
||||||
|
info!("[Receiver] Got new message: {}", act_message_second);
|
||||||
|
assert_eq!(act_message_second, MSG);
|
||||||
|
}
|
||||||
|
info!("[Receiver] Disconnecting");
|
||||||
|
result = { client.disconnect().await };
|
||||||
|
assert_ok!(result);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> {
|
||||||
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
||||||
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
|
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
|
||||||
let mut config = ClientConfig::new();
|
let mut config = ClientConfig::new();
|
||||||
|
@ -150,6 +191,30 @@ async fn receive(qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
||||||
config,
|
config,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
receive_core_multiple(&mut client, topic_names).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
|
||||||
|
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
|
||||||
|
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
|
||||||
|
let mut config = ClientConfig::new();
|
||||||
|
config.add_qos(qos);
|
||||||
|
config.add_username(USERNAME);
|
||||||
|
config.add_password(PASSWORD);
|
||||||
|
config.max_packet_size = 60;
|
||||||
|
config.properties.push(Property::ReceiveMaximum(20));
|
||||||
|
let mut recv_buffer = [0; 100];
|
||||||
|
let mut write_buffer = [0; 100];
|
||||||
|
|
||||||
|
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
|
||||||
|
tokio_network,
|
||||||
|
&mut write_buffer,
|
||||||
|
100,
|
||||||
|
&mut recv_buffer,
|
||||||
|
100,
|
||||||
|
config,
|
||||||
|
);
|
||||||
|
|
||||||
receive_core(&mut client, topic).await
|
receive_core(&mut client, topic).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,36 +250,80 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
async fn simple_publish_recv() {
|
async fn simple_publish_recv() {
|
||||||
setup();
|
setup();
|
||||||
info!("Running simple integration test");
|
info!("Running simple integration test");
|
||||||
|
|
||||||
let recv =
|
let recv =
|
||||||
task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/simple").await });
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/simple").await });
|
||||||
|
|
||||||
let publ =
|
let publ =
|
||||||
task::spawn(async move { publish(QualityOfService::QoS0, "test/recv/simple").await });
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "test/recv/simple").await });
|
||||||
|
|
||||||
let (r, p) = join!(recv, publ);
|
let (r, p) = join(recv, publ).await;
|
||||||
assert_ok!(r.unwrap());
|
assert_ok!(r.unwrap());
|
||||||
assert_ok!(p.unwrap());
|
assert_ok!(p.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
async fn simple_publish_recv_multiple() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple integration test");
|
||||||
|
let mut topic_names = Vec::<&str, 2>::new();
|
||||||
|
topic_names.push("test/topic1");
|
||||||
|
topic_names.push("test/topic2");
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive_multiple(QualityOfService::QoS0, &topic_names).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5,QualityOfService::QoS0, "test/topic1").await });
|
||||||
|
|
||||||
|
let publ2 =
|
||||||
|
task::spawn(async move { publish(IP, 10, QualityOfService::QoS0, "test/topic2").await });
|
||||||
|
|
||||||
|
let (r, p, p2) = join3(recv, publ, publ2).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
assert_ok!(p2.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
|
async fn simple_publish_recv_multiple_qos() {
|
||||||
|
setup();
|
||||||
|
info!("Running simple integration test");
|
||||||
|
let mut topic_names = Vec::<&str, 2>::new();
|
||||||
|
topic_names.push("test/topic3");
|
||||||
|
topic_names.push("test/topic4");
|
||||||
|
let recv =
|
||||||
|
task::spawn(async move { receive_multiple(QualityOfService::QoS1, &topic_names).await });
|
||||||
|
|
||||||
|
let publ =
|
||||||
|
task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/topic3").await });
|
||||||
|
|
||||||
|
let publ2 =
|
||||||
|
task::spawn(async move { publish(IP, 10, QualityOfService::QoS1, "test/topic4").await });
|
||||||
|
|
||||||
|
let ( r, p, p2) = join3(recv, publ, publ2).await;
|
||||||
|
assert_ok!(r.unwrap());
|
||||||
|
assert_ok!(p.unwrap());
|
||||||
|
assert_ok!(p2.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
async fn simple_publish_recv_qos() {
|
async fn simple_publish_recv_qos() {
|
||||||
setup();
|
setup();
|
||||||
info!("Running simple integration test with Quality of Service 1");
|
info!("Running simple integration test with Quality of Service 1");
|
||||||
|
|
||||||
let recv = task::spawn(async move { receive(QualityOfService::QoS1, "test/recv/qos").await });
|
let recv = task::spawn(async move { receive(IP, QualityOfService::QoS1, "test/recv/qos").await });
|
||||||
|
|
||||||
let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/qos").await });
|
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/qos").await });
|
||||||
let (r, p) = join!(recv, publ);
|
let (r, p) = join(recv, publ).await;
|
||||||
assert_ok!(r.unwrap());
|
assert_ok!(r.unwrap());
|
||||||
assert_ok!(p.unwrap());
|
assert_ok!(p.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||||
async fn simple_publish_recv_wrong_cred() {
|
async fn simple_publish_recv_wrong_cred() {
|
||||||
setup();
|
setup();
|
||||||
info!("Running simple integration test wrong credentials");
|
info!("Running simple integration test wrong credentials");
|
||||||
|
@ -222,10 +331,10 @@ async fn simple_publish_recv_wrong_cred() {
|
||||||
let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await });
|
let recv = task::spawn(async move { receive_with_wrong_cred(QualityOfService::QoS1).await });
|
||||||
|
|
||||||
let recv_right =
|
let recv_right =
|
||||||
task::spawn(async move { receive(QualityOfService::QoS0, "test/recv/wrong").await });
|
task::spawn(async move { receive(IP, QualityOfService::QoS0, "test/recv/wrong").await });
|
||||||
|
|
||||||
let publ = task::spawn(async move { publish(QualityOfService::QoS1, "test/recv/wrong").await });
|
let publ = task::spawn(async move { publish(IP, 5, QualityOfService::QoS1, "test/recv/wrong").await });
|
||||||
let (r, rv, p) = join!(recv, recv_right, publ);
|
let (r, rv, p) = join3(recv, recv_right, publ).await;
|
||||||
assert_ok!(rv.unwrap());
|
assert_ok!(rv.unwrap());
|
||||||
assert_ok!(p.unwrap());
|
assert_ok!(p.unwrap());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user