Increase read capabilities

This commit is contained in:
Ondrej Babec 2022-04-16 11:51:22 +02:00
parent 913a849cdf
commit 0e7bdadf58
No known key found for this signature in database
GPG Key ID: 13E577E3769B2079
4 changed files with 80 additions and 34 deletions

View File

@ -1,3 +1,4 @@
allow_anonymous false allow_anonymous false
password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt listener 1883 10.0.1.17
#password_file /home/runner/work/rust-mqtt/rust-mqtt/.ci/mqtt_pass.txt
password_file /Users/obabec/development/school/rust-mqtt/.ci/mqtt_pass.txt

View File

@ -1 +1 @@
test:testPass test:$7$101$IY9q8LLi2gHZZRBi$dq+KePHnbDmjlxdZsqmYy6B/yYjHoK/qsCOQ/sXpkvdDoN3E0+8DkKl4XRe7mhI2YPv3Jopo1zcicobqIHbLEA==

View File

@ -45,6 +45,7 @@ use rand_core::RngCore;
use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder}; use crate::encoding::variable_byte_integer::{VariableByteInteger, VariableByteIntegerDecoder, VariableByteIntegerEncoder};
use crate::network::NetworkError::Connection; use crate::network::NetworkError::Connection;
use crate::packet::v5::property::Property; use crate::packet::v5::property::Property;
use crate::packet::v5::reason_codes::ReasonCode::{BuffError, NetworkError};
use crate::utils::buffer_writer::BuffWriter; use crate::utils::buffer_writer::BuffWriter;
pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> { pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize> {
@ -382,6 +383,7 @@ where
} }
let mut conn = self.connection.as_mut().unwrap(); let mut conn = self.connection.as_mut().unwrap();
let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? }; let read = { receive_packet(self.buffer, self.buffer_len, self.recv_buffer, conn).await? };
let mut packet = PublishPacket::<'b, 5>::new(); let mut packet = PublishPacket::<'b, 5>::new();
if let Err(err) = { if let Err(err) = {
packet.decode(&mut BuffReader::new(self.buffer, read)) packet.decode(&mut BuffReader::new(self.buffer, read))
@ -465,37 +467,45 @@ where
async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result<usize, ReasonCode> { async fn receive_packet<'c, T:NetworkConnection>(buffer: & mut [u8],buffer_len: usize, recv_buffer: & mut [u8], conn: &'c mut T) -> Result<usize, ReasonCode> {
let mut target_len = 0; let mut target_len = 0;
let mut rem_len: VariableByteInteger = [0; 4]; let mut rem_len: Result<VariableByteInteger, ()>;
let mut rem_len_len: usize = 0; let mut rem_len_len: usize = 0;
let mut complete_len: bool = false;
let mut writer = BuffWriter::new(buffer, buffer_len); let mut writer = BuffWriter::new(buffer, buffer_len);
let mut i = 0;
// Get len of packet
loop { loop {
let len: usize = conn.receive(recv_buffer).await?; let len: usize = conn.receive(&mut recv_buffer[writer.position..(writer.position+1)]).await?;
if len > 0 { i = i + len;
trace!("Received len: {}", len); if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) {
if let Err(e) = writer.insert_ref(len, &recv_buffer) { return Err(ReasonCode::BuffError);
error!("Buffer operation failed with: {}", e); }
return Err(ReasonCode::BuffError); if (i > 1) {
rem_len = writer.get_rem_len();
if rem_len.is_ok() {
break;
} }
if i >= 5 {
if writer.position >= 1 && target_len == 0 { return Err(NetworkError);
let tmp_rem_len = writer.get_rem_len();
if tmp_rem_len.is_err() {
continue;
}
rem_len = tmp_rem_len.unwrap();
rem_len_len = VariableByteIntegerEncoder::len(rem_len);
if let Ok(res) = VariableByteIntegerDecoder::decode(rem_len) {
target_len = res as usize;
} else {
return Err(ReasonCode::BuffError);
}
}
if target_len != 0 && (target_len + rem_len_len + 1) >= writer.position {
trace!("Just read packet with len {}", (target_len + rem_len_len + 1));
return Ok(target_len + rem_len_len + 1);
} }
} }
} }
rem_len_len = i;
i = 0;
if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) {
target_len = l as usize;
} else {
return Err(BuffError);
}
loop {
let len: usize = conn.receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)]).await?;
i = i + len;
if let Err(e) = writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)]) {
return Err(BuffError);
}
if writer.position == target_len + rem_len_len {
return Ok(target_len + rem_len_len);
}
}
} }

View File

@ -85,7 +85,7 @@ async fn publish_core<'b>(
if count == amount { if count == amount {
break; break;
} }
sleep(Duration::from_millis(5)).await; //sleep(Duration::from_millis(5)).await;
} }
@ -162,15 +162,16 @@ async fn receive(ip: [u8; 4], qos: QualityOfService, topic: &str, amount: u16) -
config.add_password(PASSWORD); config.add_password(PASSWORD);
config.max_packet_size = 6000; config.max_packet_size = 6000;
config.keep_alive = 60000; config.keep_alive = 60000;
let mut recv_buffer = [0; 100]; config.max_packet_size = 300;
let mut write_buffer = [0; 100]; let mut recv_buffer = [0; 500];
let mut write_buffer = [0; 500];
let mut client = MqttClient::<TokioNetwork, 5>::new( let mut client = MqttClient::<TokioNetwork, 5>::new(
tokio_network, tokio_network,
&mut write_buffer, &mut write_buffer,
100, 500,
&mut recv_buffer, &mut recv_buffer,
100, 500,
config, config,
); );
@ -364,6 +365,23 @@ async fn load_test_ten_thousand_qos() {
assert_ok!(p.unwrap()); assert_ok!(p.unwrap());
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
async fn load_test_ten_thousand() {
setup();
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "ten/thousand", 10000).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "ten/thousand", 10000).await });
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
assert_ok!(p.unwrap());
}
// 72s // 72s
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial] #[serial]
@ -381,3 +399,20 @@ async fn load_test_twenty_thousand_qos() {
assert_ok!(r.unwrap()); assert_ok!(r.unwrap());
assert_ok!(p.unwrap()); assert_ok!(p.unwrap());
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
async fn load_test_twenty_thousand() {
setup();
info!("Running simple tests test");
let recv =
task::spawn(async move { receive(IP, QualityOfService::QoS0, "twenty/thousand", 20000).await });
let publ =
task::spawn(async move { publish(IP, 5, QualityOfService::QoS0, "twenty/thousand", 20000).await });
let (r, p) = join(recv, publ).await;
assert_ok!(r.unwrap());
assert_ok!(p.unwrap());
}