diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 32d6369..565bd0a 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -59,14 +59,15 @@ pub async fn start(relay_configs: Config) { let buffer_layer_task: Option>; let (listener_stream_tasks_producer, listener_stream_tasks_receiver) = tokio::sync::mpsc::channel(BUFFER_LENGTH); - + let mut new_streamer = Streamer { + ip: "127.0.0.1".to_string().parse().unwrap(), + port: 0000, + }; loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { - let new_streamer = Streamer { - ip: streamer_info.ip(), - port: streamer_info.port(), - }; + new_streamer.ip = streamer_info.ip(); + new_streamer.port = streamer_info.port(); println!( "New Streamer: {:#?} | {:#?}", streamer_info, @@ -77,7 +78,7 @@ pub async fn start(relay_configs: Config) { match tokio_tungstenite::accept_async(streamer_tcp_tls).await { Ok(ws_stream) => { tokio::spawn(streamer_stream( - new_streamer, + new_streamer.clone(), record_producer, ws_stream, timer, @@ -91,7 +92,7 @@ pub async fn start(relay_configs: Config) { match tokio_tungstenite::accept_async(streamer_tcp).await { Ok(ws_stream) => { tokio::spawn(streamer_stream( - new_streamer, + new_streamer.clone(), record_producer, ws_stream, timer, @@ -123,6 +124,7 @@ pub async fn start(relay_configs: Config) { tokio::spawn(status_checker( buffered_producer.clone(), timer, + new_streamer, streamer_alive_receiver, message_organizer_task, buffer_layer_task, @@ -163,6 +165,7 @@ pub async fn start(relay_configs: Config) { async fn status_checker( buffered_producer: Sender, timer: Instant, + streamer: Streamer, mut streamer_alive_receiver: tokio::sync::oneshot::Receiver, message_organizer_task: Option>, buffer_layer_task: Option>, @@ -175,15 +178,30 @@ async fn status_checker( tokio::time::sleep(Duration::from_secs(3)).await; match streamer_alive_receiver.try_recv() { Ok(_) => { - println!("Streamer Cleaning"); + println!( + "Cleaning: Streamer Disconnected | {}", + format!("{}:{}", streamer.ip, streamer.port) + ); + let cleaning_timer = Instant::now(); message_organizer_task.as_ref().unwrap().abort(); buffer_layer_task.as_ref().unwrap().abort(); + let mut listener_task_counter = 0; while listener_stream_tasks_receiver.len() > 0 { match listener_stream_tasks_receiver.recv().await { - Some(listener_stream_task) => listener_stream_task.abort(), + Some(listener_stream_task) => { + listener_stream_task.abort(); + listener_task_counter += 1; + } None => {} } } + println!( + "Cleaning Done: Streamer Disconnected | {} | Disconnected Listener(s) = {} | {:#?}", + format!("{}:{}", streamer.ip, streamer.port), + listener_task_counter, + cleaning_timer.elapsed() + ); + return; } Err(_) => {} }