perf: optimized data packets 50%

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-11 20:53:35 +03:00
parent e04666b620
commit 1ccc335477
3 changed files with 36 additions and 32 deletions

View file

@ -1,4 +1,3 @@
use std::time::Duration; use std::time::Duration;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
@ -43,47 +42,48 @@ pub async fn start() {
} }
async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) { async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) {
loop { loop {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_millis(500)).await;
let mut messages_buffered: Vec<Message> = vec![]; let mut messages_buffered: Vec<Message> = vec![];
while message_consumer.len() > 0 { while message_consumer.len() > 0 {
match message_consumer.recv().await { match message_consumer.recv().await {
Ok(msg) => { Ok(msg) => {
messages_buffered.push(msg); messages_buffered.push(msg);
}Err(_) => { }
Err(_) => {}
}
} }
} }
if messages_buffered.len() > 0 {
for message_buffered in messages_buffered { for message_buffered in messages_buffered {
match buffered_producer.send(message_buffered) { match buffered_producer.send(message_buffered) {
Ok(_) => {}, Ok(_) => {}
Err(_) => {}, Err(_) => {}
}
} }
} }
} }
} }
async fn streamer_stream(record_producer:Sender<Message>, mut ws_stream: WebSocketStream<TcpStream>) { async fn streamer_stream(
record_producer: Sender<Message>,
mut ws_stream: WebSocketStream<TcpStream>,
) {
while let Some(message_with_question) = ws_stream.next().await { while let Some(message_with_question) = ws_stream.next().await {
match message_with_question { match message_with_question {
Ok(message) => { Ok(message) => {
println!("{}", message.len()); println!("{}", message.len());
match record_producer.send(message) { match record_producer.send(message) {
Ok(_) => { Ok(_) => {}
Err(_) => {}
}
Err(_) => {
} }
} }
} Err(_) => {}
Err(_) => {
}
} }
} }
} }
async fn message_organizer(message_producer: Sender<Message>, mut record_consumer: Receiver<Message>) { async fn message_organizer(
message_producer: Sender<Message>,
mut record_consumer: Receiver<Message>,
) {
loop { loop {
let mut messages = String::new(); let mut messages = String::new();
let mut iteration = record_consumer.len(); let mut iteration = record_consumer.len();
@ -93,11 +93,8 @@ async fn message_organizer(message_producer: Sender<Message>, mut record_consume
Ok(single_message) => { Ok(single_message) => {
let single_message_packet = single_message.to_string(); let single_message_packet = single_message.to_string();
messages = format!("{}{}", messages, single_message_packet); messages = format!("{}{}", messages, single_message_packet);
}
Err(_) => {
} }
Err(_) => {}
} }
} }
if messages.len() > 0 { if messages.len() > 0 {
@ -147,4 +144,3 @@ async fn stream(
} }
} }
} }

View file

@ -61,7 +61,7 @@ pub async fn sound_stream(
if is_listening() { if is_listening() {
let data = msg.unwrap().to_string(); let data = msg.unwrap().to_string();
//log::info!("{:#?}", data); //log::info!("{:#?}", data);
//log::info!("{}", data.len()); log::info!("{}", data.len());
let mut datum_parsed:Vec<char> = vec![]; let mut datum_parsed:Vec<char> = vec![];
let mut data_parsed:Vec<String> = vec![]; let mut data_parsed:Vec<String> = vec![];
@ -71,10 +71,14 @@ pub async fn sound_stream(
if char == '+' || char == '-' { if char == '+' || char == '-' {
data_parsed.push(datum_parsed.iter().collect()); data_parsed.push(datum_parsed.iter().collect());
datum_parsed.clear(); datum_parsed.clear();
} }
datum_parsed.push(char); datum_parsed.push(char);
if data.len() > 2 {
if char == '+' || char == '-' {
datum_parsed.push('0');
datum_parsed.push('.');
}
}
} }
for single_data in data_parsed { for single_data in data_parsed {

View file

@ -33,12 +33,16 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
Ok(single_data) => { Ok(single_data) => {
let ring = HeapRb::<u8>::new(BUFFER_LENGTH); let ring = HeapRb::<u8>::new(BUFFER_LENGTH);
let (mut producer, mut consumer) = ring.split(); let (mut producer, mut consumer) = ring.split();
let mut charred:Vec<char> = single_data.to_string().chars().collect(); let mut charred: Vec<char> = single_data.to_string().chars().collect();
if charred[0] == '0' { if charred[0] == '0' {
charred.insert(0, '+'); charred.insert(0, '+');
} }
charred.truncate(6); if charred.len() > 2 {
let mut single_data_packet:Vec<u8> = vec![]; let _zero = charred.remove(1);
let _point = charred.remove(1);
}
charred.truncate(4);
let mut single_data_packet: Vec<u8> = vec![];
for char in charred { for char in charred {
let char_packet = char.to_string().as_bytes().to_vec(); let char_packet = char.to_string().as_bytes().to_vec();
for byte in char_packet { for byte in char_packet {
@ -66,7 +70,7 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
message_producer.receiver_count() message_producer.receiver_count()
); );
} }
tokio::time::sleep(Duration::from_millis(500)).await; tokio::time::sleep(Duration::from_millis(1000)).await;
} }
} }