diff --git a/.ci/mosquitto.conf b/.ci/mosquitto.conf index 8e901e7..9f15bd6 100644 --- a/.ci/mosquitto.conf +++ b/.ci/mosquitto.conf @@ -1,3 +1,4 @@ allow_anonymous false -password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt - +listener 1883 10.0.1.17 +#password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt +password_file /Users/obabec/development/school/rust-mqtt/.ci/mqtt_pass.txt diff --git a/.ci/mqtt_pass.txt b/.ci/mqtt_pass.txt index 35a0f7d..598acb2 100644 --- a/.ci/mqtt_pass.txt +++ b/.ci/mqtt_pass.txt @@ -1 +1 @@ -test:testPass +test:$7$101$IY9q8LLi2gHZZRBi$dq+KePHnbDmjlxdZsqmYy6B/yYjHoK/qsCOQ/sXpkvdDoN3E0+8DkKl4XRe7mhI2YPv3Jopo1zcicobqIHbLEA== diff --git a/mqtt/src/client/client.rs b/mqtt/src/client/client.rs index 6098321..1e1f04f 100644 --- a/mqtt/src/client/client.rs +++ b/mqtt/src/client/client.rs @@ -45,6 +45,7 @@ use rand_core::RngCore; use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder}; use crate::network::NetworkError::Connection; use crate::packet::v5::property::Property; +use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError}; use crate::utils::buffer_writer::BuffWriter; pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> { @@ -382,6 +383,7 @@ where } let mut conn = self.connection.as_mut().unwrap(); let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; + let mut packet = PublishPacket::<'b, 5>::new(); if let Err(err) = { packet.decode(&mut BuffReader::new(self.buffer, read)) @@ -465,37 +467,45 @@ where async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result { let mut target_len = 0; - let mut rem_len: VariableByteInteger = [0; 4]; + let mut rem_len: Result; let mut rem_len_len: usize = 0; - let mut complete_len: bool = false; let mut writer = BuffWriter::new(buffer, buffer_len); + let mut i = 0; + + // Get len of packet loop { - let len: usize = conn.receive(recv_buffer).await?; - if len > 0 { - trace!("Received len: {}", len); - if let Err(e) = writer.insert_ref(len, &recv_buffer) { - error!("Buffer operation failed with: {}", e); - return Err(ReasonCode::BuffError); + let len: usize = conn.receive(&mut recv_buffer[writer.position..(writer.position+1)]).await?; + i = i + len; + if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) { + return Err(ReasonCode::BuffError); + } + if (i > 1) { + rem_len = writer.get_rem_len(); + if rem_len.is_ok() { + break; } - - if writer.position >= 1 && target_len == 0 { - let tmp_rem_len = writer.get_rem_len(); - if tmp_rem_len.is_err() { - continue; - } - rem_len = tmp_rem_len.unwrap(); - rem_len_len = VariableByteIntegerEncoder::len(rem_len); - if let Ok(res) = VariableByteIntegerDecoder::decode(rem_len) { - target_len = res as usize; - } else { - return Err(ReasonCode::BuffError); - } - } - - if target_len != 0 && (target_len + rem_len_len + 1) >= writer.position { - trace!("Just read packet with len {}", (target_len + rem_len_len + 1)); - return Ok(target_len + rem_len_len + 1); + if i >= 5 { + return Err(NetworkError); } } } + + rem_len_len = i; + i = 0; + if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) { + target_len = l as usize; + } else { + return Err(BuffError); + } + + loop { + let len: usize = conn.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]).await?; + i = i + len; + if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) { + return Err(BuffError); + } + if writer.position == target_len + rem_len_len { + return Ok(target_len + rem_len_len); + } + } } \ No newline at end of file diff --git a/mqtt/tests/load_test.rs b/mqtt/tests/load_test.rs index eed9703..cd78980 100644 --- a/mqtt/tests/load_test.rs +++ b/mqtt/tests/load_test.rs @@ -85,7 +85,7 @@ async fn publish_core<'b>( if count == amount { break; } - sleep(Duration::from_millis(5)).await; + //sleep(Duration::from_millis(5)).await; } @@ -162,15 +162,16 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) - config.add_password(PASSWORD); config.max_packet_size = 6000; config.keep_alive = 60000; - let mut recv_buffer = [0; 100]; - let mut write_buffer = [0; 100]; + config.max_packet_size = 300; + let mut recv_buffer = [0; 500]; + let mut write_buffer = [0; 500]; let mut client = MqttClient::::new( tokio_network, &mut write_buffer, - 100, + 500, &mut recv_buffer, - 100, + 500, config, ); @@ -364,6 +365,23 @@ async fn load_test_ten_thousand_qos() { assert_ok!(p.unwrap()); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_ten_thousand() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} + // 72s #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial] @@ -381,3 +399,20 @@ async fn load_test_twenty_thousand_qos() { assert_ok!(r.unwrap()); assert_ok!(p.unwrap()); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn load_test_twenty_thousand() { + setup(); + info!("Running simple tests test"); + + let recv = + task::spawn(async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await }); + + let publ = + task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await }); + + let (r, p) = join(recv, publ).await; + assert_ok!(r.unwrap()); + assert_ok!(p.unwrap()); +} \ No newline at end of file