feat: speaker on/of capability

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-22 16:50:44 +03:00
parent 6efb12d3b0
commit c244401974
4 changed files with 88 additions and 30 deletions

View file

@ -12,15 +12,15 @@ use tokio::sync::{
};
use crate::{
ClientConfig, MICROPHONE_BUFFER_LENGHT,
ClientConfig, MICROPHONE_BUFFER_LENGHT, SPEAKER_BUFFER_LENGHT,
stream::{connect, disconnect_watcher},
voice::record,
voice::{play, record},
};
#[derive(Debug, Default)]
struct Signal {
microphone_stop_sender: Option<oneshot::Sender<bool>>,
// speaker: Option<oneshot::Sender<bool>>,
speaker_stop_sender: Option<oneshot::Sender<bool>>,
connection_stop_sender: Option<oneshot::Sender<bool>>,
}
@ -40,6 +40,21 @@ impl Signal {
Err(Error::Signal("Reset".to_string()))
}
fn reset_speaker(&mut self) -> Result<(), Error> {
if let Some(speaker_signal) = &self.speaker_stop_sender {
if !speaker_signal.is_closed() {
self.speaker_stop_sender
.take()
.expect("Never")
.send(true)
.unwrap();
self.speaker_stop_sender = None;
return Ok(());
}
}
Err(Error::Signal("Reset".to_string()))
}
fn reset_connection(&mut self) -> Result<(), Error> {
if let Some(connection_signal) = &self.connection_stop_sender {
if !connection_signal.is_closed() {
@ -59,14 +74,14 @@ impl Signal {
#[derive(Debug)]
struct Channel {
microphone: Arc<broadcast::Sender<f32>>,
// speaker: (broadcast::Sender<f32>, broadcast::Receiver<f32>),
speaker: Arc<broadcast::Sender<f32>>,
}
impl Channel {
fn new() -> Self {
Self {
microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT / 4).0.into(),
// speaker: broadcast::channel(BUFFER_LENGTH),
microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT).0.into(),
speaker: broadcast::channel(SPEAKER_BUFFER_LENGHT).0.into(),
}
}
}
@ -75,6 +90,7 @@ impl Channel {
struct GUIStatus {
room: State,
microphone: State,
speaker: State,
}
#[derive(Debug, Clone, Copy)]
@ -96,6 +112,8 @@ pub enum Message {
LeaveRoom,
UnmuteMicrophone,
MuteMicrophone,
UnmuteSpeaker,
MuteSpeaker,
}
#[derive(Debug)]
@ -113,6 +131,12 @@ impl App {
Ok(())
}
fn reset_speaker(&mut self) -> Result<(), Error> {
self.signal.reset_speaker()?;
self.gui_status.write().unwrap().speaker = State::Passive;
Ok(())
}
fn reset_connection(&mut self) -> Result<(), Error> {
self.signal.reset_connection()?;
self.gui_status.write().unwrap().room = State::Passive;
@ -139,12 +163,19 @@ impl App {
};
let microphone_button = match self.gui_status.read().unwrap().microphone {
State::Active => button("Mute").on_press(Message::MuteMicrophone),
State::Passive => button("Unmute").on_press(Message::UnmuteMicrophone),
State::Loading => button("Loading"),
State::Active => button("Microphone ON").on_press(Message::MuteMicrophone),
State::Passive => button("Microphone OFF").on_press(Message::UnmuteMicrophone),
State::Loading => button("Microphone Loading"),
};
let speaker_button = match self.gui_status.read().unwrap().speaker {
State::Active => button("Speaker ON").on_press(Message::MuteSpeaker),
State::Passive => button("Speaker OFF").on_press(Message::UnmuteSpeaker),
State::Loading => button("Speaker Loading"),
};
column![
row![join_room_button, microphone_button]
row![join_room_button, microphone_button, speaker_button]
.spacing(20)
.align_y(Center)
]
@ -159,28 +190,33 @@ impl App {
let client_config = self.client_config.clone();
let gui_status = self.gui_status.clone();
let microphone_receiver = self.channel.microphone.subscribe();
let speaker_sender = self.channel.speaker.clone();
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
self.signal.connection_stop_sender = Some(connection_stop_sender);
Task::perform(
async move {
match connect(microphone_receiver, client_config).await {
match connect(microphone_receiver, speaker_sender, client_config).await {
Ok(connection_return) => {
tokio::spawn(disconnect_watcher(
connection_stop_receiver,
connection_return,
));
gui_status.write().unwrap().room = State::Active;
Some(Message::UnmuteSpeaker)
}
Err(err_val) => {
eprintln!("Error: Connect | {}", err_val);
gui_status.write().unwrap().room = State::Passive;
None
}
}
},
|_| {},
|what_to_do_with_speaker| match what_to_do_with_speaker {
Some(activate) => activate,
None => Message::None,
},
)
.map(|_| Message::None)
}
Message::LeaveRoom => {
self.gui_status.write().unwrap().room = State::Loading;
@ -217,6 +253,34 @@ impl App {
}
Task::none()
}
Message::UnmuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
let speaker_receiver = self.channel.speaker.subscribe();
let speaker_stop_signal = oneshot::channel();
self.signal.speaker_stop_sender = Some(speaker_stop_signal.0);
let is_speaker_started_signal = oneshot::channel();
tokio::spawn(play(
speaker_receiver,
is_speaker_started_signal.0,
speaker_stop_signal.1,
));
let gui_status = self.gui_status.clone();
Task::perform(
async move {
if let Ok(_) = is_speaker_started_signal.1.await {
gui_status.write().unwrap().speaker = State::Active;
}
},
|_| Message::None,
)
}
Message::MuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
if let Err(err_val) = self.reset_speaker() {
eprintln!("Error: Mute Speaker | {}", err_val);
}
Task::none()
}
}
}
}

