diff --git a/client/src/gui.rs b/client/src/gui.rs index 7e7d9d3..5829c59 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -8,7 +8,7 @@ use iced::{ use protocol::Error; use tokio::sync::{ broadcast::{self}, - oneshot, + mpsc, oneshot, }; use crate::{ @@ -17,53 +17,26 @@ use crate::{ voice::{play, record}, }; -#[derive(Debug, Default)] +#[derive(Debug)] struct Signal { - microphone_stop_sender: Option>, - speaker_stop_sender: Option>, - connection_stop_sender: Option>, + record_control: mpsc::Sender, + play_control: mpsc::Sender, + connection_stop_sender: RwLock>>, } impl Signal { - fn reset_microphone(&mut self) -> Result<(), Error> { - if let Some(microphone_signal) = &self.microphone_stop_sender { - if !microphone_signal.is_closed() { - self.microphone_stop_sender - .take() - .expect("Never") - .send(true) - .unwrap(); - self.microphone_stop_sender = None; - return Ok(()); - } - } - Err(Error::Signal("Reset".to_string())) - } - - fn reset_speaker(&mut self) -> Result<(), Error> { - if let Some(speaker_signal) = &self.speaker_stop_sender { - if !speaker_signal.is_closed() { - self.speaker_stop_sender - .take() - .expect("Never") - .send(true) - .unwrap(); - self.speaker_stop_sender = None; - return Ok(()); - } - } - Err(Error::Signal("Reset".to_string())) - } - - fn reset_connection(&mut self) -> Result<(), Error> { - if let Some(connection_signal) = &self.connection_stop_sender { + fn reset_connection(&self) -> Result<(), Error> { + if let Some(connection_signal) = self.connection_stop_sender.read().unwrap().as_ref() { if !connection_signal.is_closed() { - self.connection_stop_sender + let mut connection_stop_sender = self.connection_stop_sender.write().unwrap(); + connection_stop_sender .take() .expect("Never") .send(true) .unwrap(); - self.connection_stop_sender = None; + + *connection_stop_sender = None; + drop(connection_stop_sender); return Ok(()); } } @@ -77,23 +50,23 @@ struct Channel { speaker: Arc>, } -impl Channel { - fn new() -> Self { - Self { - microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT).0.into(), - speaker: broadcast::channel(SPEAKER_BUFFER_LENGHT).0.into(), - } - } -} - -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Clone, Copy)] struct GUIStatus { room: State, microphone: State, speaker: State, } +impl Default for GUIStatus { + fn default() -> Self { + Self { + room: Default::default(), + microphone: Default::default(), + speaker: State::Active, + } + } +} -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum State { Active, Passive, @@ -121,22 +94,10 @@ pub struct App { client_config: Arc, gui_status: Arc>, channel: Channel, - signal: Signal, + signal: Arc, } impl App { - fn reset_microphone(&mut self) -> Result<(), Error> { - self.signal.reset_microphone()?; - self.gui_status.write().unwrap().microphone = State::Passive; - Ok(()) - } - - fn reset_speaker(&mut self) -> Result<(), Error> { - self.signal.reset_speaker()?; - self.gui_status.write().unwrap().speaker = State::Passive; - Ok(()) - } - fn reset_connection(&mut self) -> Result<(), Error> { self.signal.reset_connection()?; self.gui_status.write().unwrap().room = State::Passive; @@ -148,11 +109,29 @@ impl App { } pub fn new() -> Self { + let record_sender = Arc::new(broadcast::channel(MICROPHONE_BUFFER_LENGHT).0); + + let record_control = mpsc::channel(1); + tokio::spawn(record(record_control.1, record_sender.clone())); + + let play_sender = Arc::new(broadcast::channel(SPEAKER_BUFFER_LENGHT).0); + + let play_control = mpsc::channel(1); + tokio::spawn(play(play_control.1, play_sender.clone().subscribe())); + App { client_config: ClientConfig::new().into(), gui_status: RwLock::new(GUIStatus::default()).into(), - channel: Channel::new(), - signal: Signal::default(), + channel: Channel { + microphone: record_sender, + speaker: play_sender, + }, + signal: Signal { + record_control: record_control.0, + play_control: play_control.0, + connection_stop_sender: None.into(), + } + .into(), } } pub fn view(&self) -> Element<'_, Message> { @@ -193,17 +172,18 @@ impl App { let speaker_sender = self.channel.speaker.clone(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); - self.signal.connection_stop_sender = Some(connection_stop_sender); + *self.signal.connection_stop_sender.write().unwrap() = Some(connection_stop_sender); + Task::perform( async move { match connect(microphone_receiver, speaker_sender, client_config).await { - Ok(connection_return) => { + Ok(connect_return) => { tokio::spawn(disconnect_watcher( connection_stop_receiver, - connection_return, + connect_return, )); gui_status.write().unwrap().room = State::Active; - Some(Message::UnmuteSpeaker) + Some((gui_status, Message::UnmuteSpeaker)) } Err(err_val) => { eprintln!("Error: Connect | {}", err_val); @@ -212,8 +192,14 @@ impl App { } } }, - |what_to_do_with_speaker| match what_to_do_with_speaker { - Some(activate) => activate, + |inner_return| match inner_return { + Some((gui_status, speaker_activate_message)) => { + if gui_status.read().unwrap().speaker == State::Passive { + speaker_activate_message + } else { + Message::None + } + } None => Message::None, }, ) @@ -227,59 +213,87 @@ impl App { } Message::UnmuteMicrophone => { self.gui_status.write().unwrap().microphone = State::Loading; - let microphone_sender = self.channel.microphone.clone(); - let microphone_stop_signal = oneshot::channel(); - self.signal.microphone_stop_sender = Some(microphone_stop_signal.0); - let is_microphone_started_signal = oneshot::channel(); - tokio::spawn(record( - microphone_sender, - is_microphone_started_signal.0, - microphone_stop_signal.1, - )); + let gui_status = self.gui_status.clone(); + let signal = self.signal.clone(); Task::perform( - async move { - if let Ok(_) = is_microphone_started_signal.1.await { - gui_status.write().unwrap().microphone = State::Active; + async move { signal.record_control.send(State::Active).await }, + move |result| { + match result { + Ok(_) => { + gui_status.write().unwrap().microphone = State::Active; + } + Err(err_val) => { + eprintln!("Error: Unmute Microphone | {}", err_val); + gui_status.write().unwrap().microphone = State::Passive; + } } + Message::None }, - |_| Message::None, ) } Message::MuteMicrophone => { self.gui_status.write().unwrap().microphone = State::Loading; - if let Err(err_val) = self.reset_microphone() { - eprintln!("Error: Mute Microphone | {}", err_val); - } - Task::none() + + let gui_status = self.gui_status.clone(); + let signal = self.signal.clone(); + Task::perform( + async move { signal.record_control.send(State::Passive).await }, + move |result| { + match result { + Ok(_) => { + gui_status.write().unwrap().microphone = State::Passive; + } + Err(err_val) => { + eprintln!("Error: Mute Microphone | {}", err_val); + gui_status.write().unwrap().microphone = State::Active; + } + } + Message::None + }, + ) } Message::UnmuteSpeaker => { self.gui_status.write().unwrap().speaker = State::Loading; - let speaker_receiver = self.channel.speaker.subscribe(); - let speaker_stop_signal = oneshot::channel(); - self.signal.speaker_stop_sender = Some(speaker_stop_signal.0); - let is_speaker_started_signal = oneshot::channel(); - tokio::spawn(play( - speaker_receiver, - is_speaker_started_signal.0, - speaker_stop_signal.1, - )); + let gui_status = self.gui_status.clone(); + let signal = self.signal.clone(); Task::perform( - async move { - if let Ok(_) = is_speaker_started_signal.1.await { - gui_status.write().unwrap().speaker = State::Active; + async move { signal.play_control.send(State::Active).await }, + move |result| { + match result { + Ok(_) => { + gui_status.write().unwrap().speaker = State::Active; + } + Err(err_val) => { + eprintln!("Error: Unmute Speaker | {}", err_val); + gui_status.write().unwrap().speaker = State::Passive; + } } + Message::None }, - |_| Message::None, ) } Message::MuteSpeaker => { self.gui_status.write().unwrap().speaker = State::Loading; - if let Err(err_val) = self.reset_speaker() { - eprintln!("Error: Mute Speaker | {}", err_val); - } - Task::none() + + let gui_status = self.gui_status.clone(); + let signal = self.signal.clone(); + Task::perform( + async move { signal.play_control.send(State::Passive).await }, + move |result| { + match result { + Ok(_) => { + gui_status.write().unwrap().speaker = State::Passive; + } + Err(err_val) => { + eprintln!("Error: Mute Speaker | {}", err_val); + gui_status.write().unwrap().speaker = State::Active; + } + } + Message::None + }, + ) } } } diff --git a/client/src/stream.rs b/client/src/stream.rs index 7d041cc..5e70696 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -70,7 +70,7 @@ pub async fn connect( pub async fn disconnect_watcher( connection_stop_receiver: oneshot::Receiver, - connection_return: ConnectReturn, + connect_return: ConnectReturn, ) { if let Err(err_val) = connection_stop_receiver.await { eprintln!( @@ -79,8 +79,8 @@ pub async fn disconnect_watcher( ); } - connection_return.send_audio_task.abort(); - connection_return.receive_audio_task.abort(); + connect_return.send_audio_task.abort(); + connect_return.receive_audio_task.abort(); } async fn send_audio_data( diff --git a/client/src/voice.rs b/client/src/voice.rs index a705683..cc62ade 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -1,22 +1,29 @@ use std::sync::Arc; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use tokio::sync::{broadcast, oneshot}; +use protocol::Error; +use tokio::sync::{broadcast, mpsc}; + +use crate::gui::State; pub async fn record( - microphone_sender: Arc>, - is_microphone_started_signal: oneshot::Sender, - microphone_stop_signal_receiver: oneshot::Receiver, -) { + mut record_control: mpsc::Receiver, + record_sender: Arc>, +) -> Result<(), Error> { let host = cpal::default_host(); - let input_device = host.default_input_device().unwrap(); - let config = input_device.default_input_config().unwrap().into(); + let input_device = host + .default_input_device() + .ok_or(Error::Record("Input Device".to_string()))?; + let config = input_device + .default_input_config() + .map_err(|inner| Error::Record(inner.to_string()))? + .into(); println!("Recorder Stream Config = {:#?}", config); let input = move |data: &[f32], _: &cpal::InputCallbackInfo| { for &sample in data { - if microphone_sender.receiver_count() > 0 && sample != 0.0 { - if let Err(err_val) = microphone_sender.send(sample) { + if record_sender.receiver_count() > 0 && sample != 0.0 { + if let Err(err_val) = record_sender.send(sample) { eprintln!("Error: Microphone Send | {}", err_val); } } @@ -25,30 +32,39 @@ pub async fn record( let input_stream = input_device .build_input_stream(&config, input, voice_error, None) - .unwrap(); - input_stream.play().unwrap(); - println!("Recording Started"); - if let Err(_) = is_microphone_started_signal.send(true) { - eprintln!("Error: Is Microphone Started | Send"); - } + .map_err(|inner| Error::Record(inner.to_string()))?; + + input_stream + .pause() + .map_err(|inner| Error::Record(inner.to_string()))?; tokio::task::block_in_place(|| { - if let Err(err_val) = microphone_stop_signal_receiver.blocking_recv() { - eprintln!( - "Error: Microphone Stop Signal | Local Channel | {}", - err_val - ); + loop { + match record_control.blocking_recv() { + Some(message) => match message { + State::Active => input_stream + .play() + .map_err(|inner| Error::Record(inner.to_string()))?, + State::Passive => input_stream + .pause() + .map_err(|inner| Error::Record(inner.to_string()))?, + State::Loading => {} + }, + None => { + input_stream + .pause() + .map_err(|inner| Error::Record(inner.to_string()))?; + return Ok(()); + } + } } - }); - - input_stream.pause().unwrap(); + }) } pub async fn play( - mut speaker_receiver: broadcast::Receiver, - is_speaker_started_signal: oneshot::Sender, - play_audio_stop_signal_receiver: oneshot::Receiver, -) { + mut play_control: mpsc::Receiver, + mut play_receiver: broadcast::Receiver, +) -> Result<(), Error> { let host = cpal::default_host(); let output_device = host.default_output_device().unwrap(); let config = output_device.default_output_config().unwrap().into(); @@ -56,8 +72,8 @@ pub async fn play( let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { for sample in data { - if speaker_receiver.len() > 0 { - match speaker_receiver.blocking_recv() { + if play_receiver.len() > 0 { + match play_receiver.blocking_recv() { Ok(received_sample) => *sample = received_sample, Err(err_val) => match err_val { broadcast::error::RecvError::Closed => { @@ -69,7 +85,7 @@ pub async fn play( "Error: Speaker Receive | Local Channel | Lagging by -> {}", lag_amount ); - speaker_receiver = speaker_receiver.resubscribe(); + play_receiver = play_receiver.resubscribe(); } }, } @@ -83,23 +99,31 @@ pub async fn play( .build_output_stream(&config, output, voice_error, None) .unwrap(); - output_stream.play().unwrap(); - println!("Playing Started"); - - if let Err(_) = is_speaker_started_signal.send(true) { - eprintln!("Error: Is Microphone Started | Send"); - } + output_stream + .play() + .map_err(|inner| Error::Play(inner.to_string()))?; tokio::task::block_in_place(|| { - if let Err(err_val) = play_audio_stop_signal_receiver.blocking_recv() { - eprintln!( - "Error: Speaker Stop Signal Receive | Local Channel | {}", - err_val - ); + loop { + match play_control.blocking_recv() { + Some(message) => match message { + State::Active => output_stream + .play() + .map_err(|inner| Error::Play(inner.to_string()))?, + State::Passive => output_stream + .pause() + .map_err(|inner| Error::Play(inner.to_string()))?, + State::Loading => {} + }, + None => { + output_stream + .pause() + .map_err(|inner| Error::Play(inner.to_string()))?; + return Ok(()); + } + } } - }); - - output_stream.pause().unwrap(); + }) } fn voice_error(err_val: cpal::StreamError) { diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 6c493b0..f55d996 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -14,6 +14,8 @@ pub enum Error { Send(String), Receive(String), Signal(String), + Record(String), + Play(String), } impl std::error::Error for Error { @@ -30,6 +32,8 @@ impl Display for Error { Error::Send(inner) => write!(f, "Send | {}", inner), Error::Receive(inner) => write!(f, "Receive | {}", inner), Error::Signal(inner) => write!(f, "Signal | {}", inner), + Error::Record(inner) => write!(f, "Record | {}", inner), + Error::Play(inner) => write!(f, "Play | {}", inner), } } }