diff --git a/client/src/gui.rs b/client/src/gui.rs index c1839e2..c62cb00 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -8,7 +8,7 @@ use iced::{ Element, Task, Theme, widget::{button, column, row}, }; -use protocol::{BUFFER_LENGTH, Error}; +use protocol::Error; use tokio::{ sync::{ broadcast::{self}, @@ -17,7 +17,7 @@ use tokio::{ time::sleep, }; -use crate::{ClientConfig, stream::connect, voice::record}; +use crate::{ClientConfig, MICROPHONE_BUFFER_LENGHT, stream::connect, voice::record}; #[derive(Debug, Default)] struct Signal { @@ -59,7 +59,7 @@ struct Channel { impl Channel { fn new() -> Self { Self { - microphone: broadcast::channel(BUFFER_LENGTH).0.into(), + microphone: broadcast::channel(MICROPHONE_BUFFER_LENGHT / 4).0.into(), // speaker: broadcast::channel(BUFFER_LENGTH), } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 5449d88..7571c6a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,6 +2,9 @@ pub mod gui; pub mod stream; pub mod voice; +const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 16; +const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4; + #[derive(Debug)] pub struct ClientConfig { certificate_path: String, diff --git a/client/src/stream.rs b/client/src/stream.rs index 26cb834..5c0e8a0 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::Path, sync::Arc}; -use protocol::{BUFFER_LENGTH, Error}; +use protocol::Error; use s2n_quic::{ Client, client::Connect, @@ -11,7 +11,7 @@ use tokio::{ sync::{broadcast, oneshot}, }; -use crate::{ClientConfig, voice::play}; +use crate::{ClientConfig, SPEAKER_BUFFER_LENGHT, voice::play}; pub async fn connect( connection_stop_signal_receiver: oneshot::Receiver, @@ -51,7 +51,7 @@ pub async fn connect( .await .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; let (receive_stream, send_stream) = stream.split(); - let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH); + let (speaker_sender, speaker_receiver) = broadcast::channel(SPEAKER_BUFFER_LENGHT); let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel(); tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); let receive_voice_data_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); diff --git a/client/src/voice.rs b/client/src/voice.rs index 47f3f91..51367ba 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -11,6 +11,7 @@ pub async fn record( let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); let config = input_device.default_input_config().unwrap().into(); + println!("Recorder Stream Config = {:#?}", config); let input = move |data: &[f32], _: &cpal::InputCallbackInfo| { for &sample in data { @@ -50,6 +51,7 @@ pub async fn play( let host = cpal::default_host(); let output_device = host.default_output_device().unwrap(); let config = output_device.default_output_config().unwrap().into(); + println!("Speaker Stream Config = {:#?}", config); let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { for sample in data { diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index a410313..6c493b0 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -7,8 +7,6 @@ use std::{ use serde::{Deserialize, Serialize}; -pub const BUFFER_LENGTH: usize = 1024 * 16; - #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Error { ConnectionSetup(String), diff --git a/server/src/stream.rs b/server/src/stream.rs index 06b7296..9d6b51f 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -16,29 +16,56 @@ use tokio::{ }; use crate::ServerConfig; -const BUFFER_LENGTH: usize = protocol::BUFFER_LENGTH; +const BUFFER_LENGTH: usize = 1024 * 16 * 2; #[derive(Debug)] struct User { - sender: broadcast::Sender, + audio_buffer: Arc>>, } + impl User { - async fn new() -> broadcast::Sender { - let (sender, _) = broadcast::channel(BUFFER_LENGTH); + async fn new() -> Arc>> { + let audio_buffer = Arc::new(RwLock::new(VecDeque::new())); let new_user = Self { - sender: sender.clone(), + audio_buffer: audio_buffer.clone(), }; ONLINE_USERS.write().await.push(new_user); - if let Err(err_val) = NEW_USER_NOTIFIER.send(sender.clone()) { - eprintln!("Error: Send New User | Local | {}", err_val); - } - sender.clone() + audio_buffer } } static ONLINE_USERS: LazyLock>> = LazyLock::new(|| vec![].into()); -static NEW_USER_NOTIFIER: LazyLock>> = +static GLOBAL_MIXER: LazyLock> = LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0); +async fn global_mixer() { + let global_mixer_sender = GLOBAL_MIXER.clone(); + + loop { + sleep(Duration::from_millis(100)).await; + let mut mixed_audio_buffer = VecDeque::new(); + for online_user in ONLINE_USERS.read().await.iter() { + let mut inner_buffer = vec![]; + while let Some(audio_data) = online_user.audio_buffer.write().await.pop_front() { + inner_buffer.push(audio_data); + } + + for (i, audio_data) in inner_buffer.iter().enumerate() { + match mixed_audio_buffer.get(i) { + Some(original_value) => mixed_audio_buffer[i] = original_value + audio_data, + None => mixed_audio_buffer.push_back(*audio_data), + } + } + } + + mixed_audio_buffer.truncate(mixed_audio_buffer.len() * 85 / 100); + while let Some(audio_data) = mixed_audio_buffer.pop_front() { + if let Err(err_val) = global_mixer_sender.send(audio_data) { + eprintln!("Error: Send Mixed Audio Data | Local | {}", err_val); + } + } + } +} + pub async fn start(server_config: ServerConfig) { let mut server = Server::builder() .with_io(server_config.server_address.as_str()) @@ -51,6 +78,8 @@ pub async fn start(server_config: ServerConfig) { .start() .unwrap(); + tokio::spawn(global_mixer()); + while let Some(connection) = server.accept().await { tokio::spawn(handle_client(connection)); } @@ -74,152 +103,39 @@ async fn handle_client(mut connection: Connection) { .unwrap(); let (receive_stream, send_stream) = stream.split(); - let new_user_sender = User::new().await; + let user_audio_buffer = User::new().await; - tokio::spawn(receiver_audio_data(receive_stream, new_user_sender)); + tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer)); tokio::spawn(send_audio_data(send_stream)); } -async fn mixer_with_buffer( - unmixed_audio_data: Arc>>>>>, -) -> Vec { - let mut mixed_audio_buffer = vec![]; - for audio_buffer in unmixed_audio_data.read().await.iter() { - let mut inner_audio_buffer = vec![]; - - while let Some(audio_data) = audio_buffer.write().await.pop_front() { - inner_audio_buffer.push(audio_data); - } - - for (i, audio_data) in inner_audio_buffer.iter().enumerate() { - match mixed_audio_buffer.get(i) { - Some(current_audio_data) => mixed_audio_buffer[i] = current_audio_data + audio_data, - None => mixed_audio_buffer.push(*audio_data), - } - } - } - mixed_audio_buffer -} - -// async fn mixer_with_channel( -// unmixed_audio_data: Arc>>>>>, -// sender: broadcast::Sender, -// ) { -// loop { -// sleep(Duration::from_nanos(10)).await; -// let mut mixed_audio_buffer = vec![]; -// for audio_buffer in unmixed_audio_data.read().await.iter() { -// let mut inner_audio_buffer = vec![]; -// -// while let Some(audio_data) = audio_buffer.write().await.pop_front() { -// inner_audio_buffer.push(audio_data); -// } -// -// for (i, audio_data) in inner_audio_buffer.iter().enumerate() { -// match mixed_audio_buffer.get(i) { -// Some(current_audio_data) => { -// mixed_audio_buffer[i] = current_audio_data + audio_data -// } -// None => mixed_audio_buffer.push(*audio_data), -// } -// } -// } -// for audio_data in mixed_audio_buffer { -// if let Err(err_val) = sender.send(audio_data) { -// eprintln!("Error: Send Mixed Audio | Local | {}", err_val); -// return; -// } -// } -// } -// } - -async fn new_user_handler( - mut receiver: broadcast::Receiver, - unmixed_audio_data: Arc>>>>>, -) { - let local_audio_buffer = Arc::new(RwLock::new(VecDeque::new())); - unmixed_audio_data - .write() - .await - .push_back(local_audio_buffer.clone()); +async fn send_audio_data(mut send_stream: SendStream) { + let mut global_mixer_receiver = GLOBAL_MIXER.subscribe(); loop { - match receiver.recv().await { + match global_mixer_receiver.recv().await { Ok(received_audio_data) => { - local_audio_buffer - .write() - .await - .push_back(received_audio_data); - } - Err(err_val) => { - eprintln!("Error: Receive Audio Data | Local | {}", err_val); - return; - } - } - } -} - -async fn send_audio_data(mut send_stream: SendStream) { - let unmixed_audio_data = Arc::new(RwLock::new(VecDeque::new())); - for online_user in ONLINE_USERS.read().await.iter() { - let receiver = online_user.sender.subscribe(); - let unmixed_audio_data = unmixed_audio_data.clone(); - tokio::spawn(new_user_handler(receiver, unmixed_audio_data)); - } - let unmixed_audio_data_for_handling_new_user = unmixed_audio_data.clone(); - tokio::spawn(async move { - loop { - match NEW_USER_NOTIFIER.subscribe().recv().await { - Ok(new_user_sender) => { - let receiver = new_user_sender.subscribe(); - let unmixed_audio_data = unmixed_audio_data_for_handling_new_user.clone(); - tokio::spawn(new_user_handler(receiver, unmixed_audio_data)); - } - Err(err_val) => { - eprintln!("Error: Receive New User | Local | {}", err_val); + if let Err(err_val) = send_stream.write_f32(received_audio_data).await { + eprintln!("Error: Send Audio Data | Remote | {}", err_val); return; } } - } - }); - - // let (mixer_sender, mut mixer_receiver) = broadcast::channel(BUFFER_LENGTH); - // tokio::spawn(mixer_with_channel(unmixed_audio_data, mixer_sender)); - // loop { - // match mixer_receiver.recv().await { - // Ok(mixed_audio_data) => { - // if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { - // eprintln!("Error: Send Audio Data | Remote | {}", err_val); - // return; - // } - // } - // Err(err_val) => { - // eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val); - // return; - // } - // } - // } - - loop { - sleep(Duration::from_millis(50)).await; - let mixed_audio_buffer = mixer_with_buffer(unmixed_audio_data.clone()).await; - for mixed_audio_data in mixed_audio_buffer { - if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { - eprintln!("Error: Send Audio Data | Remote | {}", err_val); + Err(err_val) => { + eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val); return; } } } } -async fn receiver_audio_data(mut receive_stream: ReceiveStream, sender: broadcast::Sender) { +async fn receiver_audio_data( + mut receive_stream: ReceiveStream, + audio_buffer: Arc>>, +) { loop { match receive_stream.read_f32().await { Ok(received_audio_data) => { - if let Err(err_val) = sender.send(received_audio_data) { - eprintln!("Error: Send Audio Data | Local | {}", err_val); - return; - } + audio_buffer.write().await.push_back(received_audio_data); } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val);