From e4125c63aa9914119970a8ace29bebb1c20fb27b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Thu, 11 Apr 2024 21:42:45 +0300 Subject: [PATCH] perf: :alembic: buffer layer optimizations --- back/src/streaming.rs | 14 ++++++++------ streamer/src/streaming.rs | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index d5d6ec4..583af66 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -42,7 +42,7 @@ pub async fn start() { } async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { loop { - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(200)).await; let mut messages_buffered: Vec = vec![]; while message_consumer.len() > 0 { match message_consumer.recv().await { @@ -52,12 +52,14 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer Err(_) => {} } } + let mut message_buffered_concreted = String::new(); if messages_buffered.len() > 0 { for message_buffered in messages_buffered { - match buffered_producer.send(message_buffered) { - Ok(_) => {} - Err(_) => {} - } + message_buffered_concreted = format!("{}{}", message_buffered_concreted, message_buffered); + } + match buffered_producer.send(message_buffered_concreted.into()) { + Ok(_) => {} + Err(_) => {} } } } @@ -108,7 +110,7 @@ async fn message_organizer( message_producer.receiver_count() ); } - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(10)).await; } } async fn stream( diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 41c70d5..12ddd59 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -41,7 +41,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece let _zero = charred.remove(1); let _point = charred.remove(1); } - charred.truncate(4); + charred.truncate(8); let mut single_data_packet: Vec = vec![]; for char in charred { let char_packet = char.to_string().as_bytes().to_vec(); @@ -70,7 +70,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece message_producer.receiver_count() ); } - tokio::time::sleep(Duration::from_millis(1000)).await; + tokio::time::sleep(Duration::from_millis(10)).await; } }