diff --git a/client/src/stream.rs b/client/src/stream.rs index 2e356dc..26cb834 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -54,31 +54,25 @@ pub async fn connect( let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH); let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel(); tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); - let receive_voice_data_task = tokio::spawn(receive_voice_data(receive_stream, speaker_sender)); - let send_voice_data_task = tokio::spawn(send_voice_data( + let receive_voice_data_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); + let send_voice_data_task = tokio::spawn(send_audio_data( send_stream, microphone_sender_for_producing_receiver, )); if let Err(err_val) = is_connection_started_signal.send(true) { - eprintln!( - "Error: Is Connection Started | Local Channel | Send | {}", - err_val - ); + eprintln!("Error: Is Connection Started | Local | Send | {}", err_val); } if let Err(err_val) = connection_stop_signal_receiver.await { eprintln!( - "Error: Connection Stop Signal | Local Channel | Receive | {}", + "Error: Connection Stop Signal | Local | Receive | {}", err_val ); } if let Err(err_val) = speaker_stop_signal_sender.send(true) { - eprintln!( - "Error: Speaker Stop Signal | Local Channel | Send | {}", - err_val - ); + eprintln!("Error: Speaker Stop Signal | Local | Send | {}", err_val); } println!("Connection Is Closing"); @@ -91,7 +85,7 @@ pub async fn connect( Ok(()) } -async fn send_voice_data( +async fn send_audio_data( mut send_stream: SendStream, microphone_sender_for_producing_receiver: Arc>, ) { @@ -100,15 +94,12 @@ async fn send_voice_data( match microphone_receiver.recv().await { Ok(microphone_data) => { if let Err(err_val) = send_stream.write_f32(microphone_data).await { - eprintln!("Error: Send Microphone Data | {}", err_val); + eprintln!("Error: Send Microphone Data | Remote | {}", err_val); break; } } Err(err_val) => { - eprintln!( - "Error: Receive from Microphone | Local Channel | {}", - err_val - ); + eprintln!("Error: Receive from Microphone | Local | {}", err_val); match err_val { broadcast::error::RecvError::Closed => break, broadcast::error::RecvError::Lagged(_) => { @@ -119,7 +110,7 @@ async fn send_voice_data( } } } -async fn receive_voice_data( +async fn receive_audio_data( mut receive_stream: ReceiveStream, speaker_sender: broadcast::Sender, ) { @@ -127,12 +118,12 @@ async fn receive_voice_data( match receive_stream.read_f32().await { Ok(received_data) => { if let Err(err_val) = speaker_sender.send(received_data) { - eprintln!("Error: Send to Speaker | Local Channel | {}", err_val); + eprintln!("Error: Send to Speaker | Local | {}", err_val); break; } } Err(err_val) => { - eprintln!("Error: Receive Data | {}", err_val); + eprintln!("Error: Receive Audio Data | Remote | {}", err_val); break; } } diff --git a/server/src/stream.rs b/server/src/stream.rs index 89e8d4f..06b7296 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -1,7 +1,10 @@ -use std::{path::Path, process::exit, sync::LazyLock, time::Duration}; +use std::{ + collections::VecDeque, + path::Path, + sync::{Arc, LazyLock}, + time::Duration, +}; -use chrono::Utc; -use protocol::BUFFER_LENGTH; use s2n_quic::{ Connection, Server, stream::{ReceiveStream, SendStream}, @@ -13,36 +16,28 @@ use tokio::{ }; use crate::ServerConfig; - -const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(10); +const BUFFER_LENGTH: usize = protocol::BUFFER_LENGTH; #[derive(Debug)] struct User { - user_id: u64, - local_audio_data_sender: broadcast::Sender, + sender: broadcast::Sender, } - -static ONLINE_USERS: LazyLock>> = LazyLock::new(|| RwLock::new(vec![])); - -#[derive(Debug, Clone, Copy)] -struct UseredAudioData { - user_id: u64, - audio_data: f32, -} - impl User { - async fn new(user_id: u64) -> broadcast::Receiver { - let (local_audio_data_sender, local_audio_data_receiver) = - broadcast::channel(BUFFER_LENGTH * 16); - let new_user = User { - user_id, - local_audio_data_sender, + async fn new() -> broadcast::Sender { + let (sender, _) = broadcast::channel(BUFFER_LENGTH); + let new_user = Self { + sender: sender.clone(), }; - ONLINE_USERS.write().await.push(new_user); - local_audio_data_receiver + if let Err(err_val) = NEW_USER_NOTIFIER.send(sender.clone()) { + eprintln!("Error: Send New User | Local | {}", err_val); + } + sender.clone() } } +static ONLINE_USERS: LazyLock>> = LazyLock::new(|| vec![].into()); +static NEW_USER_NOTIFIER: LazyLock>> = + LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0); pub async fn start(server_config: ServerConfig) { let mut server = Server::builder() @@ -55,14 +50,13 @@ pub async fn start(server_config: ServerConfig) { .unwrap() .start() .unwrap(); - let mut user_id = 0; + while let Some(connection) = server.accept().await { - tokio::spawn(handle_client(user_id, connection)); - user_id += 1; + tokio::spawn(handle_client(connection)); } } -async fn handle_client(user_id: u64, mut connection: Connection) { +async fn handle_client(mut connection: Connection) { println!( "Server Name = {}", connection.server_name().unwrap().unwrap().to_string() @@ -79,108 +73,158 @@ async fn handle_client(user_id: u64, mut connection: Connection) { .unwrap() .unwrap(); let (receive_stream, send_stream) = stream.split(); - let local_audio_receiver = User::new(user_id).await; - tokio::spawn(receive_audio_stream(user_id, receive_stream)); - tokio::spawn(send_audio_stream( - user_id, - send_stream, - local_audio_receiver, - )); + + let new_user_sender = User::new().await; + + tokio::spawn(receiver_audio_data(receive_stream, new_user_sender)); + tokio::spawn(send_audio_data(send_stream)); } -async fn receive_audio_stream(user_id: u64, mut receive_stream: ReceiveStream) { +async fn mixer_with_buffer( + unmixed_audio_data: Arc>>>>>, +) -> Vec { + let mut mixed_audio_buffer = vec![]; + for audio_buffer in unmixed_audio_data.read().await.iter() { + let mut inner_audio_buffer = vec![]; + + while let Some(audio_data) = audio_buffer.write().await.pop_front() { + inner_audio_buffer.push(audio_data); + } + + for (i, audio_data) in inner_audio_buffer.iter().enumerate() { + match mixed_audio_buffer.get(i) { + Some(current_audio_data) => mixed_audio_buffer[i] = current_audio_data + audio_data, + None => mixed_audio_buffer.push(*audio_data), + } + } + } + mixed_audio_buffer +} + +// async fn mixer_with_channel( +// unmixed_audio_data: Arc>>>>>, +// sender: broadcast::Sender, +// ) { +// loop { +// sleep(Duration::from_nanos(10)).await; +// let mut mixed_audio_buffer = vec![]; +// for audio_buffer in unmixed_audio_data.read().await.iter() { +// let mut inner_audio_buffer = vec![]; +// +// while let Some(audio_data) = audio_buffer.write().await.pop_front() { +// inner_audio_buffer.push(audio_data); +// } +// +// for (i, audio_data) in inner_audio_buffer.iter().enumerate() { +// match mixed_audio_buffer.get(i) { +// Some(current_audio_data) => { +// mixed_audio_buffer[i] = current_audio_data + audio_data +// } +// None => mixed_audio_buffer.push(*audio_data), +// } +// } +// } +// for audio_data in mixed_audio_buffer { +// if let Err(err_val) = sender.send(audio_data) { +// eprintln!("Error: Send Mixed Audio | Local | {}", err_val); +// return; +// } +// } +// } +// } + +async fn new_user_handler( + mut receiver: broadcast::Receiver, + unmixed_audio_data: Arc>>>>>, +) { + let local_audio_buffer = Arc::new(RwLock::new(VecDeque::new())); + unmixed_audio_data + .write() + .await + .push_back(local_audio_buffer.clone()); + + loop { + match receiver.recv().await { + Ok(received_audio_data) => { + local_audio_buffer + .write() + .await + .push_back(received_audio_data); + } + Err(err_val) => { + eprintln!("Error: Receive Audio Data | Local | {}", err_val); + return; + } + } + } +} + +async fn send_audio_data(mut send_stream: SendStream) { + let unmixed_audio_data = Arc::new(RwLock::new(VecDeque::new())); + for online_user in ONLINE_USERS.read().await.iter() { + let receiver = online_user.sender.subscribe(); + let unmixed_audio_data = unmixed_audio_data.clone(); + tokio::spawn(new_user_handler(receiver, unmixed_audio_data)); + } + let unmixed_audio_data_for_handling_new_user = unmixed_audio_data.clone(); + tokio::spawn(async move { + loop { + match NEW_USER_NOTIFIER.subscribe().recv().await { + Ok(new_user_sender) => { + let receiver = new_user_sender.subscribe(); + let unmixed_audio_data = unmixed_audio_data_for_handling_new_user.clone(); + tokio::spawn(new_user_handler(receiver, unmixed_audio_data)); + } + Err(err_val) => { + eprintln!("Error: Receive New User | Local | {}", err_val); + return; + } + } + } + }); + + // let (mixer_sender, mut mixer_receiver) = broadcast::channel(BUFFER_LENGTH); + // tokio::spawn(mixer_with_channel(unmixed_audio_data, mixer_sender)); + // loop { + // match mixer_receiver.recv().await { + // Ok(mixed_audio_data) => { + // if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { + // eprintln!("Error: Send Audio Data | Remote | {}", err_val); + // return; + // } + // } + // Err(err_val) => { + // eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val); + // return; + // } + // } + // } + + loop { + sleep(Duration::from_millis(50)).await; + let mixed_audio_buffer = mixer_with_buffer(unmixed_audio_data.clone()).await; + for mixed_audio_data in mixed_audio_buffer { + if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { + eprintln!("Error: Send Audio Data | Remote | {}", err_val); + return; + } + } + } +} + +async fn receiver_audio_data(mut receive_stream: ReceiveStream, sender: broadcast::Sender) { loop { match receive_stream.read_f32().await { Ok(received_audio_data) => { - for online_user in ONLINE_USERS.read().await.iter() { - let usered_audio_data = UseredAudioData { - user_id, - audio_data: received_audio_data, - }; - if let Err(err_val) = - online_user.local_audio_data_sender.send(usered_audio_data) - { - eprintln!("Error: Send Audio Data | Local | {}", err_val); - exit(0); - } + if let Err(err_val) = sender.send(received_audio_data) { + eprintln!("Error: Send Audio Data | Local | {}", err_val); + return; } } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); - break; + return; } } } } - -async fn send_audio_stream( - user_id: u64, - mut send_stream: SendStream, - mut local_audio_receiver: broadcast::Receiver, -) { - let mut usered_audio_buffer = vec![]; - let mut time = Utc::now(); - loop { - if time + LOCAL_AUDIO_BUFFERING_TIMEOUT > Utc::now() { - match local_audio_receiver.recv().await { - Ok(usered_audio_data) => { - usered_audio_buffer.push(usered_audio_data); - } - Err(err_val) => { - eprintln!("Error: Receive Audio Data | Local | {}", err_val); - break; - } - } - } else { - let mixed_buffer = mixer(user_id, &mut usered_audio_buffer).await; - - for mixed_buffer_data in mixed_buffer { - if let Err(err_val) = send_stream.write_f32(mixed_buffer_data).await { - eprintln!("Error: Send Audio Data | Remote | {}", err_val); - } - } - - usered_audio_buffer.clear(); - time = Utc::now(); - } - } -} - -async fn mixer(user_id: u64, usered_audio_buffer: &mut Vec) -> Vec { - usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id)); - - let mut parsed_audio_data = vec![]; - let mut inner = vec![]; - let mut current_user = user_id; - - while let Some(usered_audio) = usered_audio_buffer.pop() { - if usered_audio.user_id == user_id { - continue; - } else { - if current_user == usered_audio.user_id { - inner.push(usered_audio.audio_data); - } else { - parsed_audio_data.push(inner.clone()); - inner.clear(); - current_user = usered_audio.user_id; - } - } - } - parsed_audio_data.push(inner.clone()); - - let mut mixed_buffer = vec![]; - for parsed_audio_single_user_buffer in parsed_audio_data.iter() { - for (i, parsed_audio_single_user_buffer_data) in - parsed_audio_single_user_buffer.iter().enumerate() - { - match mixed_buffer.get(i) { - Some(current_data) => { - mixed_buffer[i] = current_data + parsed_audio_single_user_buffer_data - } - None => mixed_buffer.push(*parsed_audio_single_user_buffer_data), - } - } - } - - mixed_buffer -}