diff --git a/back/src/lib.rs b/back/src/lib.rs index 187d31b..497a913 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -9,6 +9,12 @@ pub mod utils; #[derive(Debug, Clone)] pub struct AppState {} +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Streamer { + ip: IpAddr, + port: u16, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Listener { ip: IpAddr, diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 029c813..2d4785b 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -3,32 +3,48 @@ use std::time::Duration; use futures_util::{SinkExt, StreamExt}; use tokio::{ net::{TcpListener, TcpStream}, - sync::broadcast::{channel, Receiver, Sender}, + sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; -use crate::Listener; +use crate::{Listener, Streamer}; const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start() { + //need to move them for multi streamer let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); let (record_producer, record_consumer) = channel(BUFFER_LENGTH); let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); - if let Ok((streamer_tcp, streamer_info)) = streamer_socket.accept().await { - let ws_stream = tokio_tungstenite::accept_async(streamer_tcp).await.unwrap(); - println!("New Streamer: {:#?}", streamer_info); - tokio::spawn(streamer_stream(record_producer, ws_stream)); + let timer = Instant::now(); + + loop { + match streamer_socket.accept().await { + Ok((streamer_tcp, streamer_info)) => { + match tokio_tungstenite::accept_async(streamer_tcp).await { + Ok(ws_stream) => { + println!("New Streamer: {:#?} | {:#?}", streamer_info, timer.elapsed()); + let new_streamer = Streamer { + ip: streamer_info.ip(), + port: streamer_info.port(), + }; + tokio::spawn(streamer_stream(new_streamer, record_producer, ws_stream, timer)); + break; + }, + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), + } + }, + Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), + } } - let (message_producer, message_consumer) = channel(BUFFER_LENGTH); let (buffered_producer, _) = channel(BUFFER_LENGTH); tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); - tokio::spawn(status_checker(buffered_producer.clone())); + tokio::spawn(status_checker(buffered_producer.clone(), timer)); while let Ok((tcp_stream, listener_info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - println!("New Listener: {}", listener_info); + println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); let new_listener = Listener { ip: listener_info.ip(), port: listener_info.port(), @@ -40,14 +56,20 @@ pub async fn start() { )); } } -async fn status_checker(buffered_producer: Sender) { +async fn status_checker(buffered_producer: Sender, timer: Instant) { let mut listener_counter = buffered_producer.receiver_count(); + let mut bottleneck_flag = false; //let mut buffer_len = buffered_producer.len(); loop { tokio::time::sleep(Duration::from_secs(3)).await; if buffered_producer.receiver_count() != 0 { if buffered_producer.len() > 2 { - println!("Bottleneck: {}", buffered_producer.len()); + bottleneck_flag = true; + println!("Bottleneck: {} | {:#?}", buffered_producer.len(), timer.elapsed()); + } + if bottleneck_flag && buffered_producer.len() < 2 { + bottleneck_flag = false; + println!("Flawless Again"); } if listener_counter != buffered_producer.receiver_count() { listener_counter = buffered_producer.receiver_count(); @@ -74,19 +96,29 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer } } async fn streamer_stream( + streamer: Streamer, record_producer: Sender, mut ws_stream: WebSocketStream, + timer: Instant, ) { - while let Some(message_with_question) = ws_stream.next().await { - match message_with_question { - Ok(message) => { - //println!("{}", message.len()); - match record_producer.send(message) { - Ok(_) => {} + loop { + match ws_stream.next().await { + Some(message_with_question) => { + match message_with_question { + Ok(message) => { + //println!("{}", message.len()); + match record_producer.send(message) { + Ok(_) => {} + Err(_) => {} + } + } Err(_) => {} - } + } + } + None => { + println!("Streamer Disconnected: {} | {:#?}", format!("{}:{}", streamer.ip, streamer.port), timer.elapsed()); + return; } - Err(_) => {} } } } diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 5f8284d..4dd0d9a 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -11,13 +11,14 @@ use futures_util::{stream::SplitStream, SinkExt, StreamExt}; use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use tokio_tungstenite_wasm::{Message, WebSocketStream}; -static BUFFER_LIMIT: usize = 800000; static BUFFER_LENGTH: usize = 1000000; +static BUFFER_LIMIT: usize = BUFFER_LENGTH/100*90; pub async fn start_listening( mut is_maintaining: Signal<(bool, bool)>, mut is_listening: Signal, ) { + //seperate record and stream, refactor if is_listening() { log::info!("Trying Sir"); let connect_addr = "ws://192.168.1.2:2424";