From 6efb12d3b0dafa32e67f97611877812f0bb395fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Wed, 21 May 2025 22:33:04 +0300 Subject: [PATCH] feat: :sparkles: gui status handling if there is no remote server --- client/src/gui.rs | 73 +++++++++++++++++++++++--------------------- client/src/stream.rs | 72 ++++++++++++++++++++++++------------------- client/src/voice.rs | 4 +-- 3 files changed, 81 insertions(+), 68 deletions(-) diff --git a/client/src/gui.rs b/client/src/gui.rs index c62cb00..1467296 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -1,7 +1,4 @@ -use std::{ - sync::{Arc, RwLock}, - time::Duration, -}; +use std::sync::{Arc, RwLock}; use iced::{ Alignment::Center, @@ -9,29 +6,34 @@ use iced::{ widget::{button, column, row}, }; use protocol::Error; -use tokio::{ - sync::{ - broadcast::{self}, - oneshot, - }, - time::sleep, +use tokio::sync::{ + broadcast::{self}, + oneshot, }; -use crate::{ClientConfig, MICROPHONE_BUFFER_LENGHT, stream::connect, voice::record}; +use crate::{ + ClientConfig, MICROPHONE_BUFFER_LENGHT, + stream::{connect, disconnect_watcher}, + voice::record, +}; #[derive(Debug, Default)] struct Signal { - microphone: Option>, + microphone_stop_sender: Option>, // speaker: Option>, - connection: Option>, + connection_stop_sender: Option>, } impl Signal { fn reset_microphone(&mut self) -> Result<(), Error> { - if let Some(microphone_signal) = &self.microphone { + if let Some(microphone_signal) = &self.microphone_stop_sender { if !microphone_signal.is_closed() { - self.microphone.take().expect("Never").send(true).unwrap(); - self.microphone = None; + self.microphone_stop_sender + .take() + .expect("Never") + .send(true) + .unwrap(); + self.microphone_stop_sender = None; return Ok(()); } } @@ -39,10 +41,14 @@ impl Signal { } fn reset_connection(&mut self) -> Result<(), Error> { - if let Some(connection_signal) = &self.connection { + if let Some(connection_signal) = &self.connection_stop_sender { if !connection_signal.is_closed() { - self.connection.take().expect("Never").send(true).unwrap(); - self.connection = None; + self.connection_stop_sender + .take() + .expect("Never") + .send(true) + .unwrap(); + self.connection_stop_sender = None; return Ok(()); } } @@ -152,26 +158,23 @@ impl App { self.gui_status.write().unwrap().room = State::Loading; let client_config = self.client_config.clone(); let gui_status = self.gui_status.clone(); - let microphone_sender_for_producing_receiver = self.channel.microphone.clone(); + let microphone_receiver = self.channel.microphone.subscribe(); - let connection_signal = oneshot::channel(); - self.signal.connection = Some(connection_signal.0); - let is_connection_started_signal = oneshot::channel(); - tokio::spawn(connect( - connection_signal.1, - is_connection_started_signal.0, - microphone_sender_for_producing_receiver, - client_config, - )); - - let is_connection_started_task = tokio::spawn(is_connection_started_signal.1); + let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); + self.signal.connection_stop_sender = Some(connection_stop_sender); Task::perform( async move { - match is_connection_started_task.await { - Ok(_) => gui_status.write().unwrap().room = State::Active, + match connect(microphone_receiver, client_config).await { + Ok(connection_return) => { + tokio::spawn(disconnect_watcher( + connection_stop_receiver, + connection_return, + )); + gui_status.write().unwrap().room = State::Active; + } Err(err_val) => { + eprintln!("Error: Connect | {}", err_val); gui_status.write().unwrap().room = State::Passive; - eprintln!("Error: Connection Task | {}", err_val); } } }, @@ -190,7 +193,7 @@ impl App { self.gui_status.write().unwrap().microphone = State::Loading; let microphone_sender = self.channel.microphone.clone(); let microphone_stop_signal = oneshot::channel(); - self.signal.microphone = Some(microphone_stop_signal.0); + self.signal.microphone_stop_sender = Some(microphone_stop_signal.0); let is_microphone_started_signal = oneshot::channel(); tokio::spawn(record( microphone_sender, diff --git a/client/src/stream.rs b/client/src/stream.rs index 5c0e8a0..3b5d5f5 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -9,16 +9,22 @@ use s2n_quic::{ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::{broadcast, oneshot}, + task::JoinHandle, }; use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play}; +#[derive(Debug)] +pub struct ConnectReturn { + play_audio_stop_signal_sender: oneshot::Sender, + send_audio_task: JoinHandle<()>, + receive_audio_task: JoinHandle<()>, +} + pub async fn connect( - connection_stop_signal_receiver: oneshot::Receiver, - is_connection_started_signal: oneshot::Sender, - microphone_sender_for_producing_receiver: Arc>, + microphone_receiver: broadcast::Receiver, client_config: Arc, -) -> Result<(), Error> { +) -> Result { let client = Client::builder() .with_io("0:0") .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))? @@ -50,52 +56,54 @@ pub async fn connect( .open_bidirectional_stream() .await .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; + let (receive_stream, send_stream) = stream.split(); + let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT); - let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel(); - tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); - let receive_voice_data_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); - let send_voice_data_task = tokio::spawn(send_audio_data( - send_stream, - microphone_sender_for_producing_receiver, - )); + let (play_audio_stop_signal_sender, play_audio_stop_signal_receiver) = oneshot::channel(); - if let Err(err_val) = is_connection_started_signal.send(true) { - eprintln!("Error: Is Connection Started | Local | Send | {}", err_val); - } + tokio::spawn(play(speaker_receiver, play_audio_stop_signal_receiver)); + let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); + let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver)); - if let Err(err_val) = connection_stop_signal_receiver.await { + Ok(ConnectReturn { + play_audio_stop_signal_sender, + send_audio_task, + receive_audio_task, + }) +} + +pub async fn disconnect_watcher( + connection_stop_receiver: oneshot::Receiver, + connection_return: ConnectReturn, +) { + if let Err(err_val) = connection_stop_receiver.await { eprintln!( - "Error: Connection Stop Signal | Local | Receive | {}", + "Error: Receive Connection Stop Signal | Local | {}", err_val ); } - if let Err(err_val) = speaker_stop_signal_sender.send(true) { - eprintln!("Error: Speaker Stop Signal | Local | Send | {}", err_val); + connection_return.send_audio_task.abort(); + connection_return.receive_audio_task.abort(); + if let Err(err_val) = connection_return.play_audio_stop_signal_sender.send(true) { + eprintln!("Error: Send Play Audio Stop Signal | Local | {}", err_val); } - - println!("Connection Is Closing"); - - receive_voice_data_task.abort(); - send_voice_data_task.abort(); - - println!("Connection Is Closed"); - - Ok(()) } async fn send_audio_data( mut send_stream: SendStream, - microphone_sender_for_producing_receiver: Arc>, + old_microphone_receiver: broadcast::Receiver, ) { - let mut microphone_receiver = microphone_sender_for_producing_receiver.subscribe(); + let mut microphone_receiver = old_microphone_receiver.resubscribe(); + drop(old_microphone_receiver); loop { match microphone_receiver.recv().await { Ok(microphone_data) => { if let Err(err_val) = send_stream.write_f32(microphone_data).await { eprintln!("Error: Send Microphone Data | Remote | {}", err_val); - break; + todo!("GUI Status: Disconnect"); + // break; } } Err(err_val) => { @@ -117,6 +125,7 @@ async fn receive_audio_data( loop { match receive_stream.read_f32().await { Ok(received_data) => { + // error only happens if there is no receiver, think about it if let Err(err_val) = speaker_sender.send(received_data) { eprintln!("Error: Send to Speaker | Local | {}", err_val); break; @@ -124,7 +133,8 @@ async fn receive_audio_data( } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); - break; + todo!("GUI Status Disconnect"); + // break; } } } diff --git a/client/src/voice.rs b/client/src/voice.rs index 51367ba..2790cbf 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -46,7 +46,7 @@ pub async fn record( pub async fn play( mut speaker_receiver: broadcast::Receiver, - speaker_stop_signal_receiver: oneshot::Receiver, + play_audio_stop_signal_receiver: oneshot::Receiver, ) { let host = cpal::default_host(); let output_device = host.default_output_device().unwrap(); @@ -86,7 +86,7 @@ pub async fn play( println!("Playing Started"); tokio::task::block_in_place(|| { - if let Err(err_val) = speaker_stop_signal_receiver.blocking_recv() { + if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() { eprintln!( "Error: Speaker Stop Signal Receive | Local Channel | {}", err_val