diff --git a/back/src/lib.rs b/back/src/lib.rs index 0661cfb..8c00fa7 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -1,9 +1,16 @@ +use std::net::IpAddr; + use serde::{Deserialize, Serialize}; pub mod routing; pub mod streaming; pub mod utils; +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Listener { + ip: IpAddr, + port: u16, +} #[derive(Debug, Clone)] pub struct AppState {} #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/back/src/streaming.rs b/back/src/streaming.rs index b74de32..e0389c8 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -5,42 +5,54 @@ use futures_util::SinkExt; use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use tokio::{ net::{TcpListener, TcpStream}, + sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; -use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use crate::Listener; + +const BUFFER_LENGTH: usize = 1000000; +const MAX_TOLERATED_MESSAGE_COUNT:usize = 10; pub async fn start() { let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); - while let Ok((tcp_stream, _)) = socket.accept().await { - println!("Dude Someone Triggered"); - let ring = HeapRb::::new(1000000); - let (producer, consumer) = ring.split(); + println!("Dude Someone Triggered"); + let ring = HeapRb::::new(BUFFER_LENGTH); + let (producer, consumer) = ring.split(); + let timer = Instant::now(); + let (message_producer, _) = channel(BUFFER_LENGTH); + tokio::spawn(record(producer)); + tokio::spawn(parent_stream(timer, message_producer.clone(), consumer)); + while let Ok((tcp_stream, info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - - let timer = Instant::now(); - - tokio::spawn(record(producer)); - tokio::spawn(stream(timer, ws_stream, consumer)); + println!("New Connection: {}", info); + let new_listener = Listener { + ip: info.ip(), + port: info.port(), + }; + tokio::spawn(stream( + new_listener, + ws_stream, + message_producer.subscribe(), + )); } } - -pub async fn stream( +pub async fn parent_stream( timer: Instant, - mut ws_stream: WebSocketStream, + message_producer: Sender, mut consumer: Consumer>>>>, ) { - println!("Waiting"); loop { - tokio::time::sleep(Duration::from_secs(2)).await; - let mut data: Vec = Vec::new(); + let mut single_message: Vec = Vec::new(); let now = timer.elapsed().as_secs(); - while !consumer.is_empty() && (timer.elapsed().as_secs() + 2) > now { + while !consumer.is_empty() && (timer.elapsed().as_secs() + 5) > now { match consumer.pop() { Some(single_data) => { let ring = HeapRb::::new(1000000); let (mut producer, mut consumer) = ring.split(); let single_data_packet = single_data.to_string().as_bytes().to_vec(); let terminator = "#".as_bytes().to_vec(); + for element in single_data_packet { producer.push(element).unwrap(); } @@ -48,14 +60,55 @@ pub async fn stream( producer.push(element).unwrap(); } while !consumer.is_empty() { - data.push(consumer.pop().unwrap()); + single_message.push(consumer.pop().unwrap()); } } None => {} } } - ws_stream.send(data.into()).await.unwrap(); - ws_stream.flush().await.unwrap(); + match message_producer.send(single_message.into()) { + Ok(_) => {} + Err(_) => {} + } + println!( + "Message Len = {} | Receiver Count = {}", + message_producer.len(), + message_producer.receiver_count() + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } +} +pub async fn stream( + listener: Listener, + mut ws_stream: WebSocketStream, + mut message_consumer: Receiver, +) { + while let Ok(message) = message_consumer.recv().await { + if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT { + println!( + "{} Forced to Disconnect | Reason -> Slow Consumer", + format!("{}:{}", listener.ip, listener.port) + ); + break; + } + match ws_stream.send(message).await { + Ok(_) => { + if let Err(_) = ws_stream.flush().await { + println!( + "{} is Disconnected", + format!("{}:{}", listener.ip, listener.port) + ); + break; + } + } + Err(_) => { + println!( + "{} is Disconnected", + format!("{}:{}", listener.ip, listener.port) + ); + break; + } + } } } @@ -74,7 +127,7 @@ pub async fn record(mut producer: Producer {} Err(_) => {} } - println!("{}", sample); + //println!("{}", sample); } }; @@ -84,7 +137,8 @@ pub async fn record(mut producer: Producer Element { } }); } - } - else { + } else { is_listening.set(false); } }; diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 5117ec3..346fd54 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -70,8 +70,7 @@ pub async fn sound_stream( }; if let Err(_) = producer.push(single_data) {} } - } - else { + } else { break; } }