refactor: ♻️ play and record

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-23 15:11:07 +03:00
parent c244401974
commit 7bbae2b0be
4 changed files with 194 additions and 152 deletions

View file

@ -8,7 +8,7 @@ use iced::{
use protocol::Error;
use tokio::sync::{
broadcast::{self},
oneshot,
mpsc, oneshot,
};
use crate::{
@ -17,53 +17,26 @@ use crate::{
voice::{play, record},
};
#[derive(Debug, Default)]
#[derive(Debug)]
struct Signal {
microphone_stop_sender: Option<oneshot::Sender<bool>>,
speaker_stop_sender: Option<oneshot::Sender<bool>>,
connection_stop_sender: Option<oneshot::Sender<bool>>,
record_control: mpsc::Sender<State>,
play_control: mpsc::Sender<State>,
connection_stop_sender: RwLock<Option<oneshot::Sender<bool>>>,
}
impl Signal {
fn reset_microphone(&mut self) -> Result<(), Error> {
if let Some(microphone_signal) = &self.microphone_stop_sender {
if !microphone_signal.is_closed() {
self.microphone_stop_sender
.take()
.expect("Never")
.send(true)
.unwrap();
self.microphone_stop_sender = None;
return Ok(());
}
}
Err(Error::Signal("Reset".to_string()))
}
fn reset_speaker(&mut self) -> Result<(), Error> {
if let Some(speaker_signal) = &self.speaker_stop_sender {
if !speaker_signal.is_closed() {
self.speaker_stop_sender
.take()
.expect("Never")
.send(true)
.unwrap();
self.speaker_stop_sender = None;
return Ok(());
}
}
Err(Error::Signal("Reset".to_string()))
}
fn reset_connection(&mut self) -> Result<(), Error> {
if let Some(connection_signal) = &self.connection_stop_sender {
fn reset_connection(&self) -> Result<(), Error> {
if let Some(connection_signal) = self.connection_stop_sender.read().unwrap().as_ref() {
if !connection_signal.is_closed() {
self.connection_stop_sender
let mut connection_stop_sender = self.connection_stop_sender.write().unwrap();
connection_stop_sender
.take()
.expect("Never")
.send(true)
.unwrap();
self.connection_stop_sender = None;
*connection_stop_sender = None;
drop(connection_stop_sender);
return Ok(());
}
}
@ -77,23 +50,23 @@ struct Channel {
speaker: Arc<broadcast::Sender<f32>>,
}
impl Channel {
fn new() -> Self {
Self {
microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT).0.into(),
speaker: broadcast::channel(SPEAKER_BUFFER_LENGHT).0.into(),
}
}
}
#[derive(Debug, Default, Clone, Copy)]
#[derive(Debug, Clone, Copy)]
struct GUIStatus {
room: State,
microphone: State,
speaker: State,
}
impl Default for GUIStatus {
fn default() -> Self {
Self {
room: Default::default(),
microphone: Default::default(),
speaker: State::Active,
}
}
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
Active,
Passive,
@ -121,22 +94,10 @@ pub struct App {
client_config: Arc<ClientConfig>,
gui_status: Arc<RwLock<GUIStatus>>,
channel: Channel,
signal: Signal,
signal: Arc<Signal>,
}
impl App {
fn reset_microphone(&mut self) -> Result<(), Error> {
self.signal.reset_microphone()?;
self.gui_status.write().unwrap().microphone = State::Passive;
Ok(())
}
fn reset_speaker(&mut self) -> Result<(), Error> {
self.signal.reset_speaker()?;
self.gui_status.write().unwrap().speaker = State::Passive;
Ok(())
}
fn reset_connection(&mut self) -> Result<(), Error> {
self.signal.reset_connection()?;
self.gui_status.write().unwrap().room = State::Passive;
@ -148,11 +109,29 @@ impl App {
}
pub fn new() -> Self {
let record_sender = Arc::new(broadcast::channel(MICROPHONE_BUFFER_LENGHT).0);
let record_control = mpsc::channel(1);
tokio::spawn(record(record_control.1, record_sender.clone()));
let play_sender = Arc::new(broadcast::channel(SPEAKER_BUFFER_LENGHT).0);
let play_control = mpsc::channel(1);
tokio::spawn(play(play_control.1, play_sender.clone().subscribe()));
App {
client_config: ClientConfig::new().into(),
gui_status: RwLock::new(GUIStatus::default()).into(),
channel: Channel::new(),
signal: Signal::default(),
channel: Channel {
microphone: record_sender,
speaker: play_sender,
},
signal: Signal {
record_control: record_control.0,
play_control: play_control.0,
connection_stop_sender: None.into(),
}
.into(),
}
}
pub fn view(&self) -> Element<'_, Message> {
@ -193,17 +172,18 @@ impl App {
let speaker_sender = self.channel.speaker.clone();
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
self.signal.connection_stop_sender = Some(connection_stop_sender);
*self.signal.connection_stop_sender.write().unwrap() = Some(connection_stop_sender);
Task::perform(
async move {
match connect(microphone_receiver, speaker_sender, client_config).await {
Ok(connection_return) => {
Ok(connect_return) => {
tokio::spawn(disconnect_watcher(
connection_stop_receiver,
connection_return,
connect_return,
));
gui_status.write().unwrap().room = State::Active;
Some(Message::UnmuteSpeaker)
Some((gui_status, Message::UnmuteSpeaker))
}
Err(err_val) => {
eprintln!("Error: Connect | {}", err_val);
@ -212,8 +192,14 @@ impl App {
}
}
},
|what_to_do_with_speaker| match what_to_do_with_speaker {
Some(activate) => activate,
|inner_return| match inner_return {
Some((gui_status, speaker_activate_message)) => {
if gui_status.read().unwrap().speaker == State::Passive {
speaker_activate_message
} else {
Message::None
}
}
None => Message::None,
},
)
@ -227,59 +213,87 @@ impl App {
}
Message::UnmuteMicrophone => {
self.gui_status.write().unwrap().microphone = State::Loading;
let microphone_sender = self.channel.microphone.clone();
let microphone_stop_signal = oneshot::channel();
self.signal.microphone_stop_sender = Some(microphone_stop_signal.0);
let is_microphone_started_signal = oneshot::channel();
tokio::spawn(record(
microphone_sender,
is_microphone_started_signal.0,
microphone_stop_signal.1,
));
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
Task::perform(
async move {
if let Ok(_) = is_microphone_started_signal.1.await {
async move { signal.record_control.send(State::Active).await },
move |result| {
match result {
Ok(_) => {
gui_status.write().unwrap().microphone = State::Active;
}
Err(err_val) => {
eprintln!("Error: Unmute Microphone | {}", err_val);
gui_status.write().unwrap().microphone = State::Passive;
}
}
Message::None
},
|_| Message::None,
)
}
Message::MuteMicrophone => {
self.gui_status.write().unwrap().microphone = State::Loading;
if let Err(err_val) = self.reset_microphone() {
eprintln!("Error: Mute Microphone | {}", err_val);
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
Task::perform(
async move { signal.record_control.send(State::Passive).await },
move |result| {
match result {
Ok(_) => {
gui_status.write().unwrap().microphone = State::Passive;
}
Task::none()
Err(err_val) => {
eprintln!("Error: Mute Microphone | {}", err_val);
gui_status.write().unwrap().microphone = State::Active;
}
}
Message::None
},
)
}
Message::UnmuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
let speaker_receiver = self.channel.speaker.subscribe();
let speaker_stop_signal = oneshot::channel();
self.signal.speaker_stop_sender = Some(speaker_stop_signal.0);
let is_speaker_started_signal = oneshot::channel();
tokio::spawn(play(
speaker_receiver,
is_speaker_started_signal.0,
speaker_stop_signal.1,
));
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
Task::perform(
async move {
if let Ok(_) = is_speaker_started_signal.1.await {
async move { signal.play_control.send(State::Active).await },
move |result| {
match result {
Ok(_) => {
gui_status.write().unwrap().speaker = State::Active;
}
Err(err_val) => {
eprintln!("Error: Unmute Speaker | {}", err_val);
gui_status.write().unwrap().speaker = State::Passive;
}
}
Message::None
},
|_| Message::None,
)
}
Message::MuteSpeaker => {
self.gui_status.write().unwrap().speaker = State::Loading;
if let Err(err_val) = self.reset_speaker() {
eprintln!("Error: Mute Speaker | {}", err_val);
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
Task::perform(
async move { signal.play_control.send(State::Passive).await },
move |result| {
match result {
Ok(_) => {
gui_status.write().unwrap().speaker = State::Passive;
}
Task::none()
Err(err_val) => {
eprintln!("Error: Mute Speaker | {}", err_val);
gui_status.write().unwrap().speaker = State::Active;
}
}
Message::None
},
)
}
}
}

View file

@ -70,7 +70,7 @@ pub async fn connect(
pub async fn disconnect_watcher(
connection_stop_receiver: oneshot::Receiver<bool>,
connection_return: ConnectReturn,
connect_return: ConnectReturn,
) {
if let Err(err_val) = connection_stop_receiver.await {
eprintln!(
@ -79,8 +79,8 @@ pub async fn disconnect_watcher(
);
}
connection_return.send_audio_task.abort();
connection_return.receive_audio_task.abort();
connect_return.send_audio_task.abort();
connect_return.receive_audio_task.abort();
}
async fn send_audio_data(

View file

@ -1,22 +1,29 @@
use std::sync::Arc;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::{broadcast, oneshot};
use protocol::Error;
use tokio::sync::{broadcast, mpsc};
use crate::gui::State;
pub async fn record(
microphone_sender: Arc<broadcast::Sender<f32>>,
is_microphone_started_signal: oneshot::Sender<bool>,
microphone_stop_signal_receiver: oneshot::Receiver<bool>,
) {
mut record_control: mpsc::Receiver<State>,
record_sender: Arc<broadcast::Sender<f32>>,
) -> Result<(), Error> {
let host = cpal::default_host();
let input_device = host.default_input_device().unwrap();
let config = input_device.default_input_config().unwrap().into();
let input_device = host
.default_input_device()
.ok_or(Error::Record("Input Device".to_string()))?;
let config = input_device
.default_input_config()
.map_err(|inner| Error::Record(inner.to_string()))?
.into();
println!("Recorder Stream Config = {:#?}", config);
let input = move |data: &[f32], _: &cpal::InputCallbackInfo| {
for &sample in data {
if microphone_sender.receiver_count() > 0 && sample != 0.0 {
if let Err(err_val) = microphone_sender.send(sample) {
if record_sender.receiver_count() > 0 && sample != 0.0 {
if let Err(err_val) = record_sender.send(sample) {
eprintln!("Error: Microphone Send | {}", err_val);
}
}
@ -25,30 +32,39 @@ pub async fn record(
let input_stream = input_device
.build_input_stream(&config, input, voice_error, None)
.unwrap();
input_stream.play().unwrap();
println!("Recording Started");
if let Err(_) = is_microphone_started_signal.send(true) {
eprintln!("Error: Is Microphone Started | Send");
}
.map_err(|inner| Error::Record(inner.to_string()))?;
input_stream
.pause()
.map_err(|inner| Error::Record(inner.to_string()))?;
tokio::task::block_in_place(|| {
if let Err(err_val) = microphone_stop_signal_receiver.blocking_recv() {
eprintln!(
"Error: Microphone Stop Signal | Local Channel | {}",
err_val
);
loop {
match record_control.blocking_recv() {
Some(message) => match message {
State::Active => input_stream
.play()
.map_err(|inner| Error::Record(inner.to_string()))?,
State::Passive => input_stream
.pause()
.map_err(|inner| Error::Record(inner.to_string()))?,
State::Loading => {}
},
None => {
input_stream
.pause()
.map_err(|inner| Error::Record(inner.to_string()))?;
return Ok(());
}
});
input_stream.pause().unwrap();
}
}
})
}
pub async fn play(
mut speaker_receiver: broadcast::Receiver<f32>,
is_speaker_started_signal: oneshot::Sender<bool>,
play_audio_stop_signal_receiver: oneshot::Receiver<bool>,
) {
mut play_control: mpsc::Receiver<State>,
mut play_receiver: broadcast::Receiver<f32>,
) -> Result<(), Error> {
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config = output_device.default_output_config().unwrap().into();
@ -56,8 +72,8 @@ pub async fn play(
let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {
if speaker_receiver.len() > 0 {
match speaker_receiver.blocking_recv() {
if play_receiver.len() > 0 {
match play_receiver.blocking_recv() {
Ok(received_sample) => *sample = received_sample,
Err(err_val) => match err_val {
broadcast::error::RecvError::Closed => {
@ -69,7 +85,7 @@ pub async fn play(
"Error: Speaker Receive | Local Channel | Lagging by -> {}",
lag_amount
);
speaker_receiver = speaker_receiver.resubscribe();
play_receiver = play_receiver.resubscribe();
}
},
}
@ -83,23 +99,31 @@ pub async fn play(
.build_output_stream(&config, output, voice_error, None)
.unwrap();
output_stream.play().unwrap();
println!("Playing Started");
if let Err(_) = is_speaker_started_signal.send(true) {
eprintln!("Error: Is Microphone Started | Send");
}
output_stream
.play()
.map_err(|inner| Error::Play(inner.to_string()))?;
tokio::task::block_in_place(|| {
if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() {
eprintln!(
"Error: Speaker Stop Signal Receive | Local Channel | {}",
err_val
);
loop {
match play_control.blocking_recv() {
Some(message) => match message {
State::Active => output_stream
.play()
.map_err(|inner| Error::Play(inner.to_string()))?,
State::Passive => output_stream
.pause()
.map_err(|inner| Error::Play(inner.to_string()))?,
State::Loading => {}
},
None => {
output_stream
.pause()
.map_err(|inner| Error::Play(inner.to_string()))?;
return Ok(());
}
});
output_stream.pause().unwrap();
}
}
})
}
fn voice_error(err_val: cpal::StreamError) {

View file

@ -14,6 +14,8 @@ pub enum Error {
Send(String),
Receive(String),
Signal(String),
Record(String),
Play(String),
}
impl std::error::Error for Error {
@ -30,6 +32,8 @@ impl Display for Error {
Error::Send(inner) => write!(f, "Send | {}", inner),
Error::Receive(inner) => write!(f, "Receive | {}", inner),
Error::Signal(inner) => write!(f, "Signal | {}", inner),
Error::Record(inner) => write!(f, "Record | {}", inner),
Error::Play(inner) => write!(f, "Play | {}", inner),
}
}
}