fix: latency, underrun, overflow

This commit is contained in:
Ahmet Kaan Gümüş 2025-06-14 17:14:35 +03:00
parent 5ad6184f13
commit 8cad7658ea
5 changed files with 122 additions and 279 deletions

231
Cargo.lock generated
View file

@ -657,22 +657,14 @@ dependencies = [
"libloading",
]
[[package]]
name = "claxon"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bfbf56724aa9eca8afa4fcfadeb479e722935bb2a0900c2d37e0cc477af0688"
[[package]]
name = "client"
version = "0.1.0"
dependencies = [
"chrono",
"cpal",
"fixed-resample",
"iced",
"protocol",
"rodio",
"s2n-quic",
"tokio",
]
@ -1074,15 +1066,6 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "endi"
version = "1.1.0"
@ -1178,12 +1161,6 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "fast-interleave"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7e05e2b3c97d4516fa5c177133f3e4decf9c8318841e1545b260535311e3a5"
[[package]]
name = "fastrand"
version = "2.3.0"
@ -1199,18 +1176,6 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "fixed-resample"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39a267bc40fae9b208e2a67a99a8d794caf3a9fe1146d01c147f59bd96257a74"
dependencies = [
"arrayvec",
"fast-interleave",
"ringbuf",
"rubato",
]
[[package]]
name = "flate2"
version = "1.1.1"
@ -1649,12 +1614,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hound"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f"
[[package]]
name = "iana-time-zone"
version = "0.1.63"
@ -2079,17 +2038,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lewton"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "777b48df9aaab155475a83a7df3070395ea1ac6902f5cd062b8f2b028075c030"
dependencies = [
"byteorder",
"ogg",
"tinyvec",
]
[[package]]
name = "libc"
version = "0.2.172"
@ -2355,25 +2303,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
dependencies = [
"num-traits",
]
[[package]]
name = "num-derive"
version = "0.4.2"
@ -2400,7 +2329,6 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
dependencies = [
"num-bigint",
"num-integer",
"num-traits",
]
@ -2733,15 +2661,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "ogg"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6951b4e8bf21c8193da321bcce9c9dd2e13c858fe078bf9054a288b419ae5d6e"
dependencies = [
"byteorder",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -2903,21 +2822,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "portable-atomic"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.2"
@ -2952,15 +2856,6 @@ dependencies = [
"syn",
]
[[package]]
name = "primal-check"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc0d895b311e3af9902528fbb8f928688abbd95872819320517cc24ca6b2bd08"
dependencies = [
"num-integer",
]
[[package]]
name = "proc-macro-crate"
version = "3.3.0"
@ -3146,15 +3041,6 @@ dependencies = [
"font-types",
]
[[package]]
name = "realfft"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "390252372b7f2aac8360fc5e72eba10136b166d6faeed97e6d0c8324eb99b2b1"
dependencies = [
"rustfft",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -3222,49 +3108,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "ringbuf"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c"
dependencies = [
"crossbeam-utils",
"portable-atomic",
"portable-atomic-util",
]
[[package]]
name = "rodio"
version = "0.20.1"
source = "git+https://github.com/RustAudio/rodio?rev=071db6df2adfccfe1032be43dd87e5681e34292c#071db6df2adfccfe1032be43dd87e5681e34292c"
dependencies = [
"claxon",
"cpal",
"dasp_sample",
"hound",
"lewton",
"num-rational",
"symphonia",
]
[[package]]
name = "roxmltree"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97"
[[package]]
name = "rubato"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5258099699851cfd0082aeb645feb9c084d9a5e1f1b8d5372086b989fc5e56a1"
dependencies = [
"num-complex",
"num-integer",
"num-traits",
"realfft",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -3283,21 +3132,6 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustfft"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f266ff9b0cfc79de11fd5af76a2bc672fe3ace10c96fa06456740fa70cb1ed49"
dependencies = [
"num-complex",
"num-integer",
"num-traits",
"primal-check",
"strength_reduce",
"transpose",
"version_check",
]
[[package]]
name = "rustix"
version = "0.38.44"
@ -3808,12 +3642,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "strength_reduce"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82"
[[package]]
name = "strict-num"
version = "0.1.1"
@ -3865,55 +3693,6 @@ dependencies = [
"zeno",
]
[[package]]
name = "symphonia"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "815c942ae7ee74737bb00f965fa5b5a2ac2ce7b6c01c0cc169bbeaf7abd5f5a9"
dependencies = [
"lazy_static",
"symphonia-bundle-mp3",
"symphonia-core",
"symphonia-metadata",
]
[[package]]
name = "symphonia-bundle-mp3"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c01c2aae70f0f1fb096b6f0ff112a930b1fb3626178fba3ae68b09dce71706d4"
dependencies = [
"lazy_static",
"log",
"symphonia-core",
"symphonia-metadata",
]
[[package]]
name = "symphonia-core"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "798306779e3dc7d5231bd5691f5a813496dc79d3f56bf82e25789f2094e022c3"
dependencies = [
"arrayvec",
"bitflags 1.3.2",
"bytemuck",
"lazy_static",
"log",
]
[[package]]
name = "symphonia-metadata"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc622b9841a10089c5b18e99eb904f4341615d5aa55bbf4eedde1be721a4023c"
dependencies = [
"encoding_rs",
"lazy_static",
"log",
"symphonia-core",
]
[[package]]
name = "syn"
version = "2.0.101"
@ -4148,16 +3927,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "transpose"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad61aed86bc3faea4300c7aee358b4c6d0c8d6ccc36524c96e4c92ccf26e77e"
dependencies = [
"num-integer",
"strength_reduce",
]
[[package]]
name = "ttf-parser"
version = "0.20.0"

View file

@ -10,5 +10,3 @@ tokio = { workspace = true }
s2n-quic = { workspace = true }
cpal = "0.16.0"
iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" }
fixed-resample = "0.8.0"
rodio = {git = "https://github.com/RustAudio/rodio", rev = "071db6df2adfccfe1032be43dd87e5681e34292c"}

View file

@ -168,7 +168,8 @@ impl App {
let speaker_sender = self.channel.speaker.clone();
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
*self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender);
*self.controller.connection_stop_sender.write().unwrap() =
Some(connection_stop_sender);
Task::perform(
async move {

View file

@ -90,6 +90,7 @@ pub async fn disconnect_watcher(
println!("Disconnected");
}
#[allow(unused_assignment)]
async fn send_signals(
mut send_stream: SendStream,
mut microphone_receiver: broadcast::Receiver<f32>,
@ -104,7 +105,16 @@ async fn send_signals(
}
Err(err_val) => {
eprintln!("Error: Receive Audio Datum | Local | {}", err_val);
return;
match err_val {
broadcast::error::RecvError::Closed => return,
broadcast::error::RecvError::Lagged(lag_amount) => {
println!("Warn: Latency Fix Agent | Local | Signal | {}", lag_amount);
microphone_receiver = microphone_receiver.resubscribe();
todo!(
"This might be problematic, when we deleted the queue maybe user left and another user joined with same id ?"
);
}
}
}
}
}

View file

@ -1,27 +1,32 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::sync::{Arc, LazyLock, RwLock, atomic::AtomicBool};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use protocol::{
Error,
protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType},
protocol::{AUDIO_DATA_SENDER_CHANNEL_LENGTH, Signal, SignalType},
};
use rodio::{OutputStream, OutputStreamBuilder, Sink, buffer::SamplesBuffer, mixer::Mixer};
use tokio::sync::{broadcast, mpsc};
use crate::gui::State;
const LATENCY_THRESHOLD: usize = 1024 * 8;
struct SpeakerSink {
speaker_id: u8,
sink: Sink,
audio_data_sender: broadcast::Sender<f32>,
audio_data_receiver: RwLock<broadcast::Receiver<f32>>,
}
impl SpeakerSink {
fn new(speaker_id: u8, mixer: &Mixer) -> Self {
let sink = Sink::connect_new(mixer);
Self { speaker_id, sink }
fn new(speaker_id: u8) -> Self {
let (audio_data_sender, audio_data_receiver) =
broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH);
let audio_data_receiver = audio_data_receiver.into();
Self {
speaker_id,
audio_data_sender,
audio_data_receiver,
}
}
}
@ -80,45 +85,53 @@ pub async fn record(
})
}
static ONLINE_SPEAKERS: LazyLock<RwLock<Vec<SpeakerSink>>> = LazyLock::new(|| vec![].into());
async fn signal_handler(
output_stream: OutputStream,
mut play_receiver: broadcast::Receiver<Signal>,
play_pause: Arc<AtomicBool>,
) {
let mut speaker_list = vec![];
while let Ok(signal) = play_receiver.recv().await {
match signal.get_signal_type() {
SignalType::AudioDatum => {
let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
[signal.get_audio_datum()]
} else {
[0.0]
};
let data = signal.get_audio_datum();
let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data);
let speaker_list_search_result = ONLINE_SPEAKERS.read().unwrap().binary_search_by(
|speaker_sink: &SpeakerSink| {
speaker_sink.speaker_id.cmp(&signal.get_speaker_id())
},
);
match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| {
speaker_sink.speaker_id.cmp(&signal.get_speaker_id())
}) {
match speaker_list_search_result {
Ok(speaker_sink_index) => {
let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never");
let online_speakers = ONLINE_SPEAKERS.read().unwrap();
let speaker_sink = online_speakers.get(speaker_sink_index).expect("Never");
speaker_sink.sink.append(source);
if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
if let Err(err_val) = speaker_sink.audio_data_sender.send(data) {
eprintln!("Error: Send Audio Datum | Local | {}", err_val);
}
}
}
Err(_) => {
let speaker_sink =
SpeakerSink::new(signal.get_speaker_id(), output_stream.mixer());
let speaker_sink = SpeakerSink::new(signal.get_speaker_id());
speaker_sink.sink.append(source);
if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
if let Err(err_val) = speaker_sink.audio_data_sender.send(data) {
eprintln!("Error: Send Audio Datum | Local | {}", err_val);
}
}
speaker_list.push(speaker_sink);
speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id));
let mut online_speakers = ONLINE_SPEAKERS.write().unwrap();
online_speakers.push(speaker_sink);
online_speakers.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id));
}
}
}
SignalType::SpeakerLeft => {
speaker_list
ONLINE_SPEAKERS
.write()
.unwrap()
.retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id());
}
}
@ -128,37 +141,89 @@ async fn signal_handler(
pub async fn play(
mut play_control: mpsc::Receiver<State>,
play_receiver: broadcast::Receiver<Signal>,
) {
let output_stream = OutputStreamBuilder::open_default_stream().unwrap();
) -> Result<(), Error> {
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config = output_device.default_output_config().unwrap().into();
println!("Speaker Stream Config = {:#?}", config);
let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {
match ONLINE_SPEAKERS.try_read() {
Ok(online_speakers) => {
*sample = online_speakers
.iter()
.map(
|speaker_sink| match speaker_sink.audio_data_receiver.try_write() {
Ok(mut speaker_sink_audio_receiver) => {
if speaker_sink_audio_receiver.len() > LATENCY_THRESHOLD {
println!(
"Warn: Latency Fix Agent | Local | Audio | {}",
speaker_sink_audio_receiver.len()
);
*speaker_sink_audio_receiver =
speaker_sink_audio_receiver.resubscribe();
}
match speaker_sink_audio_receiver.try_recv() {
Ok(audio_datum) => audio_datum,
Err(_) => 0.0,
}
}
Err(_) => 0.0,
},
)
.sum::<f32>()
.clamp(-1.0, 1.0);
}
Err(_) => *sample = 0.0,
}
}
};
let play_pause = Arc::new(AtomicBool::new(true));
let signal_handler = tokio::spawn(signal_handler(
output_stream,
play_receiver,
play_pause.clone(),
));
let signal_handler = tokio::spawn(signal_handler(play_receiver, play_pause.clone()));
let output_stream = output_device
.build_output_stream(&config, output, voice_error, None)
.unwrap();
output_stream
.play()
.map_err(|inner| Error::Play(inner.to_string()))?;
tokio::task::block_in_place(|| {
loop {
match play_control.blocking_recv() {
Some(requested_state) => match requested_state {
Some(message) => match message {
State::Active => {
play_pause.store(true, Ordering::Relaxed);
play_pause.store(true, std::sync::atomic::Ordering::Relaxed);
// output_stream
// .play()
// .map_err(|inner| Error::Play(inner.to_string()))?
}
State::Passive => {
play_pause.store(false, Ordering::Relaxed);
play_pause.store(false, std::sync::atomic::Ordering::Relaxed);
// I'm not pausing at hardware level to prevent some queue and permission
// problems at the first hand
// output_stream
// .pause()
// .map_err(|inner| Error::Play(inner.to_string()))?
}
State::Loading => {}
},
None => {
output_stream
.pause()
.map_err(|inner| Error::Play(inner.to_string()))?;
signal_handler.abort();
return;
return Ok(());
}
}
}
});
signal_handler.abort();
})
}
fn voice_error(err_val: cpal::StreamError) {