From 0c187b166fd846c98514e08880b9c9bf4bff2154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Mon, 19 May 2025 03:23:13 +0300 Subject: [PATCH] feat: :sparkles: remote mixer part 2 --- protocol/src/lib.rs | 2 +- server/src/stream.rs | 84 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 55ac856..a410313 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use serde::{Deserialize, Serialize}; -pub const BUFFER_LENGTH: usize = 1024 * 8; +pub const BUFFER_LENGTH: usize = 1024 * 16; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Error { diff --git a/server/src/stream.rs b/server/src/stream.rs index 061b750..89e8d4f 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -1,11 +1,6 @@ -use std::{ - cell::{Cell, RefCell}, - path::Path, - sync::{Arc, LazyLock}, - time::Duration, -}; +use std::{path::Path, process::exit, sync::LazyLock, time::Duration}; -use chrono::{TimeDelta, TimeZone, Utc}; +use chrono::Utc; use protocol::BUFFER_LENGTH; use s2n_quic::{ Connection, Server, @@ -14,12 +9,12 @@ use s2n_quic::{ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::{RwLock, broadcast}, - time::{Instant, Timeout}, + time::sleep, }; use crate::ServerConfig; -const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(50); +const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(10); #[derive(Debug)] struct User { @@ -38,7 +33,7 @@ struct UseredAudioData { impl User { async fn new(user_id: u64) -> broadcast::Receiver { let (local_audio_data_sender, local_audio_data_receiver) = - broadcast::channel(BUFFER_LENGTH); + broadcast::channel(BUFFER_LENGTH * 16); let new_user = User { user_id, local_audio_data_sender, @@ -60,13 +55,14 @@ pub async fn start(server_config: ServerConfig) { .unwrap() .start() .unwrap(); - + let mut user_id = 0; while let Some(connection) = server.accept().await { - tokio::spawn(handle_client(connection)); + tokio::spawn(handle_client(user_id, connection)); + user_id += 1; } } -async fn handle_client(mut connection: Connection) { +async fn handle_client(user_id: u64, mut connection: Connection) { println!( "Server Name = {}", connection.server_name().unwrap().unwrap().to_string() @@ -83,22 +79,29 @@ async fn handle_client(mut connection: Connection) { .unwrap() .unwrap(); let (receive_stream, send_stream) = stream.split(); - let local_audio_receiver = User::new(connection.id()).await; + let local_audio_receiver = User::new(user_id).await; + tokio::spawn(receive_audio_stream(user_id, receive_stream)); + tokio::spawn(send_audio_stream( + user_id, + send_stream, + local_audio_receiver, + )); } -async fn receive_audio_stream(mut receive_stream: ReceiveStream) { +async fn receive_audio_stream(user_id: u64, mut receive_stream: ReceiveStream) { loop { match receive_stream.read_f32().await { Ok(received_audio_data) => { for online_user in ONLINE_USERS.read().await.iter() { let usered_audio_data = UseredAudioData { - user_id: receive_stream.id(), + user_id, audio_data: received_audio_data, }; if let Err(err_val) = online_user.local_audio_data_sender.send(usered_audio_data) { eprintln!("Error: Send Audio Data | Local | {}", err_val); + exit(0); } } } @@ -111,6 +114,7 @@ async fn receive_audio_stream(mut receive_stream: ReceiveStream) { } async fn send_audio_stream( + user_id: u64, mut send_stream: SendStream, mut local_audio_receiver: broadcast::Receiver, ) { @@ -128,11 +132,55 @@ async fn send_audio_stream( } } } else { - usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id)); - todo!("mixer"); + let mixed_buffer = mixer(user_id, &mut usered_audio_buffer).await; + + for mixed_buffer_data in mixed_buffer { + if let Err(err_val) = send_stream.write_f32(mixed_buffer_data).await { + eprintln!("Error: Send Audio Data | Remote | {}", err_val); + } + } usered_audio_buffer.clear(); time = Utc::now(); } } } + +async fn mixer(user_id: u64, usered_audio_buffer: &mut Vec) -> Vec { + usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id)); + + let mut parsed_audio_data = vec![]; + let mut inner = vec![]; + let mut current_user = user_id; + + while let Some(usered_audio) = usered_audio_buffer.pop() { + if usered_audio.user_id == user_id { + continue; + } else { + if current_user == usered_audio.user_id { + inner.push(usered_audio.audio_data); + } else { + parsed_audio_data.push(inner.clone()); + inner.clear(); + current_user = usered_audio.user_id; + } + } + } + parsed_audio_data.push(inner.clone()); + + let mut mixed_buffer = vec![]; + for parsed_audio_single_user_buffer in parsed_audio_data.iter() { + for (i, parsed_audio_single_user_buffer_data) in + parsed_audio_single_user_buffer.iter().enumerate() + { + match mixed_buffer.get(i) { + Some(current_data) => { + mixed_buffer[i] = current_data + parsed_audio_single_user_buffer_data + } + None => mixed_buffer.push(*parsed_audio_single_user_buffer_data), + } + } + } + + mixed_buffer +}