This commit is contained in:
Ondrej Babec 2022-02-25 15:19:28 +01:00
parent e660f8ead2
commit 1e3f8e8302
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
4 changed files with 39 additions and 29 deletions

View File

@ -11,7 +11,7 @@ pub struct MqttClientV5<T, const MAX_PROPERTIES: usize> {
network_driver: T, network_driver: T,
} }
impl<T, const MAX_PROPERTIES: usize> MqttClientV5<T, MAX_PROPERTIES> impl<'a, T, const MAX_PROPERTIES: usize> MqttClientV5<T, MAX_PROPERTIES>
where where
T: Network T: Network
{ {
@ -21,11 +21,11 @@ where
} }
} }
// connect -> connack -> publish -> QoS ? -> disconn // connect -> connack -> publish -> QoS ? -> disconn
pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: & mut [u8], qos: QualityOfService) -> Result<(), NetworkError> { pub async fn send_message(& mut self, topic_name: & str, message: & str, buffer: &'a mut [u8], qos: QualityOfService) -> Result<(), NetworkError> {
//connect //connect
self.network_driver.create_connection() ?; self.network_driver.create_connection().await ?;
let mut connect = ConnectPacket::clean(); let mut connect = ConnectPacket::<3, 0>::clean();
let mut len = connect.encode(buffer); let mut len = connect.encode(buffer);
self.network_driver.send(buffer, len).await ?; self.network_driver.send(buffer, len).await ?;
//connack //connack
@ -35,23 +35,23 @@ where
} }
// publish // publish
let mut packet = PublishPacket::new(topic_name, message); let mut packet = PublishPacket::<5>::new(topic_name, message);
len = packet.encode(buffer); len = packet.encode(buffer);
let result = self.network_driver.send(buffer, len).await ?; self.network_driver.send(buffer, len).await ?;
//QoS1 //QoS1
if qos.into() == QoS1.into() { if <QualityOfService as Into<u8>>::into(qos) == <QualityOfService as Into<u8>>::into(QoS1) {
todo!(); todo!();
} }
//Disconnect //Disconnect
let mut disconnect = DisconnectPacket::new(); let mut disconnect = DisconnectPacket::<5>::new();
len = disconnect.encode(buffer); len = disconnect.encode(buffer);
self.network_driver.send(buffer, len); self.network_driver.send(buffer, len);
return result; Ok(())
} }
pub async fn receive<P: Packet<'a>>(& mut self, buffer: & mut [u8]) -> Result<P, ()> { pub async fn receive<P: Packet<'a>>(& mut self, buffer: &'a mut [u8]) -> Result<P, NetworkError> {
self.network_driver.receive(buffer).await ?; self.network_driver.receive(buffer).await ?;
let mut packet = P::new(); let mut packet = P::new();
packet.decode(&mut BuffReader::new(buffer)); packet.decode(&mut BuffReader::new(buffer));
@ -59,6 +59,6 @@ where
} }
pub async fn receive_message(& mut self, buffer: & mut [u8]) -> Result<(), NetworkError> { pub async fn receive_message(& mut self, buffer: & mut [u8]) -> Result<(), NetworkError> {
return Ok(());
} }
} }

View File

@ -4,7 +4,6 @@
#![allow(dead_code)] #![allow(dead_code)]
#![feature(type_alias_impl_trait)] #![feature(type_alias_impl_trait)]
#![feature(generic_associated_types)] #![feature(generic_associated_types)]
#![feature(async)]
extern crate alloc; extern crate alloc;

View File

@ -25,9 +25,9 @@ pub trait Network {
fn new(ip: [u8; 4], port: u16) -> Self; fn new(ip: [u8; 4], port: u16) -> Self;
fn create_connection(& mut self) -> Self::ConnectionFuture<'m>; fn create_connection(&'m mut self) -> Self::ConnectionFuture<'m>;
fn send(& mut self, buffer: & mut [u8], len: usize) -> Self::WriteFuture<'m>; fn send(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m>;
fn receive(& mut self, buffer: & mut [u8]) -> Self::ReadFuture<'m>; fn receive(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m>;
} }

View File

@ -10,13 +10,13 @@ use tokio::net::{TcpListener, TcpStream};
use crate::network::network_trait::{Network, NetworkError}; use crate::network::network_trait::{Network, NetworkError};
use crate::packet::mqtt_packet::Packet; use crate::packet::mqtt_packet::Packet;
pub struct TokioNetwork<'a> { pub struct TokioNetwork {
ip: [u8; 4], ip: [u8; 4],
port: u16, port: u16,
socket: &'a mut TcpStream, socket: Option<TcpStream>,
} }
impl<'a> TokioNetwork<'a> { impl TokioNetwork {
fn convert_ip(& mut self) -> String { fn convert_ip(& mut self) -> String {
String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port)) String::from(format!("{}.{}.{}.{}:{}", self.ip[0], self.ip[1], self.ip[2], self.ip[3], self.port))
} }
@ -31,33 +31,44 @@ impl Network for TokioNetwork {
return Self { return Self {
ip, ip,
port, port,
socket: &mut (TcpStream), socket: Option::None,
} }
} }
fn create_connection(&mut self) -> Self::ConnectionFuture<'m> { fn create_connection<'m>(&'m mut self) -> Self::ConnectionFuture<'m> {
async move { async move {
TcpStream::connect(self.convert_ip()) TcpStream::connect(self.convert_ip())
.await .await
.map_err(|_| NetworkError::Connection); .map(|socket| self.socket = Some(socket))
.map(|_| ())
.map_err(|_| NetworkError::Connection)
} }
} }
fn send<'m>(&mut self, buffer: &mut [u8], len: usize) -> Self::WriteFuture<'m> { fn send<'m>(&'m mut self, buffer: &'m mut [u8], len: usize) -> Self::WriteFuture<'m> {
async move { async move {
self.socket.write_all(&buffer[0..len]) return if let Some(ref mut stream) = self.socket {
.await stream.write_all(&buffer[0..len])
.map_err(|_| NetworkError::Unknown); .await
.map_err(|_| NetworkError::Unknown)
} else {
Err(NetworkError::Unknown)
}
} }
} }
fn receive<'m>(&mut self, buffer: &mut [u8]) -> Self::ReadFuture<'m> { fn receive<'m>(&'m mut self, buffer: &'m mut [u8]) -> Self::ReadFuture<'m> {
async move { async move {
self.socket.read(buffer) return if let Some(ref mut stream) = self.socket {
.await stream.read(buffer)
.map_err(|_| NetworkError::Connection); .await
.map_err(|_| NetworkError::Connection)
} else {
Err(NetworkError::Unknown)
}
} }
} }