This commit is contained in:
Ondrej Babec 2022-02-25 14:17:58 +01:00
parent 1918930761
commit e660f8ead2
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
21 changed files with 279 additions and 21 deletions

View File

@ -1,20 +1,64 @@
use crate::packet::publish_packet::PublishPacket;
use crate::network::network_trait::Network;
use crate::packet::publish_packet::{PublishPacket, QualityOfService};
use crate::network::network_trait::{Network, NetworkError};
use crate::packet::connack_packet::ConnackPacket;
use crate::packet::connect_packet::ConnectPacket;
use crate::packet::disconnect_packet::DisconnectPacket;
use crate::packet::mqtt_packet::Packet;
use crate::packet::publish_packet::QualityOfService::QoS1;
use crate::utils::buffer_reader::BuffReader;
struct MqttClientV5<T: Network> {
pub struct MqttClientV5<T, const MAX_PROPERTIES: usize> {
network_driver: T,
}
impl<T> MqttClientV5<T>
impl<T, const MAX_PROPERTIES: usize> MqttClientV5<T, MAX_PROPERTIES>
where
T: Network,
T: Network
{
fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8]) {
let packet = PublishPacket::new(topic_name, message);
self.network_driver.send()
pub fn new(network_driver: T) -> Self {
Self {
network_driver,
}
}
// connect -> connack -> publish -> QoS ? -> disconn
pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8], qos: QualityOfService) -> Result<(), NetworkError> {
//connect
self.network_driver.create_connection() ?;
let mut connect = ConnectPacket::clean();
let mut len = connect.encode(buffer);
self.network_driver.send(buffer, len).await ?;
//connack
let connack: ConnackPacket<MAX_PROPERTIES> = self.receive::<ConnackPacket<MAX_PROPERTIES>>(buffer).await ?;
if connack.connect_reason_code != 0x00 {
todo!();
}
// publish
let mut packet = PublishPacket::new(topic_name, message);
len = packet.encode(buffer);
let result = self.network_driver.send(buffer, len).await ?;
//QoS1
if qos.into() == QoS1.into() {
todo!();
}
//Disconnect
let mut disconnect = DisconnectPacket::new();
len = disconnect.encode(buffer);
self.network_driver.send(buffer, len);
return result;
}
fn receive_message(& mut self) {
pub async fn receive<P: Packet<'a>>(& mut self, buffer: & mut [u8]) -> Result<P, ()> {
self.network_driver.receive(buffer).await ?;
let mut packet = P::new();
packet.decode(&mut BuffReader::new(buffer));
return Ok(packet);
}
pub async fn receive_message(& mut self, buffer: & mut [u8]) -> Result<(), NetworkError> {
}
}

View File

