From 5729feef15fedb93d75faa4f58828121b1b6504b Mon Sep 17 00:00:00 2001 From: Ondrej Babec Date: Fri, 25 Feb 2022 16:35:46 +0100 Subject: [PATCH] Type issue --- src/client/client_v5.rs | 73 ++++++++++++++++++++--------------------- src/main.rs | 1 + src/tokio_network.rs | 1 - 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index 22bcfbe..34198c2 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -24,48 +24,47 @@ where } } // connect -> connack -> publish -> QoS ? -> disconn - pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> impl Future> { - async move { - let mut len = { - let mut connect = ConnectPacket::<3, 0>::clean(); - connect.encode(self.buffer) - }; + pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> Result<(), NetworkError> { - self.network_driver.send(self.buffer, len).await?; + let mut len = { + let mut connect = ConnectPacket::<3, 0>::clean(); + connect.encode(self.buffer) + }; - //connack - let connack = { - let connack = self.receive().await?; - let mut packet = ConnackPacket::new(); - packet.decode(&mut BuffReader::new(self.buffer)); - packet - }; + self.network_driver.send(self.buffer, len).await ?; - if connack.connect_reason_code != 0x00 { - todo!(); - } + //connack + let connack = { + self.receive().await ?; + let mut packet = ConnackPacket::new(); + packet.decode(&mut BuffReader::new(self.buffer)); + packet + }; - // publish - - len = { - let mut packet = PublishPacket::<5>::new(topic_name, message); - packet.encode(self.buffer) - }; - - self.network_driver.send(self.buffer, len).await?; - - - //QoS1 - if >::into(qos) == >::into(QoS1) { - todo!(); - } - - //Disconnect - let mut disconnect = DisconnectPacket::<5>::new(); - len = disconnect.encode(self.buffer); - self.network_driver.send(self.buffer, len); - Ok(()) + if connack.connect_reason_code != 0x00 { + todo!(); } + + // publish + + len = { + let mut packet = PublishPacket::<5>::new(topic_name, message); + packet.encode(self.buffer) + }; + + self.network_driver.send(self.buffer, len).await ?; + + + //QoS1 + if >::into(qos) == >::into(QoS1) { + todo!(); + } + + //Disconnect + let mut disconnect = DisconnectPacket::<5>::new(); + len = disconnect.encode(self.buffer); + self.network_driver.send(self.buffer, len); + Ok(()) } pub async fn receive(&'a mut self) -> Result<(), NetworkError> { diff --git a/src/main.rs b/src/main.rs index 47a58af..dea981a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use rust_mqtt::packet::publish_packet::PublishPacket; use rust_mqtt::packet::subscription_packet::SubscriptionPacket; use rust_mqtt::tokio_network::TokioNetwork; +#[tokio::main] fn main() { env_logger::builder() .filter_level(log::LevelFilter::Info) diff --git a/src/tokio_network.rs b/src/tokio_network.rs index 4db8861..9d9fe1d 100644 --- a/src/tokio_network.rs +++ b/src/tokio_network.rs @@ -5,7 +5,6 @@ use core::fmt::Error; use core::future::Future; use core::ptr::null; -use embassy::io::WriteAll; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream};