From eb62dbd0233026e1a0a285905c4248fc85e09712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Thu, 11 Apr 2024 12:36:17 +0300 Subject: [PATCH] perf: :zap: optimized data packets 100% --- front/src/components.rs | 5 +++-- front/src/main.rs | 4 ++-- front/src/status.rs | 27 ++++++++++++++++++++------- front/src/streaming.rs | 36 ++++++++++++++++++++++++------------ streamer/src/streaming.rs | 18 ++++++++++++------ 5 files changed, 61 insertions(+), 29 deletions(-) diff --git a/front/src/components.rs b/front/src/components.rs index f36aad6..b145aa8 100644 --- a/front/src/components.rs +++ b/front/src/components.rs @@ -107,12 +107,13 @@ pub fn coin_status_renderer(server_address: String) -> Element { is_loading.set(true); async move { match coin_status_check(&server_address).await { - Ok(coin_status) => { + Some(coin_status) => { is_loading.set(false); coin_result.set(coin_status); } - Err(_) => { + None => { is_loading.set(false); + coin_result.set(CoinStatus {status: Coin::Dead}); } } } diff --git a/front/src/main.rs b/front/src/main.rs index f7cb69f..d7389b7 100644 --- a/front/src/main.rs +++ b/front/src/main.rs @@ -12,8 +12,8 @@ fn app() -> Element { rsx! { page_base {} listen_renderer {} - coin_status_renderer {server_address:server_address.clone()} - server_status_renderer {server_address:server_address.clone()} + // coin_status_renderer {server_address:server_address.clone()} + // server_status_renderer {server_address:server_address.clone()} } } diff --git a/front/src/status.rs b/front/src/status.rs index 029ecd1..0baf241 100644 --- a/front/src/status.rs +++ b/front/src/status.rs @@ -20,12 +20,14 @@ impl Server { pub enum Coin { Tail, Head, + Dead, } impl Coin { pub fn to_string(&mut self) -> String { match self { Self::Head => String::from("Head"), Self::Tail => String::from("Tail"), + Self::Dead => String::from("Dead"), } } } @@ -73,11 +75,22 @@ pub async fn server_status_check( } } } -pub async fn coin_status_check(server_address: &String) -> Result { - Ok(reqwest::get(format!("{}{}", server_address, "/coin")) - .await - .unwrap() - .json::() - .await - .unwrap()) +pub async fn coin_status_check(server_address: &String) -> Option { + match reqwest::get(format!("{}{}", server_address, "/coin")).await { + Ok(response) => { + match response.json::().await { + Ok(coin_status)=> { + Some(coin_status) + } + Err(err_val) => { + log::error!("Error: Can't Deserialise -> {}", err_val); + None + } + } + }, + Err(err_val) => { + log::error!("Error: Response from Server -> {}", err_val); + None + }, + } } diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 72fa11e..8d077ba 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -31,24 +31,20 @@ pub async fn start_listening( is_maintaining.set((true, true)); let ring = HeapRb::::new(BUFFER_LENGTH); let (producer, consumer) = ring.split(); - spawn({ + let _sound_stream_task = spawn({ async move { sound_stream(is_listening, stream_consumer, producer).await; is_listening.set(false); is_maintaining.set((false, is_maintaining().1)); } }); - spawn({ + let _listen_podcast_task = spawn({ async move { listen_podcast(is_listening, consumer).await; is_listening.set(false); //stream_producer.send("Disconnect ME".into()).await.unwrap(); stream_producer.close().await.unwrap(); - //buffer time waiting actually - tokio_with_wasm::tokio::time::sleep(Duration::from_secs(2)).await; - log::info!("{:#?}", is_maintaining()); is_maintaining.set((is_maintaining().0, false)); - log::info!("pod{:#?}", is_maintaining()); } }); } @@ -63,14 +59,30 @@ pub async fn sound_stream( while let Some(msg) = stream_consumer.next().await { if is_listening() { - let data = String::from_utf8(msg.unwrap().into()).unwrap(); - let data_parsed: Vec<&str> = data.split("#").collect(); - for element in data_parsed { - let single_data: f32 = match element.parse() { - Ok(single) => single, + let data = msg.unwrap().to_string(); + //log::info!("{:#?}", data); + //log::info!("{}", data.len()); + + let mut datum_parsed:Vec = vec![]; + let mut data_parsed:Vec = vec![]; + + + for char in data.chars() { + if char == '+' || char == '-' { + data_parsed.push(datum_parsed.iter().collect()); + datum_parsed.clear(); + + } + datum_parsed.push(char); + + } + + for single_data in data_parsed { + let sample = match single_data.parse::() { + Ok(sample) => sample, Err(_) => 0.0, }; - if let Err(_) = producer.push(single_data) {} + if let Err(_) = producer.push(sample){} } } else { break; diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 307eb20..aa8c957 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -33,13 +33,19 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece Ok(single_data) => { let ring = HeapRb::::new(BUFFER_LENGTH); let (mut producer, mut consumer) = ring.split(); - let single_data_packet = single_data.to_string().as_bytes().to_vec(); - let terminator = "#".as_bytes().to_vec(); - - for element in single_data_packet { - producer.push(element).unwrap(); + let mut charred:Vec = single_data.to_string().chars().collect(); + if charred[0] == '0' { + charred.insert(0, '+'); } - for element in terminator { + charred.truncate(6); + let mut single_data_packet:Vec = vec![]; + for char in charred { + let char_packet = char.to_string().as_bytes().to_vec(); + for byte in char_packet { + single_data_packet.push(byte); + } + } + for element in single_data_packet { producer.push(element).unwrap(); } while !consumer.is_empty() {