From 5e5751c4bcb4b1f6a68572ee840c1c0c5ed9e2ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sun, 21 Apr 2024 20:22:30 +0300 Subject: [PATCH] feat: :sparkles: disconnect all listeners when streamer disconnected --- back/src/main.rs | 5 +++- back/src/streaming.rs | 63 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/back/src/main.rs b/back/src/main.rs index 8a40574..ac298d6 100644 --- a/back/src/main.rs +++ b/back/src/main.rs @@ -19,7 +19,10 @@ async fn main() { .parse::() .unwrap(), ); - println!("\n\n\tOn Air -> http://{}\n\n", relay_config.axum_address.clone()); + println!( + "\n\n\tOn Air -> http://{}\n\n", + relay_config.axum_address.clone() + ); tokio::spawn(streaming::start(relay_config)); axum_server::bind_rustls(addr, rustls_config) .serve(app.into_make_service()) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index cf91ccd..32d6369 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -11,6 +11,7 @@ use rustls_pemfile::{certs, pkcs8_private_keys}; use tokio::{ net::TcpListener, sync::broadcast::{channel, Receiver, Sender}, + task::JoinHandle, time::Instant, }; use tokio_rustls::{ @@ -52,6 +53,13 @@ pub async fn start(relay_configs: Config) { .with_single_cert(fullchain, privkey) .unwrap(); let acceptor = TlsAcceptor::from(Arc::new(server_tls_config)); + + let (streamer_alive_producer, streamer_alive_receiver) = tokio::sync::oneshot::channel(); + let message_organizer_task: Option>; + let buffer_layer_task: Option>; + let (listener_stream_tasks_producer, listener_stream_tasks_receiver) = + tokio::sync::mpsc::channel(BUFFER_LENGTH); + loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { @@ -73,6 +81,7 @@ pub async fn start(relay_configs: Config) { record_producer, ws_stream, timer, + streamer_alive_producer, )); break; } @@ -86,6 +95,7 @@ pub async fn start(relay_configs: Config) { record_producer, ws_stream, timer, + streamer_alive_producer, )); break; } @@ -98,17 +108,26 @@ pub async fn start(relay_configs: Config) { } let (message_producer, message_consumer) = channel(BUFFER_LENGTH); let (buffered_producer, _) = channel(BUFFER_LENGTH); - tokio::spawn(message_organizer( + message_organizer_task = tokio::spawn(message_organizer( message_producer.clone(), record_consumer, relay_configs.latency, - )); - tokio::spawn(buffer_layer( + )) + .into(); + buffer_layer_task = tokio::spawn(buffer_layer( message_consumer, buffered_producer.clone(), relay_configs.latency, + )) + .into(); + tokio::spawn(status_checker( + buffered_producer.clone(), + timer, + streamer_alive_receiver, + message_organizer_task, + buffer_layer_task, + listener_stream_tasks_receiver, )); - tokio::spawn(status_checker(buffered_producer.clone(), timer)); while let Ok((tcp_stream, listener_info)) = listener_socket.accept().await { let new_listener = Listener { ip: listener_info.ip(), @@ -119,28 +138,56 @@ pub async fn start(relay_configs: Config) { let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls) .await .unwrap(); - tokio::spawn(stream( + let listener_stream_task = tokio::spawn(stream( new_listener, wss_stream, buffered_producer.subscribe(), )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; } else { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - tokio::spawn(stream( + let listener_stream_task = tokio::spawn(stream( new_listener, ws_stream, buffered_producer.subscribe(), )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; } println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); } } -async fn status_checker(buffered_producer: Sender, timer: Instant) { +async fn status_checker( + buffered_producer: Sender, + timer: Instant, + mut streamer_alive_receiver: tokio::sync::oneshot::Receiver, + message_organizer_task: Option>, + buffer_layer_task: Option>, + mut listener_stream_tasks_receiver: tokio::sync::mpsc::Receiver>, +) { let mut listener_counter = buffered_producer.receiver_count(); let mut bottleneck_flag = false; //let mut buffer_len = buffered_producer.len(); loop { tokio::time::sleep(Duration::from_secs(3)).await; + match streamer_alive_receiver.try_recv() { + Ok(_) => { + println!("Streamer Cleaning"); + message_organizer_task.as_ref().unwrap().abort(); + buffer_layer_task.as_ref().unwrap().abort(); + while listener_stream_tasks_receiver.len() > 0 { + match listener_stream_tasks_receiver.recv().await { + Some(listener_stream_task) => listener_stream_task.abort(), + None => {} + } + } + } + Err(_) => {} + } + if buffered_producer.receiver_count() != 0 { if buffered_producer.len() > 2 { bottleneck_flag = true; @@ -186,6 +233,7 @@ async fn streamer_stream< record_producer: Sender, mut ws_stream: T, timer: Instant, + streamer_alive_producer: tokio::sync::oneshot::Sender, ) { loop { match ws_stream.next().await { @@ -207,6 +255,7 @@ async fn streamer_stream< format!("{}:{}", streamer.ip, streamer.port), timer.elapsed() ); + streamer_alive_producer.send(false).unwrap(); return; } }