perf: ⚗️ buffer layer optimizations

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-11 21:42:45 +03:00
parent 1ccc335477
commit e4125c63aa
2 changed files with 10 additions and 8 deletions

View file

@ -42,7 +42,7 @@ pub async fn start() {
} }
async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) { async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) {
loop { loop {
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(200)).await;
let mut messages_buffered: Vec<Message> = vec![]; let mut messages_buffered: Vec<Message> = vec![];
while message_consumer.len() > 0 { while message_consumer.len() > 0 {
match message_consumer.recv().await { match message_consumer.recv().await {
@ -52,16 +52,18 @@ async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer
Err(_) => {} Err(_) => {}
} }
} }
let mut message_buffered_concreted = String::new();
if messages_buffered.len() > 0 { if messages_buffered.len() > 0 {
for message_buffered in messages_buffered { for message_buffered in messages_buffered {
match buffered_producer.send(message_buffered) { message_buffered_concreted = format!("{}{}", message_buffered_concreted, message_buffered);
}
match buffered_producer.send(message_buffered_concreted.into()) {
Ok(_) => {} Ok(_) => {}
Err(_) => {} Err(_) => {}
} }
} }
} }
} }
}
async fn streamer_stream( async fn streamer_stream(
record_producer: Sender<Message>, record_producer: Sender<Message>,
mut ws_stream: WebSocketStream<TcpStream>, mut ws_stream: WebSocketStream<TcpStream>,
@ -108,7 +110,7 @@ async fn message_organizer(
message_producer.receiver_count() message_producer.receiver_count()
); );
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(10)).await;
} }
} }
async fn stream( async fn stream(

View file

@ -41,7 +41,7 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
let _zero = charred.remove(1); let _zero = charred.remove(1);
let _point = charred.remove(1); let _point = charred.remove(1);
} }
charred.truncate(4); charred.truncate(8);
let mut single_data_packet: Vec<u8> = vec![]; let mut single_data_packet: Vec<u8> = vec![];
for char in charred { for char in charred {
let char_packet = char.to_string().as_bytes().to_vec(); let char_packet = char.to_string().as_bytes().to_vec();
@ -70,7 +70,7 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
message_producer.receiver_count() message_producer.receiver_count()
); );
} }
tokio::time::sleep(Duration::from_millis(1000)).await; tokio::time::sleep(Duration::from_millis(10)).await;
} }
} }