feat: speaker on/of capability

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-22 16:50:44 +03:00
parent 6efb12d3b0
commit 52115f649a
4 changed files with 87 additions and 30 deletions

View file

@ -12,15 +12,15 @@ use tokio::sync::{
}; };
use crate::{ use crate::{
ClientConfig, MICROPHONE_BUFFER_LENGHT, ClientConfig, MICROPHONE_BUFFER_LENGHT, SPEAKER_BUFFER_LENGHT,
stream::{connect, disconnect_watcher}, stream::{connect, disconnect_watcher},
voice::record, voice::{play, record},
}; };
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Signal { struct Signal {
microphone_stop_sender: Option<oneshot::Sender<bool>>, microphone_stop_sender: Option<oneshot::Sender<bool>>,
// speaker: Option<oneshot::Sender<bool>>, speaker_stop_sender: Option<oneshot::Sender<bool>>,
connection_stop_sender: Option<oneshot::Sender<bool>>, connection_stop_sender: Option<oneshot::Sender<bool>>,
} }
@ -40,6 +40,21 @@ impl Signal {
Err(Error::Signal("Reset".to_string())) Err(Error::Signal("Reset".to_string()))
} }
fn reset_speaker(&mut self) -> Result<(), Error> {
if let Some(speaker_signal) = &self.speaker_stop_sender {
if !speaker_signal.is_closed() {
self.speaker_stop_sender
.take()
.expect("Never")
.send(true)
.unwrap();
self.speaker_stop_sender = None;
return Ok(());
}
}
Err(Error::Signal("Reset".to_string()))
}
fn reset_connection(&mut self) -> Result<(), Error> { fn reset_connection(&mut self) -> Result<(), Error> {
if let Some(connection_signal) = &self.connection_stop_sender { if let Some(connection_signal) = &self.connection_stop_sender {
if !connection_signal.is_closed() { if !connection_signal.is_closed() {
@ -59,14 +74,14 @@ impl Signal {
#[derive(Debug)] #[derive(Debug)]
struct Channel { struct Channel {
microphone: Arc<broadcast::Sender<f32>>, microphone: Arc<broadcast::Sender<f32>>,
// speaker: (broadcast::Sender<f32>, broadcast::Receiver<f32>), speaker: Arc<broadcast::Sender<f32>>,
} }
impl Channel { impl Channel {
fn new() -> Self { fn new() -> Self {
Self { Self {
microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT / 4).0.into(), microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT).0.into(),
// speaker: broadcast::channel(BUFFER_LENGTH), speaker: broadcast::channel(SPEAKER_BUFFER_LENGHT).0.into(),
} }
} }
} }
@ -75,6 +90,7 @@ impl Channel {
struct GUIStatus { struct GUIStatus {
room: State, room: State,
microphone: State, microphone: State,
speaker: State,
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -96,6 +112,8 @@ pub enum Message {
LeaveRoom, LeaveRoom,
UnmuteMicrophone, UnmuteMicrophone,
MuteMicrophone, MuteMicrophone,
UnmuteSpeaker,
MuteSpeaker,
} }
#[derive(Debug)] #[derive(Debug)]
@ -113,6 +131,12 @@ impl App {
Ok(()) Ok(())
} }
fn reset_speaker(&mut self) -> Result<(), Error> {
self.signal.reset_speaker()?;
self.gui_status.write().unwrap().speaker = State::Passive;
Ok(())
}
fn reset_connection(&mut self) -> Result<(), Error> { fn reset_connection(&mut self) -> Result<(), Error> {
self.signal.reset_connection()?; self.signal.reset_connection()?;
self.gui_status.write().unwrap().room = State::Passive; self.gui_status.write().unwrap().room = State::Passive;
@ -139,12 +163,18 @@ impl App {
}; };
let microphone_button = match self.gui_status.read().unwrap().microphone { let microphone_button = match self.gui_status.read().unwrap().microphone {
State::Active => button("Mute").on_press(Message::MuteMicrophone), State::Active => button("Microphone ON").on_press(Message::MuteMicrophone),
State::Passive => button("Unmute").on_press(Message::UnmuteMicrophone), State::Passive => button("Microphone OFF").on_press(Message::UnmuteMicrophone),
State::Loading => button("Loading"), State::Loading => button("Microphone Loading"),
};
let speaker_button = match self.gui_status.read().unwrap().speaker {
State::Active => button("Speaker ON").on_press(Message::MuteSpeaker),
State::Passive => button("Speaker OFF").on_press(Message::UnmuteSpeaker),
State::Loading => button("Speaker Loading"),
}; };
column![ column![
row![join_room_button, microphone_button] row![join_room_button, microphone_button, speaker_button]
.spacing(20) .spacing(20)
.align_y(Center) .align_y(Center)
] ]
@ -159,28 +189,33 @@ impl App {
let client_config = self.client_config.clone(); let client_config = self.client_config.clone();
let gui_status = self.gui_status.clone(); let gui_status = self.gui_status.clone();
let microphone_receiver = self.channel.microphone.subscribe(); let microphone_receiver = self.channel.microphone.subscribe();
let speaker_sender = self.channel.speaker.clone();
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
self.signal.connection_stop_sender = Some(connection_stop_sender); self.signal.connection_stop_sender = Some(connection_stop_sender);
Task::perform( Task::perform(
async move { async move {
match connect(microphone_receiver, client_config).await { match connect(microphone_receiver, speaker_sender, client_config).await {
Ok(connection_return) => { Ok(connection_return) => {
tokio::spawn(disconnect_watcher( tokio::spawn(disconnect_watcher(
connection_stop_receiver, connection_stop_receiver,
connection_return, connection_return,
)); ));
gui_status.write().unwrap().room = State::Active; gui_status.write().unwrap().room = State::Active;
Some(Message::UnmuteSpeaker)
} }
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Connect | {}", err_val); eprintln!("Error: Connect | {}", err_val);
gui_status.write().unwrap().room = State::Passive; gui_status.write().unwrap().room = State::Passive;
None
} }
} }
}, },
|_| {}, |what_to_do_with_speaker| match what_to_do_with_speaker {
Some(activate) => activate,
None => Message::None,
},
) )
.map(|_| Message::None)
} }
Message::LeaveRoom => { Message::LeaveRoom => {
self.gui_status.write().unwrap().room = State::Loading; self.gui_status.write().unwrap().room = State::Loading;
@ -217,6 +252,34 @@ impl App {
} }
Task::none() Task::none()
} }
Message::UnmuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
let speaker_receiver = self.channel.speaker.subscribe();
let speaker_stop_signal = oneshot::channel();
self.signal.speaker_stop_sender = Some(speaker_stop_signal.0);
let is_speaker_started_signal = oneshot::channel();
tokio::spawn(play(
speaker_receiver,
is_speaker_started_signal.0,
speaker_stop_signal.1,
));
let gui_status = self.gui_status.clone();
Task::perform(
async move {
if let Ok(_) = is_speaker_started_signal.1.await {
gui_status.write().unwrap().speaker = State::Active;
}
},
|_| Message::None,
)
}
Message::MuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
if let Err(err_val) = self.reset_speaker() {
eprintln!("Error: Mute Speaker | {}", err_val);
}
Task::none()
}
} }
} }
} }