View file

@ -2,7 +2,7 @@ pub mod gui;
pub mod stream;
pub mod voice;
const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 16;
const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4;
const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4;
#[derive(Debug)]

View file

@ -12,17 +12,17 @@ use tokio::{
task::JoinHandle,
};
use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play};
use crate::ClientConfig;
#[derive(Debug)]
pub struct ConnectReturn {
play_audio_stop_signal_sender: oneshot::Sender<bool>,
send_audio_task: JoinHandle<()>,
receive_audio_task: JoinHandle<()>,
}
pub async fn connect(
microphone_receiver: broadcast::Receiver<f32>,
speaker_sender: Arc<broadcast::Sender<f32>>,
client_config: Arc<ClientConfig>,
) -> Result<ConnectReturn, Error> {
let client = Client::builder()
@ -59,15 +59,10 @@ pub async fn connect(
let (receive_stream, send_stream) = stream.split();
let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT);
let (play_audio_stop_signal_sender, play_audio_stop_signal_receiver) = oneshot::channel();
tokio::spawn(play(speaker_receiver, play_audio_stop_signal_receiver));
let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender));
let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver));
Ok(ConnectReturn {
play_audio_stop_signal_sender,
send_audio_task,
receive_audio_task,
})
@ -86,9 +81,6 @@ pub async fn disconnect_watcher(
connection_return.send_audio_task.abort();
connection_return.receive_audio_task.abort();
if let Err(err_val) = connection_return.play_audio_stop_signal_sender.send(true) {
eprintln!("Error: Send Play Audio Stop Signal | Local | {}", err_val);
}
}
async fn send_audio_data(
@ -120,16 +112,13 @@ async fn send_audio_data(
}
async fn receive_audio_data(
mut receive_stream: ReceiveStream,
speaker_sender: broadcast::Sender<f32>,
speaker_sender: Arc<broadcast::Sender<f32>>,
) {
loop {
match receive_stream.read_f32().await {
Ok(received_data) => {
// error only happens if there is no receiver, think about it
if let Err(err_val) = speaker_sender.send(received_data) {
eprintln!("Error: Send to Speaker | Local | {}", err_val);
break;
}
// todo: error only happens if there is no receiver, think about it
let _ = speaker_sender.send(received_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);

View file

@ -46,6 +46,7 @@ pub async fn record(
pub async fn play(
mut speaker_receiver: broadcast::Receiver<f32>,
is_speaker_started_signal: oneshot::Sender<bool>,
play_audio_stop_signal_receiver: oneshot::Receiver<bool>,
) {
let host = cpal::default_host();
@ -85,6 +86,10 @@ pub async fn play(
output_stream.play().unwrap();
println!("Playing Started");
if let Err(_) = is_speaker_started_signal.send(true) {
eprintln!("Error: Is Microphone Started | Send");
}
tokio::task::block_in_place(|| {
if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() {
eprintln!(