diff --git a/src/client/client_v5.rs b/src/client/client_v5.rs index 404d813..22bcfbe 100644 --- a/src/client/client_v5.rs +++ b/src/client/client_v5.rs @@ -1,3 +1,4 @@ +use core::future::Future; use crate::network::network_trait::{Network, NetworkError}; use crate::packet::connack_packet::ConnackPacket; use crate::packet::connect_packet::ConnectPacket; @@ -23,42 +24,53 @@ where } } // connect -> connack -> publish -> QoS ? -> disconn - pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> Result<(), NetworkError> { - //connect - self.network_driver.create_connection().await ?; + pub async fn send_message(&'a mut self, topic_name: & str, message: & str, qos: QualityOfService) -> impl Future> { + async move { + let mut len = { + let mut connect = ConnectPacket::<3, 0>::clean(); + connect.encode(self.buffer) + }; - let mut connect = ConnectPacket::<3, 0>::clean(); - let mut len = connect.encode(self.buffer); + self.network_driver.send(self.buffer, len).await?; - self.network_driver.send(self.buffer, len).await ?; - //connack - let connack: ConnackPacket = self.receive::>().await ?; - if connack.connect_reason_code != 0x00 { - todo!(); + //connack + let connack = { + let connack = self.receive().await?; + let mut packet = ConnackPacket::new(); + packet.decode(&mut BuffReader::new(self.buffer)); + packet + }; + + if connack.connect_reason_code != 0x00 { + todo!(); + } + + // publish + + len = { + let mut packet = PublishPacket::<5>::new(topic_name, message); + packet.encode(self.buffer) + }; + + self.network_driver.send(self.buffer, len).await?; + + + //QoS1 + if >::into(qos) == >::into(QoS1) { + todo!(); + } + + //Disconnect + let mut disconnect = DisconnectPacket::<5>::new(); + len = disconnect.encode(self.buffer); + self.network_driver.send(self.buffer, len); + Ok(()) } - - // publish - let mut packet = PublishPacket::<5>::new(topic_name, message); - len = packet.encode(self.buffer); - self.network_driver.send(self.buffer, len).await ?; - - //QoS1 - if >::into(qos) == >::into(QoS1) { - todo!(); - } - - //Disconnect - let mut disconnect = DisconnectPacket::<5>::new(); - len = disconnect.encode(self.buffer); - self.network_driver.send(self.buffer, len); - Ok(()) } - pub async fn receive>(&'a mut self) -> Result { + pub async fn receive(&'a mut self) -> Result<(), NetworkError> { self.network_driver.receive(self.buffer).await ?; - let mut packet = P::new(); - packet.decode(&mut BuffReader::new(self.buffer)); - return Ok(packet); + Ok(()) } pub async fn receive_message(&'a mut self) -> Result<(), NetworkError> { diff --git a/src/main.rs b/src/main.rs index 90c7757..47a58af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,8 @@ fn main() { let mut ip: [u8; 4] = [37, 205, 11, 180]; let mut port: u16 = 1883; let mut tokio_network: TokioNetwork = TokioNetwork::new(ip, port); - let client = MqttClientV5::new::(tokio_network); - let mut x = b"hello world"; + tokio_network.create_connection().await; let mut res2 = vec![0; 260]; + let client = MqttClientV5::new(&mut tokio_network, &mut res2); + let mut x = b"hello world"; } diff --git a/src/tokio_network.rs b/src/tokio_network.rs index 47582cd..4db8861 100644 --- a/src/tokio_network.rs +++ b/src/tokio_network.rs @@ -24,6 +24,10 @@ impl TokioNetwork { } } +impl TokioNetwork { + +} + impl Network for TokioNetwork { type ConnectionFuture<'m> where Self: 'm = impl Future> + 'm; type WriteFuture<'m> where Self: 'm = impl Future> + 'm;