View file

@ -2,7 +2,7 @@ pub mod gui;
pub mod stream; pub mod stream;
pub mod voice; pub mod voice;
const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 16; const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4;
const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4; const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4;
#[derive(Debug)] #[derive(Debug)]

View file

@ -12,17 +12,17 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play}; use crate::ClientConfig;
#[derive(Debug)] #[derive(Debug)]
pub struct ConnectReturn { pub struct ConnectReturn {
play_audio_stop_signal_sender: oneshot::Sender<bool>,
send_audio_task: JoinHandle<()>, send_audio_task: JoinHandle<()>,
receive_audio_task: JoinHandle<()>, receive_audio_task: JoinHandle<()>,
} }
pub async fn connect( pub async fn connect(
microphone_receiver: broadcast::Receiver<f32>, microphone_receiver: broadcast::Receiver<f32>,
speaker_sender: Arc<broadcast::Sender<f32>>,
client_config: Arc<ClientConfig>, client_config: Arc<ClientConfig>,
) -> Result<ConnectReturn, Error> { ) -> Result<ConnectReturn, Error> {
let client = Client::builder() let client = Client::builder()
@ -59,15 +59,10 @@ pub async fn connect(
let (receive_stream, send_stream) = stream.split(); let (receive_stream, send_stream) = stream.split();
let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT);
let (play_audio_stop_signal_sender, play_audio_stop_signal_receiver) = oneshot::channel();
tokio::spawn(play(speaker_receiver, play_audio_stop_signal_receiver));
let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender));
let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver)); let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver));
Ok(ConnectReturn { Ok(ConnectReturn {
play_audio_stop_signal_sender,
send_audio_task, send_audio_task,
receive_audio_task, receive_audio_task,
}) })
@ -86,9 +81,6 @@ pub async fn disconnect_watcher(
connection_return.send_audio_task.abort(); connection_return.send_audio_task.abort();
connection_return.receive_audio_task.abort(); connection_return.receive_audio_task.abort();
if let Err(err_val) = connection_return.play_audio_stop_signal_sender.send(true) {
eprintln!("Error: Send Play Audio Stop Signal | Local | {}", err_val);
}
} }
async fn send_audio_data( async fn send_audio_data(
@ -120,16 +112,13 @@ async fn send_audio_data(
} }
async fn receive_audio_data( async fn receive_audio_data(
mut receive_stream: ReceiveStream, mut receive_stream: ReceiveStream,
speaker_sender: broadcast::Sender<f32>, speaker_sender: Arc<broadcast::Sender<f32>>,
) { ) {
loop { loop {
match receive_stream.read_f32().await { match receive_stream.read_f32().await {
Ok(received_data) => { Ok(received_data) => {
// error only happens if there is no receiver, think about it // todo: error only happens if there is no receiver, think about it
if let Err(err_val) = speaker_sender.send(received_data) { let _ = speaker_sender.send(received_data);
eprintln!("Error: Send to Speaker | Local | {}", err_val);
break;
}
} }
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val); eprintln!("Error: Receive Audio Data | Remote | {}", err_val);

View file

@ -46,6 +46,7 @@ pub async fn record(
pub async fn play( pub async fn play(
mut speaker_receiver: broadcast::Receiver<f32>, mut speaker_receiver: broadcast::Receiver<f32>,
is_speaker_started_signal: oneshot::Sender<bool>,
play_audio_stop_signal_receiver: oneshot::Receiver<bool>, play_audio_stop_signal_receiver: oneshot::Receiver<bool>,
) { ) {
let host = cpal::default_host(); let host = cpal::default_host();
@ -85,6 +86,10 @@ pub async fn play(
output_stream.play().unwrap(); output_stream.play().unwrap();
println!("Playing Started"); println!("Playing Started");
if let Err(_) = is_speaker_started_signal.send(true) {
eprintln!("Error: Is Microphone Started | Send");
}
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() { if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() {
eprintln!( eprintln!(