@ -2,12 +2,18 @@
#![macro_use]
#![cfg_attr(not(feature = "std"), no_std)]
#![allow(dead_code)]
#![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)]
#![feature(async)]
extern crate alloc;
pub mod encoding;
pub mod packet;
pub mod utils;
pub mod client;
mod network;
pub mod network;
pub mod tokio_network;
#[allow(unused_variables)]
pub fn print_stack(file: &'static str, line: u32) {

View File

@ -1,7 +1,10 @@
use rust_mqtt::client::client_v5::MqttClientV5;
use rust_mqtt::network::network_trait::Network;
use rust_mqtt::packet::connect_packet::ConnectPacket;
use rust_mqtt::packet::mqtt_packet::Packet;
use rust_mqtt::packet::publish_packet::PublishPacket;
use rust_mqtt::packet::subscription_packet::SubscriptionPacket;
use rust_mqtt::tokio_network::TokioNetwork;
fn main() {
env_logger::builder()
@ -9,12 +12,12 @@ fn main() {
.format_timestamp_nanos()
.init();
let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new();
/*let mut pckt: SubscriptionPacket<1, 0> = SubscriptionPacket::new();
let mut res = vec![0; 140];
let lnsub = pckt.encode(&mut res);
println!("{:02X?}", &res[0..lnsub]);
let mut res2 = vec![0; 260];
let mut x = b"hello world";
let mut pblsh = PublishPacket::<0>::new(x);
let lnpblsh = pblsh.encode(&mut res2);
println!("{:02X?}", &res2[0..lnpblsh]);
@ -24,5 +27,11 @@ fn main() {
let mut cntrl = ConnectPacket::<3, 0>::clean();
let lncntrl = cntrl.encode(&mut res3);
println!("{:02X?}", &res3[0..lncntrl]);
log::info!("xxx");
log::info!("xxx");*/
let mut ip: [u8; 4] = [37, 205, 11, 180];
let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
let client = MqttClientV5::new::<TokioNetwork, 5>(tokio_network);
let mut x = b"hello world";
let mut res2 = vec![0; 260];
}

View File

@ -1,5 +1,33 @@
use core::fmt::Error;
use core::future::Future;
use crate::packet::mqtt_packet::Packet;
pub enum NetworkError {
Connection,
Unknown,
}
pub trait Network {
fn send(buffer: & mut [u8]);
fn receive(buffer: & mut [u8]);
type ConnectionFuture<'m>: Future<Output = Result<(), NetworkError>>
where
Self: 'm;
type WriteFuture<'m>: Future<Output = Result<(), NetworkError>>
where
Self: 'm;
type ReadFuture<'m>: Future<Output = Result<usize, NetworkError>>
where
Self: 'm;
fn new(ip: [u8; 4], port: u16) -> Self;
fn create_connection(& mut self) -> Self::ConnectionFuture<'m>;
fn send(& mut self, buffer: & mut [u8], len: usize) -> Self::WriteFuture<'m>;
fn receive(& mut self, buffer: & mut [u8]) -> Self::ReadFuture<'m>;
}

View File

@ -46,6 +46,9 @@ impl<'a, const MAX_PROPERTIES: usize> AuthPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for AuthPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
/*fn new() -> Packet<'a, MAX_PROPERTIES> {
return AuthPacket { fixed_header: PacketType::Auth.into(), remain_len: 0, auth_reason: 0, property_len: 0, properties: Vec::<Property<'a>, MAX_PROPERTIES>::new() }
}*/

View File

@ -32,6 +32,10 @@ impl<'a, const MAX_PROPERTIES: usize> ConnackPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for ConnackPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);
buff_writer.write_u8(self.fixed_header);

View File

@ -101,6 +101,10 @@ impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize>
impl<'a, const MAX_PROPERTIES: usize, const MAX_WILL_PROPERTIES: usize> Packet<'a>
for ConnectPacket<'a, MAX_PROPERTIES, MAX_WILL_PROPERTIES>
{
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -30,9 +30,23 @@ impl<'a, const MAX_PROPERTIES: usize> DisconnectPacket<'a, MAX_PROPERTIES> {
self.disconnect_reason = buff_reader.read_u8().unwrap();
self.decode_properties(buff_reader);
}
fn add_reason(& mut self, reason: u8) {
self.disconnect_reason = reason;
}
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for DisconnectPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
Self {
fixed_header: PacketType::Disconnect.into(),
remain_len: 5,
disconnect_reason: 0x00,
property_len: 0,
properties: Vec::<Property<'a>, MAX_PROPERTIES>::new()
}
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);
buff_writer.write_u8(self.fixed_header);

View File

@ -5,9 +5,10 @@ use crate::utils::buffer_reader::ParseError;
use super::property::Property;
pub trait Packet<'a> {
//fn new() -> dyn Packet<'a> where Self: Sized;
fn new() -> Self;
fn encode(&mut self, buffer: &mut [u8]) -> usize;
// -> Result<Ok(), Err()>
fn decode(&mut self, buff_reader: &mut BuffReader<'a>);
// properties

View File

@ -15,6 +15,10 @@ pub struct PingreqPacket {
impl PingreqPacket {}
impl<'a> Packet<'a> for PingreqPacket {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);
buff_writer.write_u8(self.fixed_header);

View File

@ -22,6 +22,10 @@ impl<'a> PingrespPacket {
}
impl<'a> Packet<'a> for PingrespPacket {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);
buff_writer.write_u8(self.fixed_header);

View File

@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubackPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubackPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubcompPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubcompPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -2,6 +2,7 @@ use crate::encoding::variable_byte_integer::VariableByteIntegerEncoder;
use heapless::Vec;
use crate::packet::mqtt_packet::Packet;
use crate::packet::publish_packet::QualityOfService::{INVALID, QoS0, QoS1, QoS2};
use crate::utils::buffer_reader::BuffReader;
use crate::utils::buffer_reader::EncodedString;
use crate::utils::buffer_writer::BuffWriter;
@ -9,6 +10,35 @@ use crate::utils::buffer_writer::BuffWriter;
use super::packet_type::PacketType;
use super::property::Property;
pub enum QualityOfService {
QoS0,
QoS1,
QoS2,
INVALID
}
impl From<u8> for QualityOfService {
fn from(orig: u8) -> Self {
return match orig {
0 => QoS0,
1 => QoS1,
2 => QoS2,
_ => INVALID
}
}
}
impl Into<u8> for QualityOfService {
fn into(self) -> u8 {
return match self {
QoS0 => 0,
QoS1 => 1,
QoS2 => 2,
INVALID => 3,
}
}
}
pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> {
// 7 - 4 mqtt control packet type, 3-0 flagy
pub fixed_header: u8,
@ -27,7 +57,7 @@ pub struct PublishPacket<'a, const MAX_PROPERTIES: usize> {
}
impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> {
pub fn new(topic_name: & str, message: &'a str) -> Self {
pub fn new(topic_name: &'a str, message: &'a str) -> Self {
let mut x = Self {
fixed_header: PacketType::Publish.into(),
remain_len: 0,
@ -41,7 +71,7 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> {
return x;
}
pub fn add_topic_name(&mut self, topic_name: & str) {
pub fn add_topic_name(&mut self, topic_name: &'a str) {
self.topic_name.string = topic_name;
self.topic_name.len = topic_name.len() as u16;
}
@ -63,6 +93,10 @@ impl<'a, const MAX_PROPERTIES: usize> PublishPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PublishPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubrecPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrecPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -36,6 +36,10 @@ impl<'a, const MAX_PROPERTIES: usize> PubrelPacket<'a, MAX_PROPERTIES> {
}
impl<'a, const MAX_PROPERTIES: usize> Packet<'a> for PubrelPacket<'a, MAX_PROPERTIES> {
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -52,6 +52,10 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize>
impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a>
for SubackPacket<'a, MAX_REASONS, MAX_PROPERTIES>
{
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
log::error!("SUBACK packet does not support encoding!");
return 0;

View File

@ -53,6 +53,10 @@ impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize>
impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
for SubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES>
{
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

View File

@ -50,6 +50,10 @@ impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize>
impl<'a, const MAX_REASONS: usize, const MAX_PROPERTIES: usize> Packet<'a>
for UnsubackPacket<'a, MAX_REASONS, MAX_PROPERTIES>
{
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
log::error!("UNSUBACK packet does not support encoding!");
return 0;

View File

@ -32,14 +32,16 @@ pub struct UnsubscriptionPacket<'a, const MAX_FILTERS: usize, const MAX_PROPERTI
impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize>
UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES>
{
/*pub fn new() -> Self {
}*/
}
impl<'a, const MAX_FILTERS: usize, const MAX_PROPERTIES: usize> Packet<'a>
for UnsubscriptionPacket<'a, MAX_FILTERS, MAX_PROPERTIES>
{
fn new() -> Self {
todo!()
}
fn encode(&mut self, buffer: &mut [u8]) -> usize {
let mut buff_writer = BuffWriter::new(buffer);

73
src/tokio_network.rs Normal file
View File

@ -0,0 +1,73 @@
use alloc::format;
use alloc::string::String;
use core::borrow::BorrowMut;
use core::fmt::Error;
use core::future::Future;
use core::ptr::null;
use embassy::io::WriteAll;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use crate::network::network_trait::{Network, NetworkError};
use crate::packet::mqtt_packet::Packet;
pub struct TokioNetwork<'a> {
ip: [u8; 4],
port: u16,
socket: &'a mut TcpStream,
}
impl<'a> TokioNetwork<'a> {
fn convert_ip(& mut self) -> String {
String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port))
}
}
impl Network for TokioNetwork {
type ConnectionFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm;
type WriteFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm;
type ReadFuture<'m> where Self: 'm = impl Future<Output = Result<usize, NetworkError>> + 'm;
fn new(ip: [u8; 4], port: u16) -> Self {
return Self {
ip,
port,
socket: &mut (TcpStream),
}
}
fn create_connection(&mut self) -> Self::ConnectionFuture<'m> {
async move {
TcpStream::connect(self.convert_ip())
.await
.map_err(|_| NetworkError::Connection);
}
}
fn send<'m>(&mut self, buffer: &mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move {
self.socket.write_all(&buffer[0..len])
.await
.map_err(|_| NetworkError::Unknown);
}
}
fn receive<'m>(&mut self, buffer: &mut [u8]) -> Self::ReadFuture<'m> {
async move {
self.socket.read(buffer)
.await
.map_err(|_| NetworkError::Connection);
}
}
/*fn send(&mut self, buffer: &mut [u8], len: usize) -> Result<(), NetworkError> {
self.socket.write_all(&buffer[0..len]);
Ok(())
}
fn receive(&mut self, buffer: &mut [u8]) -> Result<usize, NetworkError> {
let len = self.socket.read(buffer).await ?;
Ok(len)
}*/
}