diff --git a/.ci/hive_cred.xml b/.ci/hive_cred.xml
new file mode 100644
index 0000000..548dea0
--- /dev/null
+++ b/.ci/hive_cred.xml
@@ -0,0 +1,48 @@
+
+
+
+
+
+
+ test
+
+ testPass
+
+ superuser
+
+
+
+
+
+ superuser
+
+
+
+ #
+
+
+
+
+
\ No newline at end of file
diff --git a/.ci/hive_extension_config.xml b/.ci/hive_extension_config.xml
new file mode 100644
index 0000000..fd6ce60
--- /dev/null
+++ b/.ci/hive_extension_config.xml
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+ 60
+
+
+
+
+ PLAIN
+
+
\ No newline at end of file
diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml
index 6308484..c68c24f 100644
--- a/.github/workflows/integration_tests.yaml
+++ b/.github/workflows/integration_tests.yaml
@@ -3,8 +3,8 @@ on: [pull_request]
name: IntegrationTests
jobs:
- integration_tests:
- name: Integration tests
+ integration_tests_mosquitto:
+ name: Integration tests with mosquitto
runs-on: ubuntu-latest
steps:
- name: Git checkout
@@ -23,5 +23,34 @@ jobs:
sudo apt-get install mosquitto
mosquitto -c .ci/mosquitto.conf -d
+ - name: Run integration-tests tests
+ run: RUST_LOG=trace cargo test integration
+
+ integration_tests_hive:
+ name: Integration tests with HiveMQ
+ runs-on: ubuntu-latest
+ steps:
+ - name: Git checkout
+ uses: actions/checkout@v2
+
+ - name: Install stable toolchain
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: stable
+
+ - name: Build embedded
+ run: cargo build --target thumbv7em-none-eabihf --features "no_std" --no-default-features
+
+ - name: Start HiveMQ
+ run: |
+ curl -LO https://github.com/hivemq/hivemq-community-edition/releases/download/2021.3/hivemq-ce-2021.3.zip
+ unzip hivemq-ce-2021.3.zip
+ curl -LO https://www.hivemq.com/releases/extensions/hivemq-file-rbac-extension-4.4.0.zip
+ unzip hivemq-file-rbac-extension-4.4.0.zip
+ mv hivemq-file-rbac-extension hivemq-ce-2021.3/extensions
+ cp .ci/hive_cred.xml hivemq-ce-2021.3/extensions/hivemq-file-rbac-extension/credentials.xml
+ cp .ci/hive_extension_config.xml hivemq-ce-2021.3/extensions/hivemq-file-rbac-extension/extension-config.xml
+ hivemq-ce-2021.3/bin/run.sh &
+
- name: Run integration-tests tests
run: RUST_LOG=trace cargo test integration
\ No newline at end of file
diff --git a/mqtt/src/client/client_v5.rs b/mqtt/src/client/client.rs
similarity index 86%
rename from mqtt/src/client/client_v5.rs
rename to mqtt/src/client/client.rs
index 2a623c8..2752d92 100644
--- a/mqtt/src/client/client_v5.rs
+++ b/mqtt/src/client/client.rs
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-use crate::client::client_config::ClientConfig;
+use crate::client::client_config::{ClientConfig, MqttVersion};
use crate::network::NetworkConnection;
use crate::packet::v5::connack_packet::ConnackPacket;
use crate::packet::v5::connect_packet::ConnectPacket;
@@ -44,9 +44,10 @@ use heapless::Vec;
use rand_core::RngCore;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder};
use crate::network::NetworkError::Connection;
+use crate::packet::v5::property::Property;
use crate::utils::buffer_writer::BuffWriter;
-pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
+pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> {
connection: Option,
buffer: &'a mut [u8],
buffer_len: usize,
@@ -56,7 +57,7 @@ pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
config: ClientConfig<'a, MAX_PROPERTIES>,
}
-impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES>
+impl<'a, T, const MAX_PROPERTIES: usize> MqttClient<'a, T, MAX_PROPERTIES>
where
T: NetworkConnection,
{
@@ -79,7 +80,7 @@ where
}
}
- pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ async fn connect_to_broker_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@@ -141,7 +142,14 @@ where
}
}
- pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.connect_to_broker_v5().await}
+ }
+ }
+
+ async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@@ -168,7 +176,14 @@ where
Ok(())
}
- pub async fn send_message<'b>(
+ pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.disconnect_v5().await}
+ }
+ }
+
+ async fn send_message_v5<'b>(
&'b mut self,
topic_name: &'b str,
message: &'b str,
@@ -228,7 +243,18 @@ where
Ok(())
}
- pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
+ pub async fn send_message<'b>(
+ &'b mut self,
+ topic_name: &'b str,
+ message: &'b str,
+ ) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.send_message_v5(topic_name, message).await}
+ }
+ }
+
+ async fn subscribe_to_topics_v5<'b, const TOPICS: usize>(
&'b mut self,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
@@ -285,7 +311,17 @@ where
Ok(())
}
- pub async fn subscribe_to_topic<'b>(
+ pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
+ &'b mut self,
+ topic_names: &'b Vec<&'b str, TOPICS>,
+ ) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.subscribe_to_topics_v5(topic_names).await}
+ }
+ }
+
+ async fn subscribe_to_topic_v5<'b>(
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
@@ -330,7 +366,17 @@ where
}
}
- pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
+ pub async fn subscribe_to_topic<'b>(
+ &'b mut self,
+ topic_name: &'b str,
+ ) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.subscribe_to_topic_v5(topic_name).await}
+ }
+ }
+
+ async fn receive_message_v5<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@@ -374,7 +420,14 @@ where
return Ok(packet.message.unwrap());
}
- pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.receive_message_v5().await}
+ }
+ }
+
+ pub async fn send_ping_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@@ -400,8 +453,16 @@ where
Ok(())
}
}
+
+ pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
+ match self.config.mqtt_version {
+ MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
+ MqttVersion::MQTTv5 => {self.send_ping_v5().await}
+ }
+ }
}
+
async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result {
let mut target_len = 0;
let mut rem_len: VariableByteInteger = [0; 4];
diff --git a/mqtt/src/client/client_config.rs b/mqtt/src/client/client_config.rs
index 17d60e1..238c7e8 100644
--- a/mqtt/src/client/client_config.rs
+++ b/mqtt/src/client/client_config.rs
@@ -28,6 +28,11 @@ use crate::utils::types::{BinaryData, EncodedString};
use heapless::Vec;
+#[derive(Clone, PartialEq)]
+pub enum MqttVersion {
+ MQTTv3,
+ MQTTv5
+}
#[derive(Clone)]
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
@@ -40,10 +45,11 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub password: BinaryData<'a>,
pub properties: Vec, MAX_PROPERTIES>,
pub max_packet_size: u32,
+ pub mqtt_version: MqttVersion,
}
impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
- pub fn new() -> Self {
+ pub fn new(version: MqttVersion) -> Self {
Self {
qos: QualityOfService::QoS0,
keep_alive: 60,
@@ -54,6 +60,7 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
password: BinaryData::new(),
properties: Vec::, MAX_PROPERTIES>::new(),
max_packet_size: 265_000,
+ mqtt_version: version
}
}
diff --git a/mqtt/src/client/mod.rs b/mqtt/src/client/mod.rs
index 445d2ba..ccdda35 100644
--- a/mqtt/src/client/mod.rs
+++ b/mqtt/src/client/mod.rs
@@ -24,4 +24,4 @@
#[allow(unused_must_use)]
pub mod client_config;
-pub mod client_v5;
+pub mod client;
diff --git a/mqtt/tests/integration_test_single.rs b/mqtt/tests/integration_test_single.rs
index e8faa90..6e2caec 100644
--- a/mqtt/tests/integration_test_single.rs
+++ b/mqtt/tests/integration_test_single.rs
@@ -31,7 +31,7 @@ use tokio::task;
use tokio_test::{assert_err, assert_ok};
use heapless::Vec;
use rust_mqtt::client::client_config::ClientConfig;
-use rust_mqtt::client::client_v5::MqttClientV5;
+use rust_mqtt::client::client::MqttClient;
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
use rust_mqtt::packet::v5::property::Property;
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@@ -41,6 +41,7 @@ use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::types::BufferError;
use std::sync::Once;
use futures::future::{join, join3};
+use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
static IP: [u8; 4] = [127, 0, 0, 1];
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
@@ -58,7 +59,7 @@ fn setup() {
}
async fn publish_core<'b>(
- client: &mut MqttClientV5<'b, TokioNetwork, 5>,
+ client: &mut MqttClient<'b, TokioNetwork, 5>,
wait: u64,
topic: &str,
) -> Result<(), ReasonCode> {
@@ -86,7 +87,7 @@ async fn publish_core<'b>(
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
- let mut config = ClientConfig::new();
+ let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@@ -94,7 +95,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
- let mut client = MqttClientV5::::new(
+ let mut client = MqttClient::::new(
tokio_network,
&mut write_buffer,
80,
@@ -106,7 +107,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
}
async fn receive_core<'b>(
- client: &mut MqttClientV5<'b, TokioNetwork, 5>,
+ client: &mut MqttClient<'b, TokioNetwork, 5>,
topic: &str,
) -> Result<(), ReasonCode> {
info!(
@@ -136,7 +137,7 @@ async fn receive_core<'b>(
async fn receive_core_multiple<'b, const TOPICS: usize>(
- client: &mut MqttClientV5<'b, TokioNetwork, 5>,
+ client: &mut MqttClient<'b, TokioNetwork, 5>,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
info!(
@@ -174,7 +175,7 @@ async fn receive_core_multiple<'b, const TOPICS: usize>(
async fn receive_multiple(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
- let mut config = ClientConfig::new();
+ let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@@ -183,7 +184,7 @@ async fn receive_multiple(qos: QualityOfService, topic_name
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
- let mut client = MqttClientV5::::new(
+ let mut client = MqttClient::::new(
tokio_network,
&mut write_buffer,
100,
@@ -198,7 +199,7 @@ async fn receive_multiple(qos: QualityOfService, topic_name
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
- let mut config = ClientConfig::new();
+ let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@@ -207,7 +208,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
- let mut client = MqttClientV5::::new(
+ let mut client = MqttClient::::new(
tokio_network,
&mut write_buffer,
100,
@@ -222,7 +223,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
- let mut config = ClientConfig::new();
+ let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username("xyz");
config.add_password(PASSWORD);
@@ -231,7 +232,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
- let mut client = MqttClientV5::::new(
+ let mut client = MqttClient::::new(
tokio_network,
&mut write_buffer,
100,