From 94fc639c7444bb2c3184d52466601510281a6255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Fri, 17 May 2024 02:51:50 +0300 Subject: [PATCH] fix: :bug: no detection for server based disconnection --- streamer/src/gui.rs | 35 ++++++++++++++++++++++++++++++-- streamer/src/gui_utils.rs | 42 +++++++++++++++++++++++++++++++++++++++ streamer/src/streaming.rs | 40 +++++++++++++++++++++++++++++++++---- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/streamer/src/gui.rs b/streamer/src/gui.rs index 539cfa7..4e8d3d1 100644 --- a/streamer/src/gui.rs +++ b/streamer/src/gui.rs @@ -91,6 +91,7 @@ struct DataChannel { struct CommunicationChannel { base_to_streaming_sender: Sender, streaming_to_base_sender: Sender, + streaming_to_base_is_finished: Sender, base_to_recording_sender: Sender, recording_to_base_sender: Sender, base_to_playing_sender: Sender, @@ -142,6 +143,7 @@ impl Streamer { communication_channel: CommunicationChannel { base_to_streaming_sender: channel(1).0, streaming_to_base_sender: channel(1).0, + streaming_to_base_is_finished: channel(1).0, base_to_recording_sender: channel(1).0, recording_to_base_sender: channel(1).0, base_to_playing_sender: channel(1).0, @@ -187,7 +189,21 @@ impl Streamer { .subscribe(); let microphone_stream_volume = self.gui_status.microphone_volume.value.clone(); let audio_stream_volume = self.gui_status.audio_volume.value.clone(); - Command::perform( + let streaming_to_base_sender_is_finished = self + .communication_channel + .streaming_to_base_is_finished + .clone(); + + let streaming_to_base_receiver_is_streaming_stopped = self + .communication_channel + .recording_to_base_sender + .subscribe(); + let streaming_to_base_receiver_is_streaming_finished = self + .communication_channel + .streaming_to_base_is_finished + .subscribe(); + + let connect_command = Command::perform( async move { gui_utils::connect( microphone_stream_receiver, @@ -195,13 +211,28 @@ impl Streamer { streamer_config, streaming_to_base_sender, base_to_streaming_receiver, + streaming_to_base_sender_is_finished, microphone_stream_volume, audio_stream_volume, ) .await }, Message::State, - ) + ); + + let is_streaming_finished_command = Command::perform( + async move { + gui_utils::is_streaming_finished( + streaming_to_base_receiver_is_streaming_finished, + streaming_to_base_receiver_is_streaming_stopped, + ) + .await + }, + Message::State, + ); + + let commands = vec![connect_command, is_streaming_finished_command]; + Command::batch(commands) } Event::Disconnect => { println!("Disconnect"); diff --git a/streamer/src/gui_utils.rs b/streamer/src/gui_utils.rs index 09df2d2..c957870 100644 --- a/streamer/src/gui_utils.rs +++ b/streamer/src/gui_utils.rs @@ -16,6 +16,7 @@ pub async fn connect( streamer_config: Config, streaming_to_base_sender: Sender, base_to_streaming_receiver: Receiver, + streaming_to_base_sender_is_finished: Sender, microphone_stream_volume: Arc>, audio_stream_volume: Arc>, ) -> State { @@ -26,6 +27,7 @@ pub async fn connect( streamer_config, base_to_streaming_receiver, streaming_to_base_sender.clone(), + streaming_to_base_sender_is_finished, microphone_stream_volume, audio_stream_volume, )); @@ -71,6 +73,46 @@ pub async fn disconnect( } } +pub async fn is_streaming_finished( + mut streaming_to_base_receiver_is_streaming_finished: Receiver, + mut streaming_to_base_receiver_is_streaming_stopped: Receiver, +) -> State { + tokio::select! { + is_streaming_finished = async move { + match streaming_to_base_receiver_is_streaming_finished.recv().await { + Ok(_) => State::Disconnected, + Err(err_val) => { + eprintln!( + "Error: Communication | Streaming to Base | Recv | Is Finished | {}", + err_val + ); + State::Connected + }, + } + } => is_streaming_finished, + is_streaming_stopped = async move { + match streaming_to_base_receiver_is_streaming_stopped.recv().await { + Ok(_) => { + while let Err(err_val) = streaming_to_base_receiver_is_streaming_stopped.recv().await { + eprintln!( + "Error: Communication | Streaming to Base | Recv | Is Stopped | {}", + err_val + ); + } + State::Disconnected + }, + Err(err_val) => { + eprintln!( + "Error: Communication | Streaming to Base | Recv | Is Stopped Never Started | {}", + err_val + ); + State::Disconnected + }, + } + } =>is_streaming_stopped, + } +} + pub async fn start_recording( microphone_stream_sender: Sender, recording_to_base_sender: Sender, diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index b4beec5..9d182ef 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -21,6 +21,7 @@ pub async fn connect( streamer_config: Config, mut base_to_streaming: Receiver, streaming_to_base: Sender, + streaming_to_base_sender_is_finished: Sender, microphone_stream_volume: Arc>, audio_stream_volume: Arc>, ) { @@ -83,6 +84,7 @@ pub async fn connect( mixer_task, base_to_streaming, streaming_to_base, + streaming_to_base_sender_is_finished, )); } } @@ -254,15 +256,45 @@ async fn status_checker( mixer_task: JoinHandle<()>, mut base_to_streaming: Receiver, streaming_to_base: Sender, + streaming_to_base_sender_is_finished: Sender, ) { - while let Err(_) = base_to_streaming.try_recv() { + let mut problem = false; + loop { + if let Ok(_) = base_to_streaming.try_recv() { + println!("Time to Retrieve"); + break; + } + if stream_task.is_finished() { + println!("Warning: Stream Task Finished"); + problem = true; + break; + } + if mixer_task.is_finished() { + println!("Warning: Mixer Task Finished"); + problem = true; + break; + } + if message_organizer_task.is_finished() { + println!("Warning: Message Organizer Task Finished"); + problem = true; + break; + } tokio::time::sleep(Duration::from_secs(3)).await; } stream_task.abort(); mixer_task.abort(); message_organizer_task.abort(); - match streaming_to_base.send(true) { - Ok(_) => println!("Cleaning Done: Streamer Disconnected"), - Err(err_val) => eprintln!("Error: Cleaning | {}", err_val), + if problem { + println!("Problem"); + match streaming_to_base_sender_is_finished.send(true) { + Ok(_) => println!("Cleaning Done: Streamer Disconnected"), + Err(err_val) => eprintln!("Error: Cleaning | Is Finished | {}", err_val), + } + } else { + println!("No Problem"); + match streaming_to_base.send(true) { + Ok(_) => println!("Cleaning Done: Streamer Disconnected"), + Err(err_val) => eprintln!("Error: Cleaning | Is Stopped | {}", err_val), + } } }