fix: ⚡ latency, underrun, overflow
This commit is contained in:
parent
5ad6184f13
commit
2076b3e39a
4 changed files with 118 additions and 48 deletions
|
@ -10,5 +10,3 @@ tokio = { workspace = true }
|
||||||
s2n-quic = { workspace = true }
|
s2n-quic = { workspace = true }
|
||||||
cpal = "0.16.0"
|
cpal = "0.16.0"
|
||||||
iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" }
|
iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" }
|
||||||
fixed-resample = "0.8.0"
|
|
||||||
rodio = {git = "https://github.com/RustAudio/rodio", rev = "071db6df2adfccfe1032be43dd87e5681e34292c"}
|
|
||||||
|
|
|
@ -168,7 +168,8 @@ impl App {
|
||||||
let speaker_sender = self.channel.speaker.clone();
|
let speaker_sender = self.channel.speaker.clone();
|
||||||
|
|
||||||
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
|
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
|
||||||
*self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender);
|
*self.controller.connection_stop_sender.write().unwrap() =
|
||||||
|
Some(connection_stop_sender);
|
||||||
|
|
||||||
Task::perform(
|
Task::perform(
|
||||||
async move {
|
async move {
|
||||||
|
|
|
@ -104,7 +104,13 @@ async fn send_signals(
|
||||||
}
|
}
|
||||||
Err(err_val) => {
|
Err(err_val) => {
|
||||||
eprintln!("Error: Receive Audio Datum | Local | {}", err_val);
|
eprintln!("Error: Receive Audio Datum | Local | {}", err_val);
|
||||||
return;
|
match err_val {
|
||||||
|
broadcast::error::RecvError::Closed => return,
|
||||||
|
broadcast::error::RecvError::Lagged(lag_amount) => {
|
||||||
|
println!("Warn: Latency Fix Agent | Local | Signal | {}", lag_amount);
|
||||||
|
microphone_receiver = microphone_receiver.resubscribe();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,27 +1,32 @@
|
||||||
use std::sync::{
|
use std::sync::{Arc, LazyLock, RwLock, atomic::AtomicBool};
|
||||||
Arc,
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
};
|
|
||||||
|
|
||||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||||
use protocol::{
|
use protocol::{
|
||||||
Error,
|
Error,
|
||||||
protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType},
|
protocol::{AUDIO_DATA_SENDER_CHANNEL_LENGTH, Signal, SignalType},
|
||||||
};
|
};
|
||||||
use rodio::{OutputStream, OutputStreamBuilder, Sink, buffer::SamplesBuffer, mixer::Mixer};
|
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
|
||||||
use crate::gui::State;
|
use crate::gui::State;
|
||||||
|
|
||||||
|
const LATENCY_THRESHOLD: usize = 1024 * 8;
|
||||||
|
|
||||||
struct SpeakerSink {
|
struct SpeakerSink {
|
||||||
speaker_id: u8,
|
speaker_id: u8,
|
||||||
sink: Sink,
|
audio_data_sender: broadcast::Sender<f32>,
|
||||||
|
audio_data_receiver: RwLock<broadcast::Receiver<f32>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpeakerSink {
|
impl SpeakerSink {
|
||||||
fn new(speaker_id: u8, mixer: &Mixer) -> Self {
|
fn new(speaker_id: u8) -> Self {
|
||||||
let sink = Sink::connect_new(mixer);
|
let (audio_data_sender, audio_data_receiver) =
|
||||||
Self { speaker_id, sink }
|
broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH);
|
||||||
|
let audio_data_receiver = audio_data_receiver.into();
|
||||||
|
Self {
|
||||||
|
speaker_id,
|
||||||
|
audio_data_sender,
|
||||||
|
audio_data_receiver,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,45 +85,53 @@ pub async fn record(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ONLINE_SPEAKERS: LazyLock<RwLock<Vec<SpeakerSink>>> = LazyLock::new(|| vec![].into());
|
||||||
|
|
||||||
async fn signal_handler(
|
async fn signal_handler(
|
||||||
output_stream: OutputStream,
|
|
||||||
mut play_receiver: broadcast::Receiver<Signal>,
|
mut play_receiver: broadcast::Receiver<Signal>,
|
||||||
play_pause: Arc<AtomicBool>,
|
play_pause: Arc<AtomicBool>,
|
||||||
) {
|
) {
|
||||||
let mut speaker_list = vec![];
|
|
||||||
|
|
||||||
while let Ok(signal) = play_receiver.recv().await {
|
while let Ok(signal) = play_receiver.recv().await {
|
||||||
match signal.get_signal_type() {
|
match signal.get_signal_type() {
|
||||||
SignalType::AudioDatum => {
|
SignalType::AudioDatum => {
|
||||||
let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
|
let data = signal.get_audio_datum();
|
||||||
[signal.get_audio_datum()]
|
|
||||||
} else {
|
|
||||||
[0.0]
|
|
||||||
};
|
|
||||||
|
|
||||||
let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data);
|
let speaker_list_search_result = ONLINE_SPEAKERS.read().unwrap().binary_search_by(
|
||||||
|
|speaker_sink: &SpeakerSink| {
|
||||||
|
speaker_sink.speaker_id.cmp(&signal.get_speaker_id())
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| {
|
match speaker_list_search_result {
|
||||||
speaker_sink.speaker_id.cmp(&signal.get_speaker_id())
|
|
||||||
}) {
|
|
||||||
Ok(speaker_sink_index) => {
|
Ok(speaker_sink_index) => {
|
||||||
let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never");
|
let online_speakers = ONLINE_SPEAKERS.read().unwrap();
|
||||||
|
let speaker_sink = online_speakers.get(speaker_sink_index).expect("Never");
|
||||||
|
|
||||||
speaker_sink.sink.append(source);
|
if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
if let Err(err_val) = speaker_sink.audio_data_sender.send(data) {
|
||||||
|
eprintln!("Error: Send Audio Datum | Local | {}", err_val);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
let speaker_sink =
|
let speaker_sink = SpeakerSink::new(signal.get_speaker_id());
|
||||||
SpeakerSink::new(signal.get_speaker_id(), output_stream.mixer());
|
|
||||||
|
|
||||||
speaker_sink.sink.append(source);
|
if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
if let Err(err_val) = speaker_sink.audio_data_sender.send(data) {
|
||||||
|
eprintln!("Error: Send Audio Datum | Local | {}", err_val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
speaker_list.push(speaker_sink);
|
let mut online_speakers = ONLINE_SPEAKERS.write().unwrap();
|
||||||
speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id));
|
online_speakers.push(speaker_sink);
|
||||||
|
online_speakers.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SignalType::SpeakerLeft => {
|
SignalType::SpeakerLeft => {
|
||||||
speaker_list
|
ONLINE_SPEAKERS
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
.retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id());
|
.retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -128,37 +141,89 @@ async fn signal_handler(
|
||||||
pub async fn play(
|
pub async fn play(
|
||||||
mut play_control: mpsc::Receiver<State>,
|
mut play_control: mpsc::Receiver<State>,
|
||||||
play_receiver: broadcast::Receiver<Signal>,
|
play_receiver: broadcast::Receiver<Signal>,
|
||||||
) {
|
) -> Result<(), Error> {
|
||||||
let output_stream = OutputStreamBuilder::open_default_stream().unwrap();
|
let host = cpal::default_host();
|
||||||
|
let output_device = host.default_output_device().unwrap();
|
||||||
|
let config = output_device.default_output_config().unwrap().into();
|
||||||
|
println!("Speaker Stream Config = {:#?}", config);
|
||||||
|
|
||||||
|
let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
|
||||||
|
for sample in data {
|
||||||
|
match ONLINE_SPEAKERS.try_read() {
|
||||||
|
Ok(online_speakers) => {
|
||||||
|
*sample = online_speakers
|
||||||
|
.iter()
|
||||||
|
.map(
|
||||||
|
|speaker_sink| match speaker_sink.audio_data_receiver.try_write() {
|
||||||
|
Ok(mut speaker_sink_audio_receiver) => {
|
||||||
|
if speaker_sink_audio_receiver.len() > LATENCY_THRESHOLD {
|
||||||
|
println!(
|
||||||
|
"Warn: Latency Fix Agent | Local | Audio | {}",
|
||||||
|
speaker_sink_audio_receiver.len()
|
||||||
|
);
|
||||||
|
*speaker_sink_audio_receiver =
|
||||||
|
speaker_sink_audio_receiver.resubscribe();
|
||||||
|
}
|
||||||
|
match speaker_sink_audio_receiver.try_recv() {
|
||||||
|
Ok(audio_datum) => audio_datum,
|
||||||
|
Err(_) => 0.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => 0.0,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.sum::<f32>()
|
||||||
|
.clamp(-1.0, 1.0);
|
||||||
|
}
|
||||||
|
Err(_) => *sample = 0.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let play_pause = Arc::new(AtomicBool::new(true));
|
let play_pause = Arc::new(AtomicBool::new(true));
|
||||||
|
|
||||||
let signal_handler = tokio::spawn(signal_handler(
|
let signal_handler = tokio::spawn(signal_handler(play_receiver, play_pause.clone()));
|
||||||
output_stream,
|
|
||||||
play_receiver,
|
let output_stream = output_device
|
||||||
play_pause.clone(),
|
.build_output_stream(&config, output, voice_error, None)
|
||||||
));
|
.unwrap();
|
||||||
|
|
||||||
|
output_stream
|
||||||
|
.play()
|
||||||
|
.map_err(|inner| Error::Play(inner.to_string()))?;
|
||||||
|
|
||||||
tokio::task::block_in_place(|| {
|
tokio::task::block_in_place(|| {
|
||||||
loop {
|
loop {
|
||||||
match play_control.blocking_recv() {
|
match play_control.blocking_recv() {
|
||||||
Some(requested_state) => match requested_state {
|
Some(message) => match message {
|
||||||
State::Active => {
|
State::Active => {
|
||||||
play_pause.store(true, Ordering::Relaxed);
|
play_pause.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
// output_stream
|
||||||
|
// .play()
|
||||||
|
// .map_err(|inner| Error::Play(inner.to_string()))?
|
||||||
}
|
}
|
||||||
State::Passive => {
|
State::Passive => {
|
||||||
play_pause.store(false, Ordering::Relaxed);
|
play_pause.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
// I'm not pausing at hardware level to prevent some queue and permission
|
||||||
|
// problems at the first hand
|
||||||
|
|
||||||
|
// output_stream
|
||||||
|
// .pause()
|
||||||
|
// .map_err(|inner| Error::Play(inner.to_string()))?
|
||||||
}
|
}
|
||||||
State::Loading => {}
|
State::Loading => {}
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
|
output_stream
|
||||||
|
.pause()
|
||||||
|
.map_err(|inner| Error::Play(inner.to_string()))?;
|
||||||
|
|
||||||
signal_handler.abort();
|
signal_handler.abort();
|
||||||
return;
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
|
|
||||||
signal_handler.abort();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn voice_error(err_val: cpal::StreamError) {
|
fn voice_error(err_val: cpal::StreamError) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue