feat: remote mixer part 2

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-19 03:23:13 +03:00
parent cabb54cad3
commit 0c187b166f
2 changed files with 67 additions and 19 deletions

View file

@ -7,7 +7,7 @@ use std::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub const BUFFER_LENGTH: usize = 1024 * 8; pub const BUFFER_LENGTH: usize = 1024 * 16;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Error { pub enum Error {

View file

@ -1,11 +1,6 @@
use std::{ use std::{path::Path, process::exit, sync::LazyLock, time::Duration};
cell::{Cell, RefCell},
path::Path,
sync::{Arc, LazyLock},
time::Duration,
};
use chrono::{TimeDelta, TimeZone, Utc}; use chrono::Utc;
use protocol::BUFFER_LENGTH; use protocol::BUFFER_LENGTH;
use s2n_quic::{ use s2n_quic::{
Connection, Server, Connection, Server,
@ -14,12 +9,12 @@ use s2n_quic::{
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
sync::{RwLock, broadcast}, sync::{RwLock, broadcast},
time::{Instant, Timeout}, time::sleep,
}; };
use crate::ServerConfig; use crate::ServerConfig;
const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(50); const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(10);
#[derive(Debug)] #[derive(Debug)]
struct User { struct User {
@ -38,7 +33,7 @@ struct UseredAudioData {
impl User { impl User {
async fn new(user_id: u64) -> broadcast::Receiver<UseredAudioData> { async fn new(user_id: u64) -> broadcast::Receiver<UseredAudioData> {
let (local_audio_data_sender, local_audio_data_receiver) = let (local_audio_data_sender, local_audio_data_receiver) =
broadcast::channel(BUFFER_LENGTH); broadcast::channel(BUFFER_LENGTH * 16);
let new_user = User { let new_user = User {
user_id, user_id,
local_audio_data_sender, local_audio_data_sender,
@ -60,13 +55,14 @@ pub async fn start(server_config: ServerConfig) {
.unwrap() .unwrap()
.start() .start()
.unwrap(); .unwrap();
let mut user_id = 0;
while let Some(connection) = server.accept().await { while let Some(connection) = server.accept().await {
tokio::spawn(handle_client(connection)); tokio::spawn(handle_client(user_id, connection));
user_id += 1;
} }
} }
async fn handle_client(mut connection: Connection) { async fn handle_client(user_id: u64, mut connection: Connection) {
println!( println!(
"Server Name = {}", "Server Name = {}",
connection.server_name().unwrap().unwrap().to_string() connection.server_name().unwrap().unwrap().to_string()
@ -83,22 +79,29 @@ async fn handle_client(mut connection: Connection) {
.unwrap() .unwrap()
.unwrap(); .unwrap();
let (receive_stream, send_stream) = stream.split(); let (receive_stream, send_stream) = stream.split();
let local_audio_receiver = User::new(connection.id()).await; let local_audio_receiver = User::new(user_id).await;
tokio::spawn(receive_audio_stream(user_id, receive_stream));
tokio::spawn(send_audio_stream(
user_id,
send_stream,
local_audio_receiver,
));
} }
async fn receive_audio_stream(mut receive_stream: ReceiveStream) { async fn receive_audio_stream(user_id: u64, mut receive_stream: ReceiveStream) {
loop { loop {
match receive_stream.read_f32().await { match receive_stream.read_f32().await {
Ok(received_audio_data) => { Ok(received_audio_data) => {
for online_user in ONLINE_USERS.read().await.iter() { for online_user in ONLINE_USERS.read().await.iter() {
let usered_audio_data = UseredAudioData { let usered_audio_data = UseredAudioData {
user_id: receive_stream.id(), user_id,
audio_data: received_audio_data, audio_data: received_audio_data,
}; };
if let Err(err_val) = if let Err(err_val) =
online_user.local_audio_data_sender.send(usered_audio_data) online_user.local_audio_data_sender.send(usered_audio_data)
{ {
eprintln!("Error: Send Audio Data | Local | {}", err_val); eprintln!("Error: Send Audio Data | Local | {}", err_val);
exit(0);
} }
} }
} }
@ -111,6 +114,7 @@ async fn receive_audio_stream(mut receive_stream: ReceiveStream) {
} }
async fn send_audio_stream( async fn send_audio_stream(
user_id: u64,
mut send_stream: SendStream, mut send_stream: SendStream,
mut local_audio_receiver: broadcast::Receiver<UseredAudioData>, mut local_audio_receiver: broadcast::Receiver<UseredAudioData>,
) { ) {
@ -128,11 +132,55 @@ async fn send_audio_stream(
} }
} }
} else { } else {
usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id)); let mixed_buffer = mixer(user_id, &mut usered_audio_buffer).await;
todo!("mixer");
for mixed_buffer_data in mixed_buffer {
if let Err(err_val) = send_stream.write_f32(mixed_buffer_data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
}
}
usered_audio_buffer.clear(); usered_audio_buffer.clear();
time = Utc::now(); time = Utc::now();
} }
} }
} }
async fn mixer(user_id: u64, usered_audio_buffer: &mut Vec<UseredAudioData>) -> Vec<f32> {
usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id));
let mut parsed_audio_data = vec![];
let mut inner = vec![];
let mut current_user = user_id;
while let Some(usered_audio) = usered_audio_buffer.pop() {
if usered_audio.user_id == user_id {
continue;
} else {
if current_user == usered_audio.user_id {
inner.push(usered_audio.audio_data);
} else {
parsed_audio_data.push(inner.clone());
inner.clear();
current_user = usered_audio.user_id;
}
}
}
parsed_audio_data.push(inner.clone());
let mut mixed_buffer = vec![];
for parsed_audio_single_user_buffer in parsed_audio_data.iter() {
for (i, parsed_audio_single_user_buffer_data) in
parsed_audio_single_user_buffer.iter().enumerate()
{
match mixed_buffer.get(i) {
Some(current_data) => {
mixed_buffer[i] = current_data + parsed_audio_single_user_buffer_data
}
None => mixed_buffer.push(*parsed_audio_single_user_buffer_data),
}
}
}
mixed_buffer
}