diff --git a/client/Cargo.toml b/client/Cargo.toml index bfc9445..d79cf2d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -10,5 +10,3 @@ tokio = { workspace = true } s2n-quic = { workspace = true } cpal = "0.16.0" iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" } -fixed-resample = "0.8.0" -rodio = {git = "https://github.com/RustAudio/rodio", rev = "071db6df2adfccfe1032be43dd87e5681e34292c"} diff --git a/client/src/gui.rs b/client/src/gui.rs index 9e97b07..62f5ee8 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -168,7 +168,8 @@ impl App { let speaker_sender = self.channel.speaker.clone(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); - *self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender); + *self.controller.connection_stop_sender.write().unwrap() = + Some(connection_stop_sender); Task::perform( async move { diff --git a/client/src/stream.rs b/client/src/stream.rs index 0344dc0..705cd08 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -104,7 +104,13 @@ async fn send_signals( } Err(err_val) => { eprintln!("Error: Receive Audio Datum | Local | {}", err_val); - return; + match err_val { + broadcast::error::RecvError::Closed => return, + broadcast::error::RecvError::Lagged(lag_amount) => { + println!("Warn: Latency Fix Agent | Local | Signal | {}", lag_amount); + microphone_receiver = microphone_receiver.resubscribe(); + } + } } } } diff --git a/client/src/voice.rs b/client/src/voice.rs index eba48f7..8f17c53 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -1,27 +1,32 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; +use std::sync::{Arc, LazyLock, RwLock, atomic::AtomicBool}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use protocol::{ Error, - protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType}, + protocol::{AUDIO_DATA_SENDER_CHANNEL_LENGTH, Signal, SignalType}, }; -use rodio::{OutputStream, OutputStreamBuilder, Sink, buffer::SamplesBuffer, mixer::Mixer}; use tokio::sync::{broadcast, mpsc}; use crate::gui::State; +const LATENCY_THRESHOLD: usize = 1024 * 8; + struct SpeakerSink { speaker_id: u8, - sink: Sink, + audio_data_sender: broadcast::Sender, + audio_data_receiver: RwLock>, } impl SpeakerSink { - fn new(speaker_id: u8, mixer: &Mixer) -> Self { - let sink = Sink::connect_new(mixer); - Self { speaker_id, sink } + fn new(speaker_id: u8) -> Self { + let (audio_data_sender, audio_data_receiver) = + broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH); + let audio_data_receiver = audio_data_receiver.into(); + Self { + speaker_id, + audio_data_sender, + audio_data_receiver, + } } } @@ -80,45 +85,53 @@ pub async fn record( }) } +static ONLINE_SPEAKERS: LazyLock>> = LazyLock::new(|| vec![].into()); + async fn signal_handler( - output_stream: OutputStream, mut play_receiver: broadcast::Receiver, play_pause: Arc, ) { - let mut speaker_list = vec![]; - while let Ok(signal) = play_receiver.recv().await { match signal.get_signal_type() { SignalType::AudioDatum => { - let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) { - [signal.get_audio_datum()] - } else { - [0.0] - }; + let data = signal.get_audio_datum(); - let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data); + let speaker_list_search_result = ONLINE_SPEAKERS.read().unwrap().binary_search_by( + |speaker_sink: &SpeakerSink| { + speaker_sink.speaker_id.cmp(&signal.get_speaker_id()) + }, + ); - match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| { - speaker_sink.speaker_id.cmp(&signal.get_speaker_id()) - }) { + match speaker_list_search_result { Ok(speaker_sink_index) => { - let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never"); + let online_speakers = ONLINE_SPEAKERS.read().unwrap(); + let speaker_sink = online_speakers.get(speaker_sink_index).expect("Never"); - speaker_sink.sink.append(source); + if play_pause.load(std::sync::atomic::Ordering::Relaxed) { + if let Err(err_val) = speaker_sink.audio_data_sender.send(data) { + eprintln!("Error: Send Audio Datum | Local | {}", err_val); + } + } } Err(_) => { - let speaker_sink = - SpeakerSink::new(signal.get_speaker_id(), output_stream.mixer()); + let speaker_sink = SpeakerSink::new(signal.get_speaker_id()); - speaker_sink.sink.append(source); + if play_pause.load(std::sync::atomic::Ordering::Relaxed) { + if let Err(err_val) = speaker_sink.audio_data_sender.send(data) { + eprintln!("Error: Send Audio Datum | Local | {}", err_val); + } + } - speaker_list.push(speaker_sink); - speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id)); + let mut online_speakers = ONLINE_SPEAKERS.write().unwrap(); + online_speakers.push(speaker_sink); + online_speakers.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id)); } } } SignalType::SpeakerLeft => { - speaker_list + ONLINE_SPEAKERS + .write() + .unwrap() .retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id()); } } @@ -128,37 +141,89 @@ async fn signal_handler( pub async fn play( mut play_control: mpsc::Receiver, play_receiver: broadcast::Receiver, -) { - let output_stream = OutputStreamBuilder::open_default_stream().unwrap(); +) -> Result<(), Error> { + let host = cpal::default_host(); + let output_device = host.default_output_device().unwrap(); + let config = output_device.default_output_config().unwrap().into(); + println!("Speaker Stream Config = {:#?}", config); + + let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + for sample in data { + match ONLINE_SPEAKERS.try_read() { + Ok(online_speakers) => { + *sample = online_speakers + .iter() + .map( + |speaker_sink| match speaker_sink.audio_data_receiver.try_write() { + Ok(mut speaker_sink_audio_receiver) => { + if speaker_sink_audio_receiver.len() > LATENCY_THRESHOLD { + println!( + "Warn: Latency Fix Agent | Local | Audio | {}", + speaker_sink_audio_receiver.len() + ); + *speaker_sink_audio_receiver = + speaker_sink_audio_receiver.resubscribe(); + } + match speaker_sink_audio_receiver.try_recv() { + Ok(audio_datum) => audio_datum, + Err(_) => 0.0, + } + } + Err(_) => 0.0, + }, + ) + .sum::() + .clamp(-1.0, 1.0); + } + Err(_) => *sample = 0.0, + } + } + }; + let play_pause = Arc::new(AtomicBool::new(true)); - let signal_handler = tokio::spawn(signal_handler( - output_stream, - play_receiver, - play_pause.clone(), - )); + let signal_handler = tokio::spawn(signal_handler(play_receiver, play_pause.clone())); + + let output_stream = output_device + .build_output_stream(&config, output, voice_error, None) + .unwrap(); + + output_stream + .play() + .map_err(|inner| Error::Play(inner.to_string()))?; tokio::task::block_in_place(|| { loop { match play_control.blocking_recv() { - Some(requested_state) => match requested_state { + Some(message) => match message { State::Active => { - play_pause.store(true, Ordering::Relaxed); + play_pause.store(true, std::sync::atomic::Ordering::Relaxed); + // output_stream + // .play() + // .map_err(|inner| Error::Play(inner.to_string()))? } State::Passive => { - play_pause.store(false, Ordering::Relaxed); + play_pause.store(false, std::sync::atomic::Ordering::Relaxed); + // I'm not pausing at hardware level to prevent some queue and permission + // problems at the first hand + + // output_stream + // .pause() + // .map_err(|inner| Error::Play(inner.to_string()))? } State::Loading => {} }, None => { + output_stream + .pause() + .map_err(|inner| Error::Play(inner.to_string()))?; + signal_handler.abort(); - return; + return Ok(()); } } } - }); - - signal_handler.abort(); + }) } fn voice_error(err_val: cpal::StreamError) {