From c2444019741dc200361bd502efd75f87a2e45d45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 22 May 2025 16:50:44 +0300 Subject: [PATCH] feat: :sparkles: speaker on/of capability --- client/src/gui.rs | 90 +++++++++++++++++++++++++++++++++++++------- client/src/lib.rs | 2 +- client/src/stream.rs | 21 +++-------- client/src/voice.rs | 5 +++ 4 files changed, 88 insertions(+), 30 deletions(-) diff --git a/client/src/gui.rs b/client/src/gui.rs index 1467296..7e7d9d3 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -12,15 +12,15 @@ use tokio::sync::{ }; use crate::{ - ClientConfig, MICROPHONE_BUFFER_LENGHT, + ClientConfig, MICROPHONE_BUFFER_LENGHT, SPEAKER_BUFFER_LENGHT, stream::{connect, disconnect_watcher}, - voice::record, + voice::{play, record}, }; #[derive(Debug, Default)] struct Signal { microphone_stop_sender: Option>, - // speaker: Option>, + speaker_stop_sender: Option>, connection_stop_sender: Option>, } @@ -40,6 +40,21 @@ impl Signal { Err(Error::Signal("Reset".to_string())) } + fn reset_speaker(&mut self) -> Result<(), Error> { + if let Some(speaker_signal) = &self.speaker_stop_sender { + if !speaker_signal.is_closed() { + self.speaker_stop_sender + .take() + .expect("Never") + .send(true) + .unwrap(); + self.speaker_stop_sender = None; + return Ok(()); + } + } + Err(Error::Signal("Reset".to_string())) + } + fn reset_connection(&mut self) -> Result<(), Error> { if let Some(connection_signal) = &self.connection_stop_sender { if !connection_signal.is_closed() { @@ -59,14 +74,14 @@ impl Signal { #[derive(Debug)] struct Channel { microphone: Arc>, - // speaker: (broadcast::Sender, broadcast::Receiver), + speaker: Arc>, } impl Channel { fn new() -> Self { Self { - microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT / 4).0.into(), - // speaker: broadcast::channel(BUFFER_LENGTH), + microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT).0.into(), + speaker: broadcast::channel(SPEAKER_BUFFER_LENGHT).0.into(), } } } @@ -75,6 +90,7 @@ impl Channel { struct GUIStatus { room: State, microphone: State, + speaker: State, } #[derive(Debug, Clone, Copy)] @@ -96,6 +112,8 @@ pub enum Message { LeaveRoom, UnmuteMicrophone, MuteMicrophone, + UnmuteSpeaker, + MuteSpeaker, } #[derive(Debug)] @@ -113,6 +131,12 @@ impl App { Ok(()) } + fn reset_speaker(&mut self) -> Result<(), Error> { + self.signal.reset_speaker()?; + self.gui_status.write().unwrap().speaker = State::Passive; + Ok(()) + } + fn reset_connection(&mut self) -> Result<(), Error> { self.signal.reset_connection()?; self.gui_status.write().unwrap().room = State::Passive; @@ -139,12 +163,19 @@ impl App { }; let microphone_button = match self.gui_status.read().unwrap().microphone { - State::Active => button("Mute").on_press(Message::MuteMicrophone), - State::Passive => button("Unmute").on_press(Message::UnmuteMicrophone), - State::Loading => button("Loading"), + State::Active => button("Microphone ON").on_press(Message::MuteMicrophone), + State::Passive => button("Microphone OFF").on_press(Message::UnmuteMicrophone), + State::Loading => button("Microphone Loading"), }; + + let speaker_button = match self.gui_status.read().unwrap().speaker { + State::Active => button("Speaker ON").on_press(Message::MuteSpeaker), + State::Passive => button("Speaker OFF").on_press(Message::UnmuteSpeaker), + State::Loading => button("Speaker Loading"), + }; + column![ - row![join_room_button, microphone_button] + row![join_room_button, microphone_button, speaker_button] .spacing(20) .align_y(Center) ] @@ -159,28 +190,33 @@ impl App { let client_config = self.client_config.clone(); let gui_status = self.gui_status.clone(); let microphone_receiver = self.channel.microphone.subscribe(); + let speaker_sender = self.channel.speaker.clone(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); self.signal.connection_stop_sender = Some(connection_stop_sender); Task::perform( async move { - match connect(microphone_receiver, client_config).await { + match connect(microphone_receiver, speaker_sender, client_config).await { Ok(connection_return) => { tokio::spawn(disconnect_watcher( connection_stop_receiver, connection_return, )); gui_status.write().unwrap().room = State::Active; + Some(Message::UnmuteSpeaker) } Err(err_val) => { eprintln!("Error: Connect | {}", err_val); gui_status.write().unwrap().room = State::Passive; + None } } }, - |_| {}, + |what_to_do_with_speaker| match what_to_do_with_speaker { + Some(activate) => activate, + None => Message::None, + }, ) - .map(|_| Message::None) } Message::LeaveRoom => { self.gui_status.write().unwrap().room = State::Loading; @@ -217,6 +253,34 @@ impl App { } Task::none() } + Message::UnmuteSpeaker => { + self.gui_status.write().unwrap().speaker = State::Loading; + let speaker_receiver = self.channel.speaker.subscribe(); + let speaker_stop_signal = oneshot::channel(); + self.signal.speaker_stop_sender = Some(speaker_stop_signal.0); + let is_speaker_started_signal = oneshot::channel(); + tokio::spawn(play( + speaker_receiver, + is_speaker_started_signal.0, + speaker_stop_signal.1, + )); + let gui_status = self.gui_status.clone(); + Task::perform( + async move { + if let Ok(_) = is_speaker_started_signal.1.await { + gui_status.write().unwrap().speaker = State::Active; + } + }, + |_| Message::None, + ) + } + Message::MuteSpeaker => { + self.gui_status.write().unwrap().speaker = State::Loading; + if let Err(err_val) = self.reset_speaker() { + eprintln!("Error: Mute Speaker | {}", err_val); + } + Task::none() + } } } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 7571c6a..d3a3105 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,7 +2,7 @@ pub mod gui; pub mod stream; pub mod voice; -const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 16; +const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4; const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4; #[derive(Debug)] diff --git a/client/src/stream.rs b/client/src/stream.rs index 3b5d5f5..7d041cc 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -12,17 +12,17 @@ use tokio::{ task::JoinHandle, }; -use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play}; +use crate::ClientConfig; #[derive(Debug)] pub struct ConnectReturn { - play_audio_stop_signal_sender: oneshot::Sender, send_audio_task: JoinHandle<()>, receive_audio_task: JoinHandle<()>, } pub async fn connect( microphone_receiver: broadcast::Receiver, + speaker_sender: Arc>, client_config: Arc, ) -> Result { let client = Client::builder() @@ -59,15 +59,10 @@ pub async fn connect( let (receive_stream, send_stream) = stream.split(); - let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT); - let (play_audio_stop_signal_sender, play_audio_stop_signal_receiver) = oneshot::channel(); - - tokio::spawn(play(speaker_receiver, play_audio_stop_signal_receiver)); let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver)); Ok(ConnectReturn { - play_audio_stop_signal_sender, send_audio_task, receive_audio_task, }) @@ -86,9 +81,6 @@ pub async fn disconnect_watcher( connection_return.send_audio_task.abort(); connection_return.receive_audio_task.abort(); - if let Err(err_val) = connection_return.play_audio_stop_signal_sender.send(true) { - eprintln!("Error: Send Play Audio Stop Signal | Local | {}", err_val); - } } async fn send_audio_data( @@ -120,16 +112,13 @@ async fn send_audio_data( } async fn receive_audio_data( mut receive_stream: ReceiveStream, - speaker_sender: broadcast::Sender, + speaker_sender: Arc>, ) { loop { match receive_stream.read_f32().await { Ok(received_data) => { - // error only happens if there is no receiver, think about it - if let Err(err_val) = speaker_sender.send(received_data) { - eprintln!("Error: Send to Speaker | Local | {}", err_val); - break; - } + // todo: error only happens if there is no receiver, think about it + let _ = speaker_sender.send(received_data); } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); diff --git a/client/src/voice.rs b/client/src/voice.rs index 2790cbf..a705683 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -46,6 +46,7 @@ pub async fn record( pub async fn play( mut speaker_receiver: broadcast::Receiver, + is_speaker_started_signal: oneshot::Sender, play_audio_stop_signal_receiver: oneshot::Receiver, ) { let host = cpal::default_host(); @@ -85,6 +86,10 @@ pub async fn play( output_stream.play().unwrap(); println!("Playing Started"); + if let Err(_) = is_speaker_started_signal.send(true) { + eprintln!("Error: Is Microphone Started | Send"); + } + tokio::task::block_in_place(|| { if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() { eprintln!(