chore: 🔊 not necessary little chore update

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-22 22:07:10 +03:00
parent 5e5751c4bc
commit 5bdc68b6ee

View file

@ -59,14 +59,15 @@ pub async fn start(relay_configs: Config) {
let buffer_layer_task: Option<JoinHandle<()>>; let buffer_layer_task: Option<JoinHandle<()>>;
let (listener_stream_tasks_producer, listener_stream_tasks_receiver) = let (listener_stream_tasks_producer, listener_stream_tasks_receiver) =
tokio::sync::mpsc::channel(BUFFER_LENGTH); tokio::sync::mpsc::channel(BUFFER_LENGTH);
let mut new_streamer = Streamer {
ip: "127.0.0.1".to_string().parse().unwrap(),
port: 0000,
};
loop { loop {
match streamer_socket.accept().await { match streamer_socket.accept().await {
Ok((streamer_tcp, streamer_info)) => { Ok((streamer_tcp, streamer_info)) => {
let new_streamer = Streamer { new_streamer.ip = streamer_info.ip();
ip: streamer_info.ip(), new_streamer.port = streamer_info.port();
port: streamer_info.port(),
};
println!( println!(
"New Streamer: {:#?} | {:#?}", "New Streamer: {:#?} | {:#?}",
streamer_info, streamer_info,
@ -77,7 +78,7 @@ pub async fn start(relay_configs: Config) {
match tokio_tungstenite::accept_async(streamer_tcp_tls).await { match tokio_tungstenite::accept_async(streamer_tcp_tls).await {
Ok(ws_stream) => { Ok(ws_stream) => {
tokio::spawn(streamer_stream( tokio::spawn(streamer_stream(
new_streamer, new_streamer.clone(),
record_producer, record_producer,
ws_stream, ws_stream,
timer, timer,
@ -91,7 +92,7 @@ pub async fn start(relay_configs: Config) {
match tokio_tungstenite::accept_async(streamer_tcp).await { match tokio_tungstenite::accept_async(streamer_tcp).await {
Ok(ws_stream) => { Ok(ws_stream) => {
tokio::spawn(streamer_stream( tokio::spawn(streamer_stream(
new_streamer, new_streamer.clone(),
record_producer, record_producer,
ws_stream, ws_stream,
timer, timer,
@ -123,6 +124,7 @@ pub async fn start(relay_configs: Config) {
tokio::spawn(status_checker( tokio::spawn(status_checker(
buffered_producer.clone(), buffered_producer.clone(),
timer, timer,
new_streamer,
streamer_alive_receiver, streamer_alive_receiver,
message_organizer_task, message_organizer_task,
buffer_layer_task, buffer_layer_task,
@ -163,6 +165,7 @@ pub async fn start(relay_configs: Config) {
async fn status_checker( async fn status_checker(
buffered_producer: Sender<Message>, buffered_producer: Sender<Message>,
timer: Instant, timer: Instant,
streamer: Streamer,
mut streamer_alive_receiver: tokio::sync::oneshot::Receiver<bool>, mut streamer_alive_receiver: tokio::sync::oneshot::Receiver<bool>,
message_organizer_task: Option<JoinHandle<()>>, message_organizer_task: Option<JoinHandle<()>>,
buffer_layer_task: Option<JoinHandle<()>>, buffer_layer_task: Option<JoinHandle<()>>,
@ -175,15 +178,30 @@ async fn status_checker(
tokio::time::sleep(Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(3)).await;
match streamer_alive_receiver.try_recv() { match streamer_alive_receiver.try_recv() {
Ok(_) => { Ok(_) => {
println!("Streamer Cleaning"); println!(
"Cleaning: Streamer Disconnected | {}",
format!("{}:{}", streamer.ip, streamer.port)
);
let cleaning_timer = Instant::now();
message_organizer_task.as_ref().unwrap().abort(); message_organizer_task.as_ref().unwrap().abort();
buffer_layer_task.as_ref().unwrap().abort(); buffer_layer_task.as_ref().unwrap().abort();
let mut listener_task_counter = 0;
while listener_stream_tasks_receiver.len() > 0 { while listener_stream_tasks_receiver.len() > 0 {
match listener_stream_tasks_receiver.recv().await { match listener_stream_tasks_receiver.recv().await {
Some(listener_stream_task) => listener_stream_task.abort(), Some(listener_stream_task) => {
listener_stream_task.abort();
listener_task_counter += 1;
}
None => {} None => {}
} }
} }
println!(
"Cleaning Done: Streamer Disconnected | {} | Disconnected Listener(s) = {} | {:#?}",
format!("{}:{}", streamer.ip, streamer.port),
listener_task_counter,
cleaning_timer.elapsed()
);
return;
} }
Err(_) => {} Err(_) => {}
} }