feat: remote mixer part 4

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-20 05:44:00 +03:00
parent 50917c09d5
commit 35ebb58698
6 changed files with 64 additions and 145 deletions

View file

@ -8,7 +8,7 @@ use iced::{
Element, Task, Theme,
widget::{button, column, row},
};
use protocol::{BUFFER_LENGTH, Error};
use protocol::Error;
use tokio::{
sync::{
broadcast::{self},
@ -17,7 +17,7 @@ use tokio::{
time::sleep,
};
use crate::{ClientConfig, stream::connect, voice::record};
use crate::{ClientConfig, MICROPHONE_BUFFER_LENGHT, stream::connect, voice::record};
#[derive(Debug, Default)]
struct Signal {
@ -59,7 +59,7 @@ struct Channel {
impl Channel {
fn new() -> Self {
Self {
microphone: broadcast::channel(BUFFER_LENGTH).0.into(),
microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT / 4).0.into(),
// speaker: broadcast::channel(BUFFER_LENGTH),
}
}

View file

@ -2,6 +2,9 @@ pub mod gui;
pub mod stream;
pub mod voice;
const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 16;
const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4;
#[derive(Debug)]
pub struct ClientConfig {
certificate_path: String,

View file

@ -1,6 +1,6 @@
use std::{net::SocketAddr, path::Path, sync::Arc};
use protocol::{BUFFER_LENGTH, Error};
use protocol::Error;
use s2n_quic::{
Client,
client::Connect,
@ -11,7 +11,7 @@ use tokio::{
sync::{broadcast, oneshot},
};
use crate::{ClientConfig, voice::play};
use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play};
pub async fn connect(
connection_stop_signal_receiver: oneshot::Receiver<bool>,
@ -51,7 +51,7 @@ pub async fn connect(
.await
.map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?;
let (receive_stream, send_stream) = stream.split();
let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH);
let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT);
let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel();
tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
let receive_voice_data_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender));

View file

@ -11,6 +11,7 @@ pub async fn record(
let host = cpal::default_host();
let input_device = host.default_input_device().unwrap();
let config = input_device.default_input_config().unwrap().into();
println!("Recorder Stream Config = {:#?}", config);
let input = move |data: &[f32], _: &cpal::InputCallbackInfo| {
for &sample in data {
@ -50,6 +51,7 @@ pub async fn play(
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config = output_device.default_output_config().unwrap().into();
println!("Speaker Stream Config = {:#?}", config);
let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {

View file

@ -7,8 +7,6 @@ use std::{
use serde::{Deserialize, Serialize};
pub const BUFFER_LENGTH: usize = 1024 * 16;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Error {
ConnectionSetup(String),

View file

@ -16,29 +16,56 @@ use tokio::{
};
use crate::ServerConfig;
const BUFFER_LENGTH: usize = protocol::BUFFER_LENGTH;
const BUFFER_LENGTH: usize = 1024 * 16 * 2;
#[derive(Debug)]
struct User {
sender: broadcast::Sender<f32>,
audio_buffer: Arc<RwLock<VecDeque<f32>>>,
}
impl User {
async fn new() -> broadcast::Sender<f32> {
let (sender, _) = broadcast::channel(BUFFER_LENGTH);
async fn new() -> Arc<RwLock<VecDeque<f32>>> {
let audio_buffer = Arc::new(RwLock::new(VecDeque::new()));
let new_user = Self {
sender: sender.clone(),
audio_buffer: audio_buffer.clone(),
};
ONLINE_USERS.write().await.push(new_user);
if let Err(err_val) = NEW_USER_NOTIFIER.send(sender.clone()) {
eprintln!("Error: Send New User | Local | {}", err_val);
}
sender.clone()
audio_buffer
}
}
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| vec![].into());
static NEW_USER_NOTIFIER: LazyLock<broadcast::Sender<broadcast::Sender<f32>>> =
static GLOBAL_MIXER: LazyLock<broadcast::Sender<f32>> =
LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0);
async fn global_mixer() {
let global_mixer_sender = GLOBAL_MIXER.clone();
loop {
sleep(Duration::from_millis(100)).await;
let mut mixed_audio_buffer = VecDeque::new();
for online_user in ONLINE_USERS.read().await.iter() {
let mut inner_buffer = vec![];
while let Some(audio_data) = online_user.audio_buffer.write().await.pop_front() {
inner_buffer.push(audio_data);
}
for (i, audio_data) in inner_buffer.iter().enumerate() {
match mixed_audio_buffer.get(i) {
Some(original_value) => mixed_audio_buffer[i] = original_value + audio_data,
None => mixed_audio_buffer.push_back(*audio_data),
}
}
}
mixed_audio_buffer.truncate(mixed_audio_buffer.len() * 85 / 100);
while let Some(audio_data) = mixed_audio_buffer.pop_front() {
if let Err(err_val) = global_mixer_sender.send(audio_data) {
eprintln!("Error: Send Mixed Audio Data | Local | {}", err_val);
}
}
}
}
pub async fn start(server_config: ServerConfig) {
let mut server = Server::builder()
.with_io(server_config.server_address.as_str())
@ -51,6 +78,8 @@ pub async fn start(server_config: ServerConfig) {
.start()
.unwrap();
tokio::spawn(global_mixer());
while let Some(connection) = server.accept().await {
tokio::spawn(handle_client(connection));
}
@ -74,152 +103,39 @@ async fn handle_client(mut connection: Connection) {
.unwrap();
let (receive_stream, send_stream) = stream.split();
let new_user_sender = User::new().await;
let user_audio_buffer = User::new().await;
tokio::spawn(receiver_audio_data(receive_stream, new_user_sender));
tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer));
tokio::spawn(send_audio_data(send_stream));
}
async fn mixer_with_buffer(
unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
) -> Vec<f32> {
let mut mixed_audio_buffer = vec![];
for audio_buffer in unmixed_audio_data.read().await.iter() {
let mut inner_audio_buffer = vec![];
while let Some(audio_data) = audio_buffer.write().await.pop_front() {
inner_audio_buffer.push(audio_data);
}
for (i, audio_data) in inner_audio_buffer.iter().enumerate() {
match mixed_audio_buffer.get(i) {
Some(current_audio_data) => mixed_audio_buffer[i] = current_audio_data + audio_data,
None => mixed_audio_buffer.push(*audio_data),
}
}
}
mixed_audio_buffer
}
// async fn mixer_with_channel(
// unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
// sender: broadcast::Sender<f32>,
// ) {
// loop {
// sleep(Duration::from_nanos(10)).await;
// let mut mixed_audio_buffer = vec![];
// for audio_buffer in unmixed_audio_data.read().await.iter() {
// let mut inner_audio_buffer = vec![];
//
// while let Some(audio_data) = audio_buffer.write().await.pop_front() {
// inner_audio_buffer.push(audio_data);
// }
//
// for (i, audio_data) in inner_audio_buffer.iter().enumerate() {
// match mixed_audio_buffer.get(i) {
// Some(current_audio_data) => {
// mixed_audio_buffer[i] = current_audio_data + audio_data
// }
// None => mixed_audio_buffer.push(*audio_data),
// }
// }
// }
// for audio_data in mixed_audio_buffer {
// if let Err(err_val) = sender.send(audio_data) {
// eprintln!("Error: Send Mixed Audio | Local | {}", err_val);
// return;
// }
// }
// }
// }
async fn new_user_handler(
mut receiver: broadcast::Receiver<f32>,
unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
) {
let local_audio_buffer = Arc::new(RwLock::new(VecDeque::new()));
unmixed_audio_data
.write()
.await
.push_back(local_audio_buffer.clone());
async fn send_audio_data(mut send_stream: SendStream) {
let mut global_mixer_receiver = GLOBAL_MIXER.subscribe();
loop {
match receiver.recv().await {
match global_mixer_receiver.recv().await {
Ok(received_audio_data) => {
local_audio_buffer
.write()
.await
.push_back(received_audio_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Local | {}", err_val);
return;
}
}
}
}
async fn send_audio_data(mut send_stream: SendStream) {
let unmixed_audio_data = Arc::new(RwLock::new(VecDeque::new()));
for online_user in ONLINE_USERS.read().await.iter() {
let receiver = online_user.sender.subscribe();
let unmixed_audio_data = unmixed_audio_data.clone();
tokio::spawn(new_user_handler(receiver, unmixed_audio_data));
}
let unmixed_audio_data_for_handling_new_user = unmixed_audio_data.clone();
tokio::spawn(async move {
loop {
match NEW_USER_NOTIFIER.subscribe().recv().await {
Ok(new_user_sender) => {
let receiver = new_user_sender.subscribe();
let unmixed_audio_data = unmixed_audio_data_for_handling_new_user.clone();
tokio::spawn(new_user_handler(receiver, unmixed_audio_data));
}
Err(err_val) => {
eprintln!("Error: Receive New User | Local | {}", err_val);
if let Err(err_val) = send_stream.write_f32(received_audio_data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
return;
}
}
}
});
// let (mixer_sender, mut mixer_receiver) = broadcast::channel(BUFFER_LENGTH);
// tokio::spawn(mixer_with_channel(unmixed_audio_data, mixer_sender));
// loop {
// match mixer_receiver.recv().await {
// Ok(mixed_audio_data) => {
// if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
// eprintln!("Error: Send Audio Data | Remote | {}", err_val);
// return;
// }
// }
// Err(err_val) => {
// eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val);
// return;
// }
// }
// }
loop {
sleep(Duration::from_millis(50)).await;
let mixed_audio_buffer = mixer_with_buffer(unmixed_audio_data.clone()).await;
for mixed_audio_data in mixed_audio_buffer {
if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
Err(err_val) => {
eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val);
return;
}
}
}
}
async fn receiver_audio_data(mut receive_stream: ReceiveStream, sender: broadcast::Sender<f32>) {
async fn receiver_audio_data(
mut receive_stream: ReceiveStream,
audio_buffer: Arc<RwLock<VecDeque<f32>>>,
) {
loop {
match receive_stream.read_f32().await {
Ok(received_audio_data) => {
if let Err(err_val) = sender.send(received_audio_data) {
eprintln!("Error: Send Audio Data | Local | {}", err_val);
return;
}
audio_buffer.write().await.push_back(received_audio_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);