From 8cad7658eae95cbd288921f78f04cb153a4f20c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Sat, 14 Jun 2025 17:14:35 +0300 Subject: [PATCH] fix: :zap: latency, underrun, overflow --- Cargo.lock | 231 ------------------------------------------- client/Cargo.toml | 2 - client/src/gui.rs | 3 +- client/src/stream.rs | 12 ++- client/src/voice.rs | 153 +++++++++++++++++++--------- 5 files changed, 122 insertions(+), 279 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a5d932..cbd7960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -657,22 +657,14 @@ dependencies = [ "libloading", ] -[[package]] -name = "claxon" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bfbf56724aa9eca8afa4fcfadeb479e722935bb2a0900c2d37e0cc477af0688" - [[package]] name = "client" version = "0.1.0" dependencies = [ "chrono", "cpal", - "fixed-resample", "iced", "protocol", - "rodio", "s2n-quic", "tokio", ] @@ -1074,15 +1066,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "encoding_rs" -version = "0.8.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" -dependencies = [ - "cfg-if", -] - [[package]] name = "endi" version = "1.1.0" @@ -1178,12 +1161,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "fast-interleave" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a7e05e2b3c97d4516fa5c177133f3e4decf9c8318841e1545b260535311e3a5" - [[package]] name = "fastrand" version = "2.3.0" @@ -1199,18 +1176,6 @@ dependencies = [ "simd-adler32", ] -[[package]] -name = "fixed-resample" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39a267bc40fae9b208e2a67a99a8d794caf3a9fe1146d01c147f59bd96257a74" -dependencies = [ - "arrayvec", - "fast-interleave", - "ringbuf", - "rubato", -] - [[package]] name = "flate2" version = "1.1.1" @@ -1649,12 +1614,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "hound" -version = "3.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f" - [[package]] name = "iana-time-zone" version = "0.1.63" @@ -2079,17 +2038,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "lewton" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "777b48df9aaab155475a83a7df3070395ea1ac6902f5cd062b8f2b028075c030" -dependencies = [ - "byteorder", - "ogg", - "tinyvec", -] - [[package]] name = "libc" version = "0.2.172" @@ -2355,25 +2303,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "num-bigint" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" -dependencies = [ - "num-integer", - "num-traits", -] - -[[package]] -name = "num-complex" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" -dependencies = [ - "num-traits", -] - [[package]] name = "num-derive" version = "0.4.2" @@ -2400,7 +2329,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" dependencies = [ - "num-bigint", "num-integer", "num-traits", ] @@ -2733,15 +2661,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ogg" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6951b4e8bf21c8193da321bcce9c9dd2e13c858fe078bf9054a288b419ae5d6e" -dependencies = [ - "byteorder", -] - [[package]] name = "once_cell" version = "1.21.3" @@ -2903,21 +2822,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "portable-atomic" -version = "1.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" - -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "potential_utf" version = "0.1.2" @@ -2952,15 +2856,6 @@ dependencies = [ "syn", ] -[[package]] -name = "primal-check" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc0d895b311e3af9902528fbb8f928688abbd95872819320517cc24ca6b2bd08" -dependencies = [ - "num-integer", -] - [[package]] name = "proc-macro-crate" version = "3.3.0" @@ -3146,15 +3041,6 @@ dependencies = [ "font-types", ] -[[package]] -name = "realfft" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "390252372b7f2aac8360fc5e72eba10136b166d6faeed97e6d0c8324eb99b2b1" -dependencies = [ - "rustfft", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -3222,49 +3108,12 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "ringbuf" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" -dependencies = [ - "crossbeam-utils", - "portable-atomic", - "portable-atomic-util", -] - -[[package]] -name = "rodio" -version = "0.20.1" -source = "git+https://github.com/RustAudio/rodio?rev=071db6df2adfccfe1032be43dd87e5681e34292c#071db6df2adfccfe1032be43dd87e5681e34292c" -dependencies = [ - "claxon", - "cpal", - "dasp_sample", - "hound", - "lewton", - "num-rational", - "symphonia", -] - [[package]] name = "roxmltree" version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" -[[package]] -name = "rubato" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5258099699851cfd0082aeb645feb9c084d9a5e1f1b8d5372086b989fc5e56a1" -dependencies = [ - "num-complex", - "num-integer", - "num-traits", - "realfft", -] - [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3283,21 +3132,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" -[[package]] -name = "rustfft" -version = "6.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f266ff9b0cfc79de11fd5af76a2bc672fe3ace10c96fa06456740fa70cb1ed49" -dependencies = [ - "num-complex", - "num-integer", - "num-traits", - "primal-check", - "strength_reduce", - "transpose", - "version_check", -] - [[package]] name = "rustix" version = "0.38.44" @@ -3808,12 +3642,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "strength_reduce" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82" - [[package]] name = "strict-num" version = "0.1.1" @@ -3865,55 +3693,6 @@ dependencies = [ "zeno", ] -[[package]] -name = "symphonia" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "815c942ae7ee74737bb00f965fa5b5a2ac2ce7b6c01c0cc169bbeaf7abd5f5a9" -dependencies = [ - "lazy_static", - "symphonia-bundle-mp3", - "symphonia-core", - "symphonia-metadata", -] - -[[package]] -name = "symphonia-bundle-mp3" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c01c2aae70f0f1fb096b6f0ff112a930b1fb3626178fba3ae68b09dce71706d4" -dependencies = [ - "lazy_static", - "log", - "symphonia-core", - "symphonia-metadata", -] - -[[package]] -name = "symphonia-core" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "798306779e3dc7d5231bd5691f5a813496dc79d3f56bf82e25789f2094e022c3" -dependencies = [ - "arrayvec", - "bitflags 1.3.2", - "bytemuck", - "lazy_static", - "log", -] - -[[package]] -name = "symphonia-metadata" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc622b9841a10089c5b18e99eb904f4341615d5aa55bbf4eedde1be721a4023c" -dependencies = [ - "encoding_rs", - "lazy_static", - "log", - "symphonia-core", -] - [[package]] name = "syn" version = "2.0.101" @@ -4148,16 +3927,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "transpose" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad61aed86bc3faea4300c7aee358b4c6d0c8d6ccc36524c96e4c92ccf26e77e" -dependencies = [ - "num-integer", - "strength_reduce", -] - [[package]] name = "ttf-parser" version = "0.20.0" diff --git a/client/Cargo.toml b/client/Cargo.toml index bfc9445..d79cf2d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -10,5 +10,3 @@ tokio = { workspace = true } s2n-quic = { workspace = true } cpal = "0.16.0" iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" } -fixed-resample = "0.8.0" -rodio = {git = "https://github.com/RustAudio/rodio", rev = "071db6df2adfccfe1032be43dd87e5681e34292c"} diff --git a/client/src/gui.rs b/client/src/gui.rs index 9e97b07..62f5ee8 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -168,7 +168,8 @@ impl App { let speaker_sender = self.channel.speaker.clone(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); - *self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender); + *self.controller.connection_stop_sender.write().unwrap() = + Some(connection_stop_sender); Task::perform( async move { diff --git a/client/src/stream.rs b/client/src/stream.rs index 0344dc0..df4b6b8 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -90,6 +90,7 @@ pub async fn disconnect_watcher( println!("Disconnected"); } +#[allow(unused_assignment)] async fn send_signals( mut send_stream: SendStream, mut microphone_receiver: broadcast::Receiver, @@ -104,7 +105,16 @@ async fn send_signals( } Err(err_val) => { eprintln!("Error: Receive Audio Datum | Local | {}", err_val); - return; + match err_val { + broadcast::error::RecvError::Closed => return, + broadcast::error::RecvError::Lagged(lag_amount) => { + println!("Warn: Latency Fix Agent | Local | Signal | {}", lag_amount); + microphone_receiver = microphone_receiver.resubscribe(); + todo!( + "This might be problematic, when we deleted the queue maybe user left and another user joined with same id ?" + ); + } + } } } } diff --git a/client/src/voice.rs b/client/src/voice.rs index eba48f7..8f17c53 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -1,27 +1,32 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; +use std::sync::{Arc, LazyLock, RwLock, atomic::AtomicBool}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use protocol::{ Error, - protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType}, + protocol::{AUDIO_DATA_SENDER_CHANNEL_LENGTH, Signal, SignalType}, }; -use rodio::{OutputStream, OutputStreamBuilder, Sink, buffer::SamplesBuffer, mixer::Mixer}; use tokio::sync::{broadcast, mpsc}; use crate::gui::State; +const LATENCY_THRESHOLD: usize = 1024 * 8; + struct SpeakerSink { speaker_id: u8, - sink: Sink, + audio_data_sender: broadcast::Sender, + audio_data_receiver: RwLock>, } impl SpeakerSink { - fn new(speaker_id: u8, mixer: &Mixer) -> Self { - let sink = Sink::connect_new(mixer); - Self { speaker_id, sink } + fn new(speaker_id: u8) -> Self { + let (audio_data_sender, audio_data_receiver) = + broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH); + let audio_data_receiver = audio_data_receiver.into(); + Self { + speaker_id, + audio_data_sender, + audio_data_receiver, + } } } @@ -80,45 +85,53 @@ pub async fn record( }) } +static ONLINE_SPEAKERS: LazyLock>> = LazyLock::new(|| vec![].into()); + async fn signal_handler( - output_stream: OutputStream, mut play_receiver: broadcast::Receiver, play_pause: Arc, ) { - let mut speaker_list = vec![]; - while let Ok(signal) = play_receiver.recv().await { match signal.get_signal_type() { SignalType::AudioDatum => { - let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) { - [signal.get_audio_datum()] - } else { - [0.0] - }; + let data = signal.get_audio_datum(); - let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data); + let speaker_list_search_result = ONLINE_SPEAKERS.read().unwrap().binary_search_by( + |speaker_sink: &SpeakerSink| { + speaker_sink.speaker_id.cmp(&signal.get_speaker_id()) + }, + ); - match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| { - speaker_sink.speaker_id.cmp(&signal.get_speaker_id()) - }) { + match speaker_list_search_result { Ok(speaker_sink_index) => { - let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never"); + let online_speakers = ONLINE_SPEAKERS.read().unwrap(); + let speaker_sink = online_speakers.get(speaker_sink_index).expect("Never"); - speaker_sink.sink.append(source); + if play_pause.load(std::sync::atomic::Ordering::Relaxed) { + if let Err(err_val) = speaker_sink.audio_data_sender.send(data) { + eprintln!("Error: Send Audio Datum | Local | {}", err_val); + } + } } Err(_) => { - let speaker_sink = - SpeakerSink::new(signal.get_speaker_id(), output_stream.mixer()); + let speaker_sink = SpeakerSink::new(signal.get_speaker_id()); - speaker_sink.sink.append(source); + if play_pause.load(std::sync::atomic::Ordering::Relaxed) { + if let Err(err_val) = speaker_sink.audio_data_sender.send(data) { + eprintln!("Error: Send Audio Datum | Local | {}", err_val); + } + } - speaker_list.push(speaker_sink); - speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id)); + let mut online_speakers = ONLINE_SPEAKERS.write().unwrap(); + online_speakers.push(speaker_sink); + online_speakers.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id)); } } } SignalType::SpeakerLeft => { - speaker_list + ONLINE_SPEAKERS + .write() + .unwrap() .retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id()); } } @@ -128,37 +141,89 @@ async fn signal_handler( pub async fn play( mut play_control: mpsc::Receiver, play_receiver: broadcast::Receiver, -) { - let output_stream = OutputStreamBuilder::open_default_stream().unwrap(); +) -> Result<(), Error> { + let host = cpal::default_host(); + let output_device = host.default_output_device().unwrap(); + let config = output_device.default_output_config().unwrap().into(); + println!("Speaker Stream Config = {:#?}", config); + + let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + for sample in data { + match ONLINE_SPEAKERS.try_read() { + Ok(online_speakers) => { + *sample = online_speakers + .iter() + .map( + |speaker_sink| match speaker_sink.audio_data_receiver.try_write() { + Ok(mut speaker_sink_audio_receiver) => { + if speaker_sink_audio_receiver.len() > LATENCY_THRESHOLD { + println!( + "Warn: Latency Fix Agent | Local | Audio | {}", + speaker_sink_audio_receiver.len() + ); + *speaker_sink_audio_receiver = + speaker_sink_audio_receiver.resubscribe(); + } + match speaker_sink_audio_receiver.try_recv() { + Ok(audio_datum) => audio_datum, + Err(_) => 0.0, + } + } + Err(_) => 0.0, + }, + ) + .sum::() + .clamp(-1.0, 1.0); + } + Err(_) => *sample = 0.0, + } + } + }; + let play_pause = Arc::new(AtomicBool::new(true)); - let signal_handler = tokio::spawn(signal_handler( - output_stream, - play_receiver, - play_pause.clone(), - )); + let signal_handler = tokio::spawn(signal_handler(play_receiver, play_pause.clone())); + + let output_stream = output_device + .build_output_stream(&config, output, voice_error, None) + .unwrap(); + + output_stream + .play() + .map_err(|inner| Error::Play(inner.to_string()))?; tokio::task::block_in_place(|| { loop { match play_control.blocking_recv() { - Some(requested_state) => match requested_state { + Some(message) => match message { State::Active => { - play_pause.store(true, Ordering::Relaxed); + play_pause.store(true, std::sync::atomic::Ordering::Relaxed); + // output_stream + // .play() + // .map_err(|inner| Error::Play(inner.to_string()))? } State::Passive => { - play_pause.store(false, Ordering::Relaxed); + play_pause.store(false, std::sync::atomic::Ordering::Relaxed); + // I'm not pausing at hardware level to prevent some queue and permission + // problems at the first hand + + // output_stream + // .pause() + // .map_err(|inner| Error::Play(inner.to_string()))? } State::Loading => {} }, None => { + output_stream + .pause() + .map_err(|inner| Error::Play(inner.to_string()))?; + signal_handler.abort(); - return; + return Ok(()); } } } - }); - - signal_handler.abort(); + }) } fn voice_error(err_val: cpal::StreamError) {