From 1ccc335477501fa136863578356ea1c36ab8893f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Thu, 11 Apr 2024 20:53:35 +0300 Subject: [PATCH] perf: :zap: optimized data packets 50% --- back/src/streaming.rs | 46 ++++++++++++++++++--------------------- front/src/streaming.rs | 10 ++++++--- streamer/src/streaming.rs | 12 ++++++---- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 6e464c4..d5d6ec4 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,4 +1,3 @@ - use std::time::Duration; use futures_util::{SinkExt, StreamExt}; @@ -22,7 +21,7 @@ pub async fn start() { println!("New Streamer: {:#?}", streamer_info); tokio::spawn(streamer_stream(record_producer, ws_stream)); } - + let (message_producer, message_consumer) = channel(BUFFER_LENGTH); let (buffered_producer, _) = channel(BUFFER_LENGTH); tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); @@ -43,47 +42,48 @@ pub async fn start() { } async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { loop { - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_millis(500)).await; let mut messages_buffered: Vec = vec![]; while message_consumer.len() > 0 { match message_consumer.recv().await { Ok(msg) => { messages_buffered.push(msg); - }Err(_) => { - } + Err(_) => {} } } - for message_buffered in messages_buffered { - match buffered_producer.send(message_buffered) { - Ok(_) => {}, - Err(_) => {}, + if messages_buffered.len() > 0 { + for message_buffered in messages_buffered { + match buffered_producer.send(message_buffered) { + Ok(_) => {} + Err(_) => {} + } } } } } -async fn streamer_stream(record_producer:Sender, mut ws_stream: WebSocketStream) { +async fn streamer_stream( + record_producer: Sender, + mut ws_stream: WebSocketStream, +) { while let Some(message_with_question) = ws_stream.next().await { match message_with_question { Ok(message) => { println!("{}", message.len()); match record_producer.send(message) { - Ok(_) => { - - } - Err(_) => { - - } + Ok(_) => {} + Err(_) => {} } } - Err(_) => { - - } + Err(_) => {} } } } -async fn message_organizer(message_producer: Sender, mut record_consumer: Receiver) { +async fn message_organizer( + message_producer: Sender, + mut record_consumer: Receiver, +) { loop { let mut messages = String::new(); let mut iteration = record_consumer.len(); @@ -93,11 +93,8 @@ async fn message_organizer(message_producer: Sender, mut record_consume Ok(single_message) => { let single_message_packet = single_message.to_string(); messages = format!("{}{}", messages, single_message_packet); - - } - Err(_) => { - } + Err(_) => {} } } if messages.len() > 0 { @@ -147,4 +144,3 @@ async fn stream( } } } - diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 8d077ba..f99097b 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -61,7 +61,7 @@ pub async fn sound_stream( if is_listening() { let data = msg.unwrap().to_string(); //log::info!("{:#?}", data); - //log::info!("{}", data.len()); + log::info!("{}", data.len()); let mut datum_parsed:Vec = vec![]; let mut data_parsed:Vec = vec![]; @@ -71,10 +71,14 @@ pub async fn sound_stream( if char == '+' || char == '-' { data_parsed.push(datum_parsed.iter().collect()); datum_parsed.clear(); - } datum_parsed.push(char); - + if data.len() > 2 { + if char == '+' || char == '-' { + datum_parsed.push('0'); + datum_parsed.push('.'); + } + } } for single_data in data_parsed { diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index de198e3..41c70d5 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -33,12 +33,16 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece Ok(single_data) => { let ring = HeapRb::::new(BUFFER_LENGTH); let (mut producer, mut consumer) = ring.split(); - let mut charred:Vec = single_data.to_string().chars().collect(); + let mut charred: Vec = single_data.to_string().chars().collect(); if charred[0] == '0' { charred.insert(0, '+'); } - charred.truncate(6); - let mut single_data_packet:Vec = vec![]; + if charred.len() > 2 { + let _zero = charred.remove(1); + let _point = charred.remove(1); + } + charred.truncate(4); + let mut single_data_packet: Vec = vec![]; for char in charred { let char_packet = char.to_string().as_bytes().to_vec(); for byte in char_packet { @@ -66,7 +70,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece message_producer.receiver_count() ); } - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(1000)).await; } }