From e26b0b9fd1872ee318e8bfec2ff78f58ef7d7d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Tue, 9 Apr 2024 02:20:24 +0300 Subject: [PATCH] refactor: :recycle: refactor ring with channel --- back/src/lib.rs | 5 ++-- back/src/streaming.rs | 62 ++++++++++++++++++++---------------------- front/src/streaming.rs | 16 ++++++----- 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/back/src/lib.rs b/back/src/lib.rs index 8c00fa7..187d31b 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -6,13 +6,14 @@ pub mod routing; pub mod streaming; pub mod utils; +#[derive(Debug, Clone)] +pub struct AppState {} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Listener { ip: IpAddr, port: u16, } -#[derive(Debug, Clone)] -pub struct AppState {} #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] enum ServerStatus { Alive, diff --git a/back/src/streaming.rs b/back/src/streaming.rs index e0389c8..6192d4d 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,28 +1,27 @@ -use std::{mem::MaybeUninit, sync::Arc, time::Duration}; +use std::time::Duration; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use futures_util::SinkExt; -use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; +use ringbuf::HeapRb; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast::{channel, Receiver, Sender}, - time::Instant, }; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::Listener; const BUFFER_LENGTH: usize = 1000000; -const MAX_TOLERATED_MESSAGE_COUNT:usize = 10; +const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start() { let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); println!("Dude Someone Triggered"); - let ring = HeapRb::::new(BUFFER_LENGTH); - let (producer, consumer) = ring.split(); - let timer = Instant::now(); + + let (record_producer, record_consumer) = channel(BUFFER_LENGTH); + let (message_producer, _) = channel(BUFFER_LENGTH); - tokio::spawn(record(producer)); - tokio::spawn(parent_stream(timer, message_producer.clone(), consumer)); + tokio::spawn(record(record_producer)); + tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); while let Ok((tcp_stream, info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); println!("New Connection: {}", info); @@ -37,18 +36,15 @@ pub async fn start() { )); } } -pub async fn parent_stream( - timer: Instant, - message_producer: Sender, - mut consumer: Consumer>>>>, -) { +async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { loop { let mut single_message: Vec = Vec::new(); - let now = timer.elapsed().as_secs(); - while !consumer.is_empty() && (timer.elapsed().as_secs() + 5) > now { - match consumer.pop() { - Some(single_data) => { - let ring = HeapRb::::new(1000000); + let mut iteration = consumer.len(); + while iteration > 0 { + iteration -= 1; + match consumer.recv().await { + Ok(single_data) => { + let ring = HeapRb::::new(BUFFER_LENGTH); let (mut producer, mut consumer) = ring.split(); let single_data_packet = single_data.to_string().as_bytes().to_vec(); let terminator = "#".as_bytes().to_vec(); @@ -63,22 +59,24 @@ pub async fn parent_stream( single_message.push(consumer.pop().unwrap()); } } - None => {} + Err(_) => {} } } - match message_producer.send(single_message.into()) { - Ok(_) => {} - Err(_) => {} + tokio::time::sleep(Duration::from_secs(2)).await; + if !single_message.is_empty() { + match message_producer.send(single_message.into()) { + Ok(_) => {} + Err(_) => {} + } + println!( + "Message Counter = {} | Receiver Count = {}", + message_producer.len(), + message_producer.receiver_count() + ); } - println!( - "Message Len = {} | Receiver Count = {}", - message_producer.len(), - message_producer.receiver_count() - ); - tokio::time::sleep(Duration::from_secs(1)).await; } } -pub async fn stream( +async fn stream( listener: Listener, mut ws_stream: WebSocketStream, mut message_consumer: Receiver, @@ -112,7 +110,7 @@ pub async fn stream( } } -pub async fn record(mut producer: Producer>>>>) { +async fn record(producer: Sender) { println!("Hello, world!"); let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); @@ -123,7 +121,7 @@ pub async fn record(mut producer: Producer {} Err(_) => {} } diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 346fd54..4425880 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -5,7 +5,7 @@ use dioxus::{ prelude::spawn, signals::{Signal, Writable}, }; -use futures_util::StreamExt; +use futures_util::{stream::SplitStream, SinkExt, StreamExt}; use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use tokio_tungstenite_wasm::WebSocketStream; @@ -20,9 +20,9 @@ pub async fn start_listening( if is_listening() { log::info!("Trying Sir"); let connect_addr = "ws://192.168.1.2:2424"; - let stream: WebSocketStream; + let (mut stream_producer, stream_consumer); match tokio_tungstenite_wasm::connect(connect_addr).await { - Ok(ws_stream) => stream = ws_stream, + Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(), Err(_) => { is_listening.set(false); return; @@ -33,7 +33,7 @@ pub async fn start_listening( let (producer, consumer) = ring.split(); spawn({ async move { - sound_stream(is_listening, stream, producer).await; + sound_stream(is_listening, stream_consumer, producer).await; is_listening.set(false); is_maintaining.set((false, is_maintaining().1)); } @@ -42,6 +42,8 @@ pub async fn start_listening( async move { listen_podcast(is_listening, consumer).await; is_listening.set(false); + //stream_producer.send("Disconnect ME".into()).await.unwrap(); + stream_producer.close().await.unwrap(); //buffer time waiting actually tokio_with_wasm::tokio::time::sleep(Duration::from_secs(2)).await; log::info!("{:#?}", is_maintaining()); @@ -54,12 +56,12 @@ pub async fn start_listening( pub async fn sound_stream( is_listening: Signal, - mut stream: WebSocketStream, + mut stream_consumer: SplitStream, mut producer: Producer>>>>, ) { log::info!("Attention! We need cables"); - - while let Some(msg) = stream.next().await { + + while let Some(msg) = stream_consumer.next().await { if is_listening() { let data = String::from_utf8(msg.unwrap().into()).unwrap(); let data_parsed: Vec<&str> = data.split("#").collect();