feat: remote mixer part 1

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-18 23:32:06 +03:00
parent 1451e9ccfc
commit cabb54cad3

View file

@ -1,33 +1,54 @@
use std::{path::Path, sync::LazyLock}; use std::{
cell::{Cell, RefCell},
path::Path,
sync::{Arc, LazyLock},
time::Duration,
};
use chrono::{TimeDelta, TimeZone, Utc};
use protocol::BUFFER_LENGTH; use protocol::BUFFER_LENGTH;
use s2n_quic::{ use s2n_quic::{
Connection, Server, Connection, Server,
stream::{ReceiveStream, SendStream}, stream::{ReceiveStream, SendStream},
}; };
use tokio::{ use tokio::{
io::AsyncReadExt, io::{AsyncReadExt, AsyncWriteExt},
sync::{RwLock, broadcast}, sync::{RwLock, broadcast},
time::{Instant, Timeout},
}; };
use crate::ServerConfig; use crate::ServerConfig;
const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(50);
#[derive(Debug)] #[derive(Debug)]
struct User { struct User {
sender_to_user: broadcast::Sender<f32>, user_id: u64,
} local_audio_data_sender: broadcast::Sender<UseredAudioData>,
impl User {
fn new() -> (Self, broadcast::Receiver<f32>) {
let (sender, receiver) = broadcast::channel(BUFFER_LENGTH);
let user = Self {
sender_to_user: sender,
};
(user, receiver)
}
} }
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| RwLock::new(vec![])); static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| RwLock::new(vec![]));
#[derive(Debug, Clone, Copy)]
struct UseredAudioData {
user_id: u64,
audio_data: f32,
}
impl User {
async fn new(user_id: u64) -> broadcast::Receiver<UseredAudioData> {
let (local_audio_data_sender, local_audio_data_receiver) =
broadcast::channel(BUFFER_LENGTH);
let new_user = User {
user_id,
local_audio_data_sender,
};
ONLINE_USERS.write().await.push(new_user);
local_audio_data_receiver
}
}
pub async fn start(server_config: ServerConfig) { pub async fn start(server_config: ServerConfig) {
let mut server = Server::builder() let mut server = Server::builder()
.with_io(server_config.server_address.as_str()) .with_io(server_config.server_address.as_str())
@ -62,44 +83,56 @@ async fn handle_client(mut connection: Connection) {
.unwrap() .unwrap()
.unwrap(); .unwrap();
let (receive_stream, send_stream) = stream.split(); let (receive_stream, send_stream) = stream.split();
let (user, receiver) = User::new(); let local_audio_receiver = User::new(connection.id()).await;
ONLINE_USERS.write().await.push(user);
tokio::spawn(receive_sound_data(receive_stream));
tokio::spawn(send_sound_data(receiver, send_stream));
} }
async fn send_sound_data(mut receiver: broadcast::Receiver<f32>, mut send_stream: SendStream) { async fn receive_audio_stream(mut receive_stream: ReceiveStream) {
loop {
match receiver.recv().await {
Ok(received_data) => {
if let Err(err_val) = send_stream
.send(received_data.to_be_bytes().to_vec().into())
.await
{
eprintln!("Error: Send Sound Data | {}", err_val);
}
}
Err(err_val) => {
eprintln!("Error: Receive Sound Data | Local Channel | {}", err_val);
break;
}
}
}
}
async fn receive_sound_data(mut receive_stream: ReceiveStream) {
loop { loop {
match receive_stream.read_f32().await { match receive_stream.read_f32().await {
Ok(received_data) => { Ok(received_audio_data) => {
for online_user in ONLINE_USERS.read().await.iter() { for online_user in ONLINE_USERS.read().await.iter() {
if let Err(err_val) = online_user.sender_to_user.send(received_data) { let usered_audio_data = UseredAudioData {
eprintln!("Error: Send Sound Data to All | {}", err_val); user_id: receive_stream.id(),
audio_data: received_audio_data,
};
if let Err(err_val) =
online_user.local_audio_data_sender.send(usered_audio_data)
{
eprintln!("Error: Send Audio Data | Local | {}", err_val);
} }
} }
} }
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Receive Sound Data | {} ", err_val); eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
break; break;
} }
} }
} }
} }
async fn send_audio_stream(
mut send_stream: SendStream,
mut local_audio_receiver: broadcast::Receiver<UseredAudioData>,
) {
let mut usered_audio_buffer = vec![];
let mut time = Utc::now();
loop {
if time + LOCAL_AUDIO_BUFFERING_TIMEOUT > Utc::now() {
match local_audio_receiver.recv().await {
Ok(usered_audio_data) => {
usered_audio_buffer.push(usered_audio_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Local | {}", err_val);
break;
}
}
} else {
usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id));
todo!("mixer");
usered_audio_buffer.clear();
time = Utc::now();
}
}
}