Prepare client for future v3 support (#11)

* Prepare client for future v3 support
* Add hiveMQ
This commit is contained in:
obabec 2022-03-31 13:34:26 +02:00 committed by GitHub
parent 19087016a6
commit 96be69c520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 212 additions and 26 deletions

48
.ci/hive_cred.xml Normal file
View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ MIT License
~
~ Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<file-rbac>
<users>
<user>
<name>test</name>
<!--- password hash for "pass1" -->
<password>testPass</password>
<roles>
<id>superuser</id>
</roles>
</user>
</users>
<roles>
<role>
<id>superuser</id>
<permissions>
<permission>
<!-- Allow everything -->
<topic>#</topic>
</permission>
</permissions>
</role>
</roles>
</file-rbac>

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ MIT License
~
~ Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<extension-configuration>
<!-- Reload interval for credentials in seconds -->
<credentials-reload-interval>60</credentials-reload-interval>
<!-- Optional list of names of listeners this extension is used for
<listener-names>
<listener-name>my-listener</listener-name>
<listener-name>my-listener-2</listener-name>
</listener-names> -->
<!-- If the credentials file is using HASHED or PLAIN passwords -->
<password-type>PLAIN</password-type>
</extension-configuration>

View File

@ -3,8 +3,8 @@ on: [pull_request]
name: IntegrationTests
jobs:
integration_tests:
name: Integration tests
integration_tests_mosquitto:
name: Integration tests with mosquitto
runs-on: ubuntu-latest
steps:
- name: Git checkout
@ -25,3 +25,32 @@ jobs:
- name: Run integration-tests tests
run: RUST_LOG=trace cargo test integration
integration_tests_hive:
name: Integration tests with HiveMQ
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Build embedded
run: cargo build --target thumbv7em-none-eabihf --features "no_std" --no-default-features
- name: Start HiveMQ
run: |
curl -LO https://github.com/hivemq/hivemq-community-edition/releases/download/2021.3/hivemq-ce-2021.3.zip
unzip hivemq-ce-2021.3.zip
curl -LO https://www.hivemq.com/releases/extensions/hivemq-file-rbac-extension-4.4.0.zip
unzip hivemq-file-rbac-extension-4.4.0.zip
mv hivemq-file-rbac-extension hivemq-ce-2021.3/extensions
cp .ci/hive_cred.xml hivemq-ce-2021.3/extensions/hivemq-file-rbac-extension/credentials.xml
cp .ci/hive_extension_config.xml hivemq-ce-2021.3/extensions/hivemq-file-rbac-extension/extension-config.xml
hivemq-ce-2021.3/bin/run.sh &
- name: Run integration-tests tests
run: RUST_LOG=trace cargo test integration

View File

@ -22,7 +22,7 @@
* SOFTWARE.
*/
use crate::client::client_config::ClientConfig;
use crate::client::client_config::{ClientConfig, MqttVersion};
use crate::network::NetworkConnection;
use crate::packet::v5::connack_packet::ConnackPacket;
use crate::packet::v5::connect_packet::ConnectPacket;
@ -44,9 +44,10 @@ use heapless::Vec;
use rand_core::RngCore;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder};
use crate::network::NetworkError::Connection;
use crate::packet::v5::property::Property;
use crate::utils::buffer_writer::BuffWriter;
pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> {
connection: Option<T>,
buffer: &'a mut [u8],
buffer_len: usize,
@ -56,7 +57,7 @@ pub struct MqttClientV5<'a, T, const MAX_PROPERTIES: usize> {
config: ClientConfig<'a, MAX_PROPERTIES>,
}
impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<'a, T, MAX_PROPERTIES>
impl<'a, T, const MAX_PROPERTIES: usize> MqttClient<'a, T, MAX_PROPERTIES>
where
T: NetworkConnection,
{
@ -79,7 +80,7 @@ where
}
}
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
async fn connect_to_broker_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@ -141,7 +142,14 @@ where
}
}
pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.connect_to_broker_v5().await}
}
}
async fn disconnect_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@ -168,7 +176,14 @@ where
Ok(())
}
pub async fn send_message<'b>(
pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.disconnect_v5().await}
}
}
async fn send_message_v5<'b>(
&'b mut self,
topic_name: &'b str,
message: &'b str,
@ -228,7 +243,18 @@ where
Ok(())
}
pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
pub async fn send_message<'b>(
&'b mut self,
topic_name: &'b str,
message: &'b str,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.send_message_v5(topic_name, message).await}
}
}
async fn subscribe_to_topics_v5<'b, const TOPICS: usize>(
&'b mut self,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
@ -285,7 +311,17 @@ where
Ok(())
}
pub async fn subscribe_to_topic<'b>(
pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
&'b mut self,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.subscribe_to_topics_v5(topic_names).await}
}
}
async fn subscribe_to_topic_v5<'b>(
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
@ -330,7 +366,17 @@ where
}
}
pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
pub async fn subscribe_to_topic<'b>(
&'b mut self,
topic_name: &'b str,
) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.subscribe_to_topic_v5(topic_name).await}
}
}
async fn receive_message_v5<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@ -374,7 +420,14 @@ where
return Ok(packet.message.unwrap());
}
pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
pub async fn receive_message<'b>(&'b mut self) -> Result<&'b [u8], ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.receive_message_v5().await}
}
}
pub async fn send_ping_v5<'b>(&'b mut self) -> Result<(), ReasonCode> {
if self.connection.is_none() {
return Err(ReasonCode::NetworkError);
}
@ -400,8 +453,16 @@ where
Ok(())
}
}
pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
match self.config.mqtt_version {
MqttVersion::MQTTv3 => {Err(ReasonCode::UnsupportedProtocolVersion)}
MqttVersion::MQTTv5 => {self.send_ping_v5().await}
}
}
}
async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result<usize, ReasonCode> {
let mut target_len = 0;
let mut rem_len: VariableByteInteger = [0; 4];

View File

@ -28,6 +28,11 @@ use crate::utils::types::{BinaryData, EncodedString};
use heapless::Vec;
#[derive(Clone, PartialEq)]
pub enum MqttVersion {
MQTTv3,
MQTTv5
}
#[derive(Clone)]
pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
@ -40,10 +45,11 @@ pub struct ClientConfig<'a, const MAX_PROPERTIES: usize> {
pub password: BinaryData<'a>,
pub properties: Vec<Property<'a>, MAX_PROPERTIES>,
pub max_packet_size: u32,
pub mqtt_version: MqttVersion,
}
impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
pub fn new() -> Self {
pub fn new(version: MqttVersion) -> Self {
Self {
qos: QualityOfService::QoS0,
keep_alive: 60,
@ -54,6 +60,7 @@ impl<'a, const MAX_PROPERTIES: usize> ClientConfig<'a, MAX_PROPERTIES> {
password: BinaryData::new(),
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new(),
max_packet_size: 265_000,
mqtt_version: version
}
}

View File

@ -24,4 +24,4 @@
#[allow(unused_must_use)]
pub mod client_config;
pub mod client_v5;
pub mod client;

View File

@ -31,7 +31,7 @@ use tokio::task;
use tokio_test::{assert_err, assert_ok};
use heapless::Vec;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::client::client_v5::MqttClientV5;
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::network::{NetworkConnection, NetworkConnectionFactory};
use rust_mqtt::packet::v5::property::Property;
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@ -41,6 +41,7 @@ use rust_mqtt::tokio_net::tokio_network::{TokioNetwork, TokioNetworkFactory};
use rust_mqtt::utils::types::BufferError;
use std::sync::Once;
use futures::future::{join, join3};
use rust_mqtt::client::client_config::MqttVersion::MQTTv5;
static IP: [u8; 4] = [127, 0, 0, 1];
static WRONG_IP: [u8; 4] = [192, 168, 1, 1];
@ -58,7 +59,7 @@ fn setup() {
}
async fn publish_core<'b>(
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5>,
wait: u64,
topic: &str,
) -> Result<(), ReasonCode> {
@ -86,7 +87,7 @@ async fn publish_core<'b>(
async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new();
let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -94,7 +95,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5>::new(
tokio_network,
&mut write_buffer,
80,
@ -106,7 +107,7 @@ async fn publish(ip: [u8; 4], wait: u64, qos: QualityOfService, topic: &str) ->
}
async fn receive_core<'b>(
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5>,
topic: &str,
) -> Result<(), ReasonCode> {
info!(
@ -136,7 +137,7 @@ async fn receive_core<'b>(
async fn receive_core_multiple<'b, const TOPICS: usize>(
client: &mut MqttClientV5<'b, TokioNetwork, 5>,
client: &mut MqttClient<'b, TokioNetwork, 5>,
topic_names: &'b Vec<&'b str, TOPICS>,
) -> Result<(), ReasonCode> {
info!(
@ -174,7 +175,7 @@ async fn receive_core_multiple<'b, const TOPICS: usize>(
async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_names: & Vec<& str, TOPICS>,) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new();
let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -183,7 +184,7 @@ async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_name
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5>::new(
tokio_network,
&mut write_buffer,
100,
@ -198,7 +199,7 @@ async fn receive_multiple<const TOPICS: usize>(qos: QualityOfService, topic_name
async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(ip, PORT).await?;
let mut config = ClientConfig::new();
let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username(USERNAME);
config.add_password(PASSWORD);
@ -207,7 +208,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5>::new(
tokio_network,
&mut write_buffer,
100,
@ -222,7 +223,7 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str) -> Result<(),
async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode> {
let mut tokio_factory: TokioNetworkFactory = TokioNetworkFactory::new();
let mut tokio_network: TokioNetwork = tokio_factory.connect(IP, PORT).await?;
let mut config = ClientConfig::new();
let mut config = ClientConfig::new(MQTTv5);
config.add_qos(qos);
config.add_username("xyz");
config.add_password(PASSWORD);
@ -231,7 +232,7 @@ async fn receive_with_wrong_cred(qos: QualityOfService) -> Result<(), ReasonCode
let mut recv_buffer = [0; 100];
let mut write_buffer = [0; 100];
let mut client = MqttClientV5::<TokioNetwork, 5>::new(
let mut client = MqttClient::<TokioNetwork, 5>::new(
tokio_network,
&mut write_buffer,
100,