Fix async

This commit is contained in:
Ondrej Babec 2022-02-25 16:20:55 +01:00
parent 7b7c8aa2aa
commit cc6498a76f
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
3 changed files with 49 additions and 32 deletions

View File

@ -1,3 +1,4 @@
use core::future::Future;
use crate::network::network_trait::{Network, NetworkError}; use crate::network::network_trait::{Network, NetworkError};
use crate::packet::connack_packet::ConnackPacket; use crate::packet::connack_packet::ConnackPacket;
use crate::packet::connect_packet::ConnectPacket; use crate::packet::connect_packet::ConnectPacket;
@ -23,42 +24,53 @@ where
} }
} }
// connect -> connack -> publish -> QoS ? -> disconn // connect -> connack -> publish -> QoS ? -> disconn
pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> Result<(), NetworkError> { pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> impl Future<Output = Result<(), NetworkError>> {
//connect async move {
self.network_driver.create_connection().await ?; let mut len = {
let mut connect = ConnectPacket::<3, 0>::clean();
connect.encode(self.buffer)
};
let mut connect = ConnectPacket::<3, 0>::clean(); self.network_driver.send(self.buffer, len).await?;
let mut len = connect.encode(self.buffer);
self.network_driver.send(self.buffer, len).await ?; //connack
//connack let connack = {
let connack: ConnackPacket<MAX_PROPERTIES> = self.receive::<ConnackPacket<MAX_PROPERTIES>>().await ?; let connack = self.receive().await?;
if connack.connect_reason_code != 0x00 { let mut packet = ConnackPacket::new();
todo!(); packet.decode(&mut BuffReader::new(self.buffer));
packet
};
if connack.connect_reason_code != 0x00 {
todo!();
}
// publish
len = {
let mut packet = PublishPacket::<5>::new(topic_name, message);
packet.encode(self.buffer)
};
self.network_driver.send(self.buffer, len).await?;
//QoS1
if <QualityOfService as Into<u8>>::into(qos) == <QualityOfService as Into<u8>>::into(QoS1) {
todo!();
}
//Disconnect
let mut disconnect = DisconnectPacket::<5>::new();
len = disconnect.encode(self.buffer);
self.network_driver.send(self.buffer, len);
Ok(())
} }
// publish
let mut packet = PublishPacket::<5>::new(topic_name, message);
len = packet.encode(self.buffer);
self.network_driver.send(self.buffer, len).await ?;
//QoS1
if <QualityOfService as Into<u8>>::into(qos) == <QualityOfService as Into<u8>>::into(QoS1) {
todo!();
}
//Disconnect
let mut disconnect = DisconnectPacket::<5>::new();
len = disconnect.encode(self.buffer);
self.network_driver.send(self.buffer, len);
Ok(())
} }
pub async fn receive<P: Packet<'p>>(&'a mut self) -> Result<P, NetworkError> { pub async fn receive(&'a mut self) -> Result<(), NetworkError> {
self.network_driver.receive(self.buffer).await ?; self.network_driver.receive(self.buffer).await ?;
let mut packet = P::new(); Ok(())
packet.decode(&mut BuffReader::new(self.buffer));
return Ok(packet);
} }
pub async fn receive_message(&'a mut self) -> Result<(), NetworkError> { pub async fn receive_message(&'a mut self) -> Result<(), NetworkError> {

View File

@ -31,7 +31,8 @@ fn main() {
let mut ip: [u8; 4] = [37, 205, 11, 180]; let mut ip: [u8; 4] = [37, 205, 11, 180];
let mut port: u16 = 1883; let mut port: u16 = 1883;
let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port);
let client = MqttClientV5::new::<TokioNetwork, 5>(tokio_network); tokio_network.create_connection().await;
let mut x = b"hello world";
let mut res2 = vec![0; 260]; let mut res2 = vec![0; 260];
let client = MqttClientV5::new(&mut tokio_network, &mut res2);
let mut x = b"hello world";
} }

View File

@ -24,6 +24,10 @@ impl TokioNetwork {
} }
} }
impl TokioNetwork {
}
impl Network for TokioNetwork { impl Network for TokioNetwork {
type ConnectionFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm; type ConnectionFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm;
type WriteFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm; type WriteFuture<'m> where Self: 'm = impl Future<Output = Result<(), NetworkError>> + 'm;