diff --git a/Cargo.lock b/Cargo.lock index 4a92665..0aa13ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3465,6 +3465,7 @@ version = "0.1.0" dependencies = [ "chrono", "protocol", + "s2n-quic", "serde", "serde_json", "tokio", diff --git a/client/src/gui.rs b/client/src/gui.rs index 1563cd9..c1839e2 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -1,14 +1,20 @@ -use std::sync::{Arc, RwLock}; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; use iced::{ Alignment::Center, Element, Task, Theme, widget::{button, column, row}, }; -use protocol::BUFFER_LENGTH; -use tokio::sync::{ - broadcast::{self}, - oneshot, +use protocol::{BUFFER_LENGTH, Error}; +use tokio::{ + sync::{ + broadcast::{self}, + oneshot, + }, + time::sleep, }; use crate::{ClientConfig, stream::connect, voice::record}; @@ -20,16 +26,40 @@ struct Signal { connection: Option>, } +impl Signal { + fn reset_microphone(&mut self) -> Result<(), Error> { + if let Some(microphone_signal) = &self.microphone { + if !microphone_signal.is_closed() { + self.microphone.take().expect("Never").send(true).unwrap(); + self.microphone = None; + return Ok(()); + } + } + Err(Error::Signal("Reset".to_string())) + } + + fn reset_connection(&mut self) -> Result<(), Error> { + if let Some(connection_signal) = &self.connection { + if !connection_signal.is_closed() { + self.connection.take().expect("Never").send(true).unwrap(); + self.connection = None; + return Ok(()); + } + } + Err(Error::Signal("Reset".to_string())) + } +} + #[derive(Debug)] struct Channel { - microphone: broadcast::Sender, + microphone: Arc>, // speaker: (broadcast::Sender, broadcast::Receiver), } impl Channel { fn new() -> Self { Self { - microphone: broadcast::channel(BUFFER_LENGTH).0, + microphone: broadcast::channel(BUFFER_LENGTH).0.into(), // speaker: broadcast::channel(BUFFER_LENGTH), } } @@ -71,6 +101,18 @@ pub struct App { } impl App { + fn reset_microphone(&mut self) -> Result<(), Error> { + self.signal.reset_microphone()?; + self.gui_status.write().unwrap().microphone = State::Passive; + Ok(()) + } + + fn reset_connection(&mut self) -> Result<(), Error> { + self.signal.reset_connection()?; + self.gui_status.write().unwrap().room = State::Passive; + Ok(()) + } + pub fn theme(&self) -> Theme { Theme::Dark } @@ -110,35 +152,37 @@ impl App { self.gui_status.write().unwrap().room = State::Loading; let client_config = self.client_config.clone(); let gui_status = self.gui_status.clone(); - let microphone_receiver = self.channel.microphone.subscribe(); + let microphone_sender_for_producing_receiver = self.channel.microphone.clone(); let connection_signal = oneshot::channel(); self.signal.connection = Some(connection_signal.0); + let is_connection_started_signal = oneshot::channel(); + tokio::spawn(connect( + connection_signal.1, + is_connection_started_signal.0, + microphone_sender_for_producing_receiver, + client_config, + )); + + let is_connection_started_task = tokio::spawn(is_connection_started_signal.1); Task::perform( - connect(connection_signal.1, microphone_receiver, client_config), - move |result| match result { - Ok(_) => gui_status.write().unwrap().room = State::Active, - Err(err_val) => { - eprintln!("Error: Join Room | {}", err_val); - gui_status.write().unwrap().room = State::Passive; + async move { + match is_connection_started_task.await { + Ok(_) => gui_status.write().unwrap().room = State::Active, + Err(err_val) => { + gui_status.write().unwrap().room = State::Passive; + eprintln!("Error: Connection Task | {}", err_val); + } } }, + |_| {}, ) .map(|_| Message::None) } Message::LeaveRoom => { self.gui_status.write().unwrap().room = State::Loading; - if let Some(connection_signal) = &self.signal.connection { - if !connection_signal.is_closed() { - self.signal - .connection - .take() - .expect("Never") - .send(true) - .unwrap(); - self.signal.connection = None; - self.gui_status.write().unwrap().room = State::Passive; - } + if let Err(err_val) = self.reset_connection() { + eprintln!("Error: Leave Room | {}", err_val); } Task::none() } @@ -165,17 +209,8 @@ impl App { } Message::MuteMicrophone => { self.gui_status.write().unwrap().microphone = State::Loading; - if let Some(microphone_signal) = &self.signal.microphone { - if !microphone_signal.is_closed() { - self.signal - .microphone - .take() - .expect("Never") - .send(true) - .unwrap(); - self.signal.microphone = None; - self.gui_status.write().unwrap().microphone = State::Passive; - } + if let Err(err_val) = self.reset_microphone() { + eprintln!("Error: Mute Microphone | {}", err_val); } Task::none() } diff --git a/client/src/stream.rs b/client/src/stream.rs index 877ca9b..2e356dc 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -1,17 +1,22 @@ use std::{net::SocketAddr, path::Path, sync::Arc}; use protocol::{BUFFER_LENGTH, Error}; -use s2n_quic::{Client, client::Connect}; +use s2n_quic::{ + Client, + client::Connect, + stream::{ReceiveStream, SendStream}, +}; use tokio::{ - io::AsyncReadExt, + io::{AsyncReadExt, AsyncWriteExt}, sync::{broadcast, oneshot}, }; use crate::{ClientConfig, voice::play}; pub async fn connect( - connection_signal: oneshot::Receiver, - mut microphone_receiver: broadcast::Receiver, + connection_stop_signal_receiver: oneshot::Receiver, + is_connection_started_signal: oneshot::Sender, + microphone_sender_for_producing_receiver: Arc>, client_config: Arc, ) -> Result<(), Error> { let client = Client::builder() @@ -45,40 +50,91 @@ pub async fn connect( .open_bidirectional_stream() .await .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; - let (mut receive_stream, mut send_stream) = stream.split(); + let (receive_stream, send_stream) = stream.split(); + let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH); + let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel(); + tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); + let receive_voice_data_task = tokio::spawn(receive_voice_data(receive_stream, speaker_sender)); + let send_voice_data_task = tokio::spawn(send_voice_data( + send_stream, + microphone_sender_for_producing_receiver, + )); - let (speaker_sender, speaker_receiver) = broadcast::channel::(BUFFER_LENGTH); - - let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel::(); - - let play_task = tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); - let receive_task = tokio::spawn(async move { - while let Ok(data) = receive_stream.read_f32_le().await { - speaker_sender - .send(data) - .map_err(|err_val| Error::Signal(err_val.to_string())) - .unwrap(); - } - }); - let send_task = tokio::spawn(async move { - while let Ok(data) = microphone_receiver.recv().await { - send_stream - .send(data.to_le_bytes().to_vec().into()) - .await - .map_err(|err_val| Error::Send(err_val.to_string())) - .unwrap(); - } - }); - if let Ok(_) = connection_signal.await { - println!("Connection Closing"); + if let Err(err_val) = is_connection_started_signal.send(true) { + eprintln!( + "Error: Is Connection Started | Local Channel | Send | {}", + err_val + ); } - spearker_stop_signal_sender - .send(true) - .map_err(|err_val| Error::Signal(err_val.to_string()))?; - send_task.abort(); - receive_task.abort(); - play_task.abort(); + if let Err(err_val) = connection_stop_signal_receiver.await { + eprintln!( + "Error: Connection Stop Signal | Local Channel | Receive | {}", + err_val + ); + } + + if let Err(err_val) = speaker_stop_signal_sender.send(true) { + eprintln!( + "Error: Speaker Stop Signal | Local Channel | Send | {}", + err_val + ); + } + + println!("Connection Is Closing"); + + receive_voice_data_task.abort(); + send_voice_data_task.abort(); + + println!("Connection Is Closed"); Ok(()) } + +async fn send_voice_data( + mut send_stream: SendStream, + microphone_sender_for_producing_receiver: Arc>, +) { + let mut microphone_receiver = microphone_sender_for_producing_receiver.subscribe(); + loop { + match microphone_receiver.recv().await { + Ok(microphone_data) => { + if let Err(err_val) = send_stream.write_f32(microphone_data).await { + eprintln!("Error: Send Microphone Data | {}", err_val); + break; + } + } + Err(err_val) => { + eprintln!( + "Error: Receive from Microphone | Local Channel | {}", + err_val + ); + match err_val { + broadcast::error::RecvError::Closed => break, + broadcast::error::RecvError::Lagged(_) => { + microphone_receiver = microphone_receiver.resubscribe() + } + } + } + } + } +} +async fn receive_voice_data( + mut receive_stream: ReceiveStream, + speaker_sender: broadcast::Sender, +) { + loop { + match receive_stream.read_f32().await { + Ok(received_data) => { + if let Err(err_val) = speaker_sender.send(received_data) { + eprintln!("Error: Send to Speaker | Local Channel | {}", err_val); + break; + } + } + Err(err_val) => { + eprintln!("Error: Receive Data | {}", err_val); + break; + } + } + } +} diff --git a/client/src/voice.rs b/client/src/voice.rs index bd65c2e..47f3f91 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::{broadcast, oneshot}; pub async fn record( - microphone_sender: broadcast::Sender, + microphone_sender: Arc>, is_microphone_started_signal: oneshot::Sender, - microphone_stop_signal: oneshot::Receiver, + microphone_stop_signal_receiver: oneshot::Receiver, ) { let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); @@ -12,7 +14,7 @@ pub async fn record( let input = move |data: &[f32], _: &cpal::InputCallbackInfo| { for &sample in data { - if microphone_sender.receiver_count() > 0 { + if microphone_sender.receiver_count() > 0 && sample != 0.0 { if let Err(err_val) = microphone_sender.send(sample) { eprintln!("Error: Microphone Send | {}", err_val); } @@ -30,7 +32,12 @@ pub async fn record( } tokio::task::block_in_place(|| { - let _ = microphone_stop_signal.blocking_recv(); + if let Err(err_val) = microphone_stop_signal_receiver.blocking_recv() { + eprintln!( + "Error: Microphone Stop Signal | Local Channel | {}", + err_val + ); + } }); input_stream.pause().unwrap(); @@ -38,7 +45,7 @@ pub async fn record( pub async fn play( mut speaker_receiver: broadcast::Receiver, - speaker_stop_signal: oneshot::Receiver, + speaker_stop_signal_receiver: oneshot::Receiver, ) { let host = cpal::default_host(); let output_device = host.default_output_device().unwrap(); @@ -49,7 +56,19 @@ pub async fn play( if speaker_receiver.len() > 0 { match speaker_receiver.blocking_recv() { Ok(received_sample) => *sample = received_sample, - Err(err_val) => eprintln!("Error: Speaker Receive | {}", err_val), + Err(err_val) => match err_val { + broadcast::error::RecvError::Closed => { + eprintln!("Error: Speaker Receive | Local Channel | Channel Closed"); + return; + } + broadcast::error::RecvError::Lagged(lag_amount) => { + eprintln!( + "Error: Speaker Receive | Local Channel | Lagging by -> {}", + lag_amount + ); + speaker_receiver = speaker_receiver.resubscribe(); + } + }, } } else { *sample = 0.0; @@ -65,7 +84,12 @@ pub async fn play( println!("Playing Started"); tokio::task::block_in_place(|| { - let _ = speaker_stop_signal.blocking_recv(); + if let Err(err_val) = speaker_stop_signal_receiver.blocking_recv() { + eprintln!( + "Error: Speaker Stop Signal Receive | Local Channel | {}", + err_val + ); + } }); output_stream.pause().unwrap(); diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 4a6f03f..55ac856 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use serde::{Deserialize, Serialize}; -pub const BUFFER_LENGTH: usize = 1024; +pub const BUFFER_LENGTH: usize = 1024 * 8; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Error { diff --git a/server/Cargo.toml b/server/Cargo.toml index 53ecf78..e536fad 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -9,3 +9,4 @@ serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } +s2n-quic = { workspace = true } diff --git a/server/src/lib.rs b/server/src/lib.rs index 7a26447..c6326ed 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,4 +1,18 @@ +pub mod stream; + #[derive(Debug)] -struct ServerConfig { +pub struct ServerConfig { + certificate_path: String, + certificate_key_path: String, server_address: String, } + +impl ServerConfig { + pub fn new() -> Self { + Self { + certificate_path: "./server/certificates/cert.pem".to_string(), + certificate_key_path: "./server/certificates/key.pem".to_string(), + server_address: "127.0.0.1:4546".to_string(), + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 7e3d561..602a33c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,8 @@ +use server::{ServerConfig, stream::start}; + #[tokio::main] async fn main() { println!("Hello, world!"); + let server_config = ServerConfig::new(); + start(server_config).await; } diff --git a/server/src/stream.rs b/server/src/stream.rs new file mode 100644 index 0000000..fdecd4f --- /dev/null +++ b/server/src/stream.rs @@ -0,0 +1,105 @@ +use std::{path::Path, sync::LazyLock}; + +use protocol::BUFFER_LENGTH; +use s2n_quic::{ + Connection, Server, + stream::{ReceiveStream, SendStream}, +}; +use tokio::{ + io::AsyncReadExt, + sync::{RwLock, broadcast}, +}; + +use crate::ServerConfig; + +#[derive(Debug)] +struct User { + sender_to_user: broadcast::Sender, +} +impl User { + fn new() -> (Self, broadcast::Receiver) { + let (sender, receiver) = broadcast::channel(BUFFER_LENGTH); + let user = Self { + sender_to_user: sender, + }; + (user, receiver) + } +} + +static ONLINE_USERS: LazyLock>> = LazyLock::new(|| RwLock::new(vec![])); + +pub async fn start(server_config: ServerConfig) { + let mut server = Server::builder() + .with_io(server_config.server_address.as_str()) + .unwrap() + .with_tls(( + Path::new(&server_config.certificate_path), + Path::new(&server_config.certificate_key_path), + )) + .unwrap() + .start() + .unwrap(); + + while let Some(connection) = server.accept().await { + tokio::spawn(handle_client(connection)); + } +} + +async fn handle_client(mut connection: Connection) { + println!( + "Server Name = {}", + connection.server_name().unwrap().unwrap().to_string() + ); + + println!( + "Client Address = {}", + connection.remote_addr().unwrap().to_string() + ); + + let stream = connection + .accept_bidirectional_stream() + .await + .unwrap() + .unwrap(); + let (receive_stream, send_stream) = stream.split(); + let (user, receiver) = User::new(); + ONLINE_USERS.write().await.push(user); + tokio::spawn(receive_sound_data(receive_stream)); + tokio::spawn(send_sound_data(receiver, send_stream)); +} + +async fn send_sound_data(mut receiver: broadcast::Receiver, mut send_stream: SendStream) { + loop { + match receiver.recv().await { + Ok(received_data) => { + if let Err(err_val) = send_stream + .send(received_data.to_be_bytes().to_vec().into()) + .await + { + eprintln!("Error: Send Sound Data | {}", err_val); + } + } + Err(err_val) => { + eprintln!("Error: Receive Sound Data | Local Channel | {}", err_val); + break; + } + } + } +} +async fn receive_sound_data(mut receive_stream: ReceiveStream) { + loop { + match receive_stream.read_f32().await { + Ok(received_data) => { + for online_user in ONLINE_USERS.read().await.iter() { + if let Err(err_val) = online_user.sender_to_user.send(received_data) { + eprintln!("Error: Send Sound Data to All | {}", err_val); + } + } + } + Err(err_val) => { + eprintln!("Error: Receive Sound Data | {} ", err_val); + break; + } + } + } +}