diff --git a/server/src/stream.rs b/server/src/stream.rs index fdecd4f..061b750 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -1,33 +1,54 @@ -use std::{path::Path, sync::LazyLock}; +use std::{ + cell::{Cell, RefCell}, + path::Path, + sync::{Arc, LazyLock}, + time::Duration, +}; +use chrono::{TimeDelta, TimeZone, Utc}; use protocol::BUFFER_LENGTH; use s2n_quic::{ Connection, Server, stream::{ReceiveStream, SendStream}, }; use tokio::{ - io::AsyncReadExt, + io::{AsyncReadExt, AsyncWriteExt}, sync::{RwLock, broadcast}, + time::{Instant, Timeout}, }; use crate::ServerConfig; +const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(50); + #[derive(Debug)] struct User { - sender_to_user: broadcast::Sender, -} -impl User { - fn new() -> (Self, broadcast::Receiver) { - let (sender, receiver) = broadcast::channel(BUFFER_LENGTH); - let user = Self { - sender_to_user: sender, - }; - (user, receiver) - } + user_id: u64, + local_audio_data_sender: broadcast::Sender, } static ONLINE_USERS: LazyLock>> = LazyLock::new(|| RwLock::new(vec![])); +#[derive(Debug, Clone, Copy)] +struct UseredAudioData { + user_id: u64, + audio_data: f32, +} + +impl User { + async fn new(user_id: u64) -> broadcast::Receiver { + let (local_audio_data_sender, local_audio_data_receiver) = + broadcast::channel(BUFFER_LENGTH); + let new_user = User { + user_id, + local_audio_data_sender, + }; + + ONLINE_USERS.write().await.push(new_user); + local_audio_data_receiver + } +} + pub async fn start(server_config: ServerConfig) { let mut server = Server::builder() .with_io(server_config.server_address.as_str()) @@ -62,44 +83,56 @@ async fn handle_client(mut connection: Connection) { .unwrap() .unwrap(); let (receive_stream, send_stream) = stream.split(); - let (user, receiver) = User::new(); - ONLINE_USERS.write().await.push(user); - tokio::spawn(receive_sound_data(receive_stream)); - tokio::spawn(send_sound_data(receiver, send_stream)); + let local_audio_receiver = User::new(connection.id()).await; } -async fn send_sound_data(mut receiver: broadcast::Receiver, mut send_stream: SendStream) { - loop { - match receiver.recv().await { - Ok(received_data) => { - if let Err(err_val) = send_stream - .send(received_data.to_be_bytes().to_vec().into()) - .await - { - eprintln!("Error: Send Sound Data | {}", err_val); - } - } - Err(err_val) => { - eprintln!("Error: Receive Sound Data | Local Channel | {}", err_val); - break; - } - } - } -} -async fn receive_sound_data(mut receive_stream: ReceiveStream) { +async fn receive_audio_stream(mut receive_stream: ReceiveStream) { loop { match receive_stream.read_f32().await { - Ok(received_data) => { + Ok(received_audio_data) => { for online_user in ONLINE_USERS.read().await.iter() { - if let Err(err_val) = online_user.sender_to_user.send(received_data) { - eprintln!("Error: Send Sound Data to All | {}", err_val); + let usered_audio_data = UseredAudioData { + user_id: receive_stream.id(), + audio_data: received_audio_data, + }; + if let Err(err_val) = + online_user.local_audio_data_sender.send(usered_audio_data) + { + eprintln!("Error: Send Audio Data | Local | {}", err_val); } } } Err(err_val) => { - eprintln!("Error: Receive Sound Data | {} ", err_val); + eprintln!("Error: Receive Audio Data | Remote | {}", err_val); break; } } } } + +async fn send_audio_stream( + mut send_stream: SendStream, + mut local_audio_receiver: broadcast::Receiver, +) { + let mut usered_audio_buffer = vec![]; + let mut time = Utc::now(); + loop { + if time + LOCAL_AUDIO_BUFFERING_TIMEOUT > Utc::now() { + match local_audio_receiver.recv().await { + Ok(usered_audio_data) => { + usered_audio_buffer.push(usered_audio_data); + } + Err(err_val) => { + eprintln!("Error: Receive Audio Data | Local | {}", err_val); + break; + } + } + } else { + usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id)); + todo!("mixer"); + + usered_audio_buffer.clear(); + time = Utc::now(); + } + } +}