From d27dd58a6857ccc985249e7773ea73e9fb3916be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Sat, 24 May 2025 03:02:07 +0300 Subject: [PATCH] feat: :sparkles: remote mixer part 5 --- server/src/stream.rs | 145 ++++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 58 deletions(-) diff --git a/server/src/stream.rs b/server/src/stream.rs index 9d6b51f..b1c2ea3 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -1,70 +1,62 @@ use std::{ - collections::VecDeque, path::Path, sync::{Arc, LazyLock}, time::Duration, }; +use chrono::{DateTime, Utc}; use s2n_quic::{ Connection, Server, stream::{ReceiveStream, SendStream}, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::{RwLock, broadcast}, + sync::RwLock, time::sleep, }; use crate::ServerConfig; -const BUFFER_LENGTH: usize = 1024 * 16 * 2; +const BUFFER_LENGTH: usize = 1024 * 13; + +#[derive(Debug)] +struct TimedBuffer { + audio_buffer: [f32; BUFFER_LENGTH], + latest_update: DateTime, +} +impl TimedBuffer { + fn new() -> Self { + let audio_buffer = [0.0 as f32; BUFFER_LENGTH]; + let latest_update = Utc::now(); + + Self { + audio_buffer, + latest_update, + } + } + fn update(&mut self, new_audio_buffer: [f32; BUFFER_LENGTH]) { + self.audio_buffer = new_audio_buffer; + self.latest_update = Utc::now(); + } +} #[derive(Debug)] struct User { - audio_buffer: Arc>>, + user_id: u32, + timed_buffer: Arc>, } impl User { - async fn new() -> Arc>> { - let audio_buffer = Arc::new(RwLock::new(VecDeque::new())); + async fn new(user_id: u32) -> Arc> { + let timed_buffer = Arc::new(RwLock::new(TimedBuffer::new())); let new_user = Self { - audio_buffer: audio_buffer.clone(), + user_id, + timed_buffer: timed_buffer.clone(), }; ONLINE_USERS.write().await.push(new_user); - audio_buffer + timed_buffer } } static ONLINE_USERS: LazyLock>> = LazyLock::new(|| vec![].into()); -static GLOBAL_MIXER: LazyLock> = - LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0); - -async fn global_mixer() { - let global_mixer_sender = GLOBAL_MIXER.clone(); - - loop { - sleep(Duration::from_millis(100)).await; - let mut mixed_audio_buffer = VecDeque::new(); - for online_user in ONLINE_USERS.read().await.iter() { - let mut inner_buffer = vec![]; - while let Some(audio_data) = online_user.audio_buffer.write().await.pop_front() { - inner_buffer.push(audio_data); - } - - for (i, audio_data) in inner_buffer.iter().enumerate() { - match mixed_audio_buffer.get(i) { - Some(original_value) => mixed_audio_buffer[i] = original_value + audio_data, - None => mixed_audio_buffer.push_back(*audio_data), - } - } - } - - mixed_audio_buffer.truncate(mixed_audio_buffer.len() * 85 / 100); - while let Some(audio_data) = mixed_audio_buffer.pop_front() { - if let Err(err_val) = global_mixer_sender.send(audio_data) { - eprintln!("Error: Send Mixed Audio Data | Local | {}", err_val); - } - } - } -} pub async fn start(server_config: ServerConfig) { let mut server = Server::builder() @@ -78,14 +70,14 @@ pub async fn start(server_config: ServerConfig) { .start() .unwrap(); - tokio::spawn(global_mixer()); - + let mut user_id = 0; while let Some(connection) = server.accept().await { - tokio::spawn(handle_client(connection)); + tokio::spawn(handle_client(user_id, connection)); + user_id += 1; } } -async fn handle_client(mut connection: Connection) { +async fn handle_client(user_id: u32, mut connection: Connection) { println!( "Server Name = {}", connection.server_name().unwrap().unwrap().to_string() @@ -103,25 +95,52 @@ async fn handle_client(mut connection: Connection) { .unwrap(); let (receive_stream, send_stream) = stream.split(); - let user_audio_buffer = User::new().await; + let user_audio_buffer = User::new(user_id).await; tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer)); - tokio::spawn(send_audio_data(send_stream)); + tokio::spawn(send_audio_data(user_id, send_stream)); } -async fn send_audio_data(mut send_stream: SendStream) { - let mut global_mixer_receiver = GLOBAL_MIXER.subscribe(); +async fn mixer(user_id: u32, latest_update: DateTime) -> Option<[f32; BUFFER_LENGTH]> { + // dont forget to remove this + let user_id = 999; + let mut mixed_audio_buffer = [0.0 as f32; BUFFER_LENGTH]; - loop { - match global_mixer_receiver.recv().await { - Ok(received_audio_data) => { - if let Err(err_val) = send_stream.write_f32(received_audio_data).await { - eprintln!("Error: Send Audio Data | Remote | {}", err_val); - return; - } + for online_user in ONLINE_USERS.read().await.iter() { + let online_user_timed_buffer = online_user.timed_buffer.read().await; + + if online_user.user_id != user_id && online_user_timed_buffer.latest_update > latest_update + { + for (i, audio_data) in online_user_timed_buffer.audio_buffer.iter().enumerate() { + mixed_audio_buffer[i] = mixed_audio_buffer[i] + audio_data; } - Err(err_val) => { - eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val); + } + } + + if 0.0 as f32 == mixed_audio_buffer.iter().sum() { + None + } else { + Some(mixed_audio_buffer) + } +} + +async fn send_audio_data(user_id: u32, mut send_stream: SendStream) { + let mut latest_update = Utc::now(); + loop { + let mixed_audio_buffer = match mixer(user_id, latest_update).await { + Some(mixed_audio_buffer) => { + latest_update = Utc::now(); + mixed_audio_buffer + } + None => { + sleep(Duration::from_millis(150)).await; + continue; + } + }; + + for mixed_audio_data in mixed_audio_buffer { + if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { + eprintln!("Error: Send Audio Data | Remote | {}", err_val); return; } } @@ -130,17 +149,27 @@ async fn send_audio_data(mut send_stream: SendStream) { async fn receiver_audio_data( mut receive_stream: ReceiveStream, - audio_buffer: Arc>>, + audio_buffer: Arc>, ) { + let mut inner_buffer = [0.0 as f32; BUFFER_LENGTH]; + let mut i = 0; + loop { match receive_stream.read_f32().await { Ok(received_audio_data) => { - audio_buffer.write().await.push_back(received_audio_data); + inner_buffer[i] = received_audio_data; } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); return; } } + + if i == BUFFER_LENGTH - 1 { + audio_buffer.write().await.update(inner_buffer); + i = 0; + } else { + i += 1; + } } }