From 51c29f792107a57e0db73f762ad1ca9bb54bc27c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Wed, 11 Jun 2025 17:48:15 +0300 Subject: [PATCH] feat: :sparkles: new protocol and server implementation --- Cargo.lock | 183 ++++++++++++++++++++++++------ Cargo.toml | 5 +- client/Cargo.toml | 3 +- client/src/lib.rs | 2 +- client/src/stream.rs | 74 +++++++------ client/src/voice.rs | 35 +++--- protocol/Cargo.toml | 4 +- protocol/src/lib.rs | 13 ++- protocol/src/protocol.rs | 88 +++++++++++++++ server/Cargo.toml | 2 - server/src/stream.rs | 234 ++++++++++++++++++++++----------------- 11 files changed, 445 insertions(+), 198 deletions(-) create mode 100644 protocol/src/protocol.rs diff --git a/Cargo.lock b/Cargo.lock index 0aa13ac..ca991cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,26 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.69.5" @@ -661,11 +681,10 @@ version = "0.1.0" dependencies = [ "chrono", "cpal", + "fixed-resample", "iced", "protocol", "s2n-quic", - "serde", - "serde_json", "tokio", ] @@ -1154,6 +1173,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fast-interleave" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7e05e2b3c97d4516fa5c177133f3e4decf9c8318841e1545b260535311e3a5" + [[package]] name = "fastrand" version = "2.3.0" @@ -1169,6 +1194,18 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "fixed-resample" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a267bc40fae9b208e2a67a99a8d794caf3a9fe1146d01c147f59bd96257a74" +dependencies = [ + "arrayvec", + "fast-interleave", + "ringbuf", + "rubato", +] + [[package]] name = "flate2" version = "1.1.1" @@ -1502,7 +1539,7 @@ dependencies = [ "log", "presser", "thiserror 1.0.69", - "windows 0.54.0", + "windows 0.58.0", ] [[package]] @@ -1950,12 +1987,6 @@ dependencies = [ "either", ] -[[package]] -name = "itoa" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" - [[package]] name = "jni" version = "0.21.1" @@ -2325,6 +2356,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-derive" version = "0.4.2" @@ -2801,6 +2841,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2835,6 +2890,15 @@ dependencies = [ "syn", ] +[[package]] +name = "primal-check" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0d895b311e3af9902528fbb8f928688abbd95872819320517cc24ca6b2bd08" +dependencies = [ + "num-integer", +] + [[package]] name = "proc-macro-crate" version = "3.3.0" @@ -2863,8 +2927,8 @@ checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d" name = "protocol" version = "0.1.0" dependencies = [ + "bincode", "chrono", - "serde", ] [[package]] @@ -3019,6 +3083,15 @@ dependencies = [ "font-types", ] +[[package]] +name = "realfft" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "390252372b7f2aac8360fc5e72eba10136b166d6faeed97e6d0c8324eb99b2b1" +dependencies = [ + "rustfft", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -3086,12 +3159,35 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuf" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" +dependencies = [ + "crossbeam-utils", + "portable-atomic", + "portable-atomic-util", +] + [[package]] name = "roxmltree" version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" +[[package]] +name = "rubato" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5258099699851cfd0082aeb645feb9c084d9a5e1f1b8d5372086b989fc5e56a1" +dependencies = [ + "num-complex", + "num-integer", + "num-traits", + "realfft", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3110,6 +3206,21 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustfft" +version = "6.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f266ff9b0cfc79de11fd5af76a2bc672fe3ace10c96fa06456740fa70cb1ed49" +dependencies = [ + "num-complex", + "num-integer", + "num-traits", + "primal-check", + "strength_reduce", + "transpose", + "version_check", +] + [[package]] name = "rustix" version = "0.38.44" @@ -3133,7 +3244,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3204,12 +3315,6 @@ dependencies = [ "unicode-script", ] -[[package]] -name = "ryu" -version = "1.0.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" - [[package]] name = "s2n-codec" version = "0.58.0" @@ -3436,18 +3541,6 @@ dependencies = [ "syn", ] -[[package]] -name = "serde_json" -version = "1.0.140" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" -dependencies = [ - "itoa", - "memchr", - "ryu", - "serde", -] - [[package]] name = "serde_repr" version = "0.1.20" @@ -3466,8 +3559,6 @@ dependencies = [ "chrono", "protocol", "s2n-quic", - "serde", - "serde_json", "tokio", ] @@ -3640,6 +3731,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strength_reduce" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe895eb47f22e2ddd4dabc02bce419d2e643c8e3b585c78158b349195bc24d82" + [[package]] name = "strict-num" version = "0.1.1" @@ -3732,7 +3829,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3925,6 +4022,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "transpose" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad61aed86bc3faea4300c7aee358b4c6d0c8d6ccc36524c96e4c92ccf26e77e" +dependencies = [ + "num-integer", + "strength_reduce", +] + [[package]] name = "ttf-parser" version = "0.20.0" @@ -4026,6 +4133,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.4" @@ -4056,6 +4169,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index cc25a41..e4dab28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ members = [ [workspace.dependencies] chrono = { version = "0.4.41", features = ["serde"] } -serde = { version = "1.0.219", features = ["derive"] } -serde_json = "1.0.140" +# serde = { version = "1.0.219", features = ["derive"] } +# serde_json = "1.0.140" tokio = { version = "1.45.0", features = ["full"] } s2n-quic = "1.58.0" +bincode = "2.0.1" diff --git a/client/Cargo.toml b/client/Cargo.toml index 51fad27..f606956 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -5,10 +5,9 @@ edition = "2024" [dependencies] protocol = { path = "../protocol" } -serde = { workspace = true } -serde_json = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } s2n-quic = { workspace = true } cpal = "0.15.3" iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" } +fixed-resample = "0.8.0" diff --git a/client/src/lib.rs b/client/src/lib.rs index d3a3105..f4e6ab0 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -3,7 +3,7 @@ pub mod stream; pub mod voice; const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4; -const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 4; +const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 16; #[derive(Debug)] pub struct ClientConfig { diff --git a/client/src/stream.rs b/client/src/stream.rs index 307d077..016ae3a 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::Path, sync::Arc}; -use protocol::Error; +use protocol::{Error, NETWORK_BUFFER_LENGTH, Signal, SignalType, SignedAudioDatum}; use s2n_quic::{ Client, client::Connect, @@ -16,8 +16,8 @@ use crate::ClientConfig; #[derive(Debug)] pub struct ConnectReturn { - send_audio_task: JoinHandle<()>, - receive_audio_task: JoinHandle<()>, + send_signals_task: JoinHandle<()>, + receive_signals_task: JoinHandle<()>, } pub async fn connect( @@ -58,13 +58,15 @@ pub async fn connect( .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; let (receive_stream, send_stream) = stream.split(); + let ready_signal = broadcast::channel(3); - let receive_audio_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender)); - let send_audio_task = tokio::spawn(send_audio_data(send_stream, microphone_receiver)); + let send_signals_task = tokio::spawn(send_signals(send_stream, microphone_receiver)); + let speaker_clone = speaker_sender.clone(); + let receive_signals_task = tokio::spawn(receive_signals(receive_stream, speaker_sender)); Ok(ConnectReturn { - send_audio_task, - receive_audio_task, + send_signals_task, + receive_signals_task, }) } @@ -81,49 +83,57 @@ pub async fn disconnect_watcher( println!("Going to Disconnect"); - connect_return.send_audio_task.abort(); - connect_return.receive_audio_task.abort(); + connect_return.send_signals_task.abort(); + connect_return.receive_signals_task.abort(); println!("Disconnected"); } -async fn send_audio_data( +async fn send_signals( mut send_stream: SendStream, - old_microphone_receiver: broadcast::Receiver, + mut microphone_receiver: broadcast::Receiver, ) { - let mut microphone_receiver = old_microphone_receiver.resubscribe(); - drop(old_microphone_receiver); loop { match microphone_receiver.recv().await { - Ok(microphone_data) => { - if let Err(err_val) = send_stream.write_f32(microphone_data).await { - eprintln!("Error: Send Microphone Data | Remote | {}", err_val); - todo!("GUI Status: Disconnect"); - // break; + Ok(audio_datum) => { + if let Err(err_val) = send_stream.write_f32(audio_datum).await { + eprintln!("Error: Send Audio Datum | Remote | {}", err_val); + return; } } Err(err_val) => { - eprintln!("Error: Receive from Microphone | Local | {}", err_val); - match err_val { - broadcast::error::RecvError::Closed => break, - broadcast::error::RecvError::Lagged(_) => { - microphone_receiver = microphone_receiver.resubscribe() - } - } + eprintln!("Error: Receive Audio Datum | Local | {}", err_val); + return; } } } } -async fn receive_audio_data( + +async fn receive_signals( mut receive_stream: ReceiveStream, - speaker_sender: Arc>, + speaker_sender: Arc>, ) { + let mut network_buffer = [0; NETWORK_BUFFER_LENGTH]; loop { - match receive_stream.read_f32().await { - Ok(received_data) => { - // todo: error only happens if there is no receiver, think about it - let _ = speaker_sender.send(received_data); - } + match receive_stream.read_exact(&mut network_buffer).await { + Ok(_) => match Signal::unpack_signal(&network_buffer) { + Ok(received_signal) => match received_signal.signal_type { + SignalType::AudioDatum => match received_signal.unpack_audio() { + Ok(signed_audio_datum) => { + let _ = speaker_sender.send(signed_audio_datum); + } + Err(err_val) => { + eprintln!("Error: Unpack Audio | {}", err_val); + println!("Warning: Illegal Operation"); + return; + } + }, + SignalType::SpeakerLeft => todo!(), + }, + Err(err_val) => { + eprintln!("Error: Unpack Signal | {}", err_val); + } + }, Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); todo!("GUI Status Disconnect"); diff --git a/client/src/voice.rs b/client/src/voice.rs index b309659..5452b4f 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -72,25 +72,22 @@ pub async fn play( let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { for sample in data { - if play_receiver.len() > 0 { - match play_receiver.blocking_recv() { - Ok(received_sample) => *sample = received_sample, - Err(err_val) => match err_val { - broadcast::error::RecvError::Closed => { - eprintln!("Error: Speaker Receive | Local Channel | Channel Closed"); - return; - } - broadcast::error::RecvError::Lagged(lag_amount) => { - eprintln!( - "Error: Speaker Receive | Local Channel | Lagging by -> {}", - lag_amount - ); - play_receiver = play_receiver.resubscribe(); - } - }, - } - } else { - *sample = 0.0 + match play_receiver.try_recv() { + Ok(received_sample) => *sample = received_sample, + Err(err_val) => match err_val { + broadcast::error::TryRecvError::Empty => *sample = 0.0, + broadcast::error::TryRecvError::Closed => { + eprintln!("Error: Speaker Receive | Local Channel | Channel Closed"); + return; + } + broadcast::error::TryRecvError::Lagged(lag_amount) => { + eprintln!( + "Error: Speaker Receive | Local Channel | Lagging by -> {}", + lag_amount + ); + play_receiver = play_receiver.resubscribe(); + } + }, } } }; diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 63f4f2f..2013a18 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -4,5 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -serde = { workspace = true } +# serde = { workspace = true } +# serde_json = { workspace = true } chrono = { workspace = true } +bincode = { workspace = true } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index ca3bd4d..eed0ff4 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -5,9 +5,9 @@ use std::{ io::Read, }; -use serde::{Deserialize, Serialize}; +pub mod protocol; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub enum Error { ConnectionSetup(String), Connection(String), @@ -17,6 +17,9 @@ pub enum Error { Record(String), Play(String), NotSupposeTo(String), + Serialization(String), + Deserialization(String), + UnexpectedSignalType(String), } impl std::error::Error for Error { @@ -36,16 +39,20 @@ impl Display for Error { Error::Record(inner) => write!(f, "Record | {}", inner), Error::Play(inner) => write!(f, "Play | {}", inner), Error::NotSupposeTo(inner) => write!(f, "Not Suppose To | {}", inner), + Error::Serialization(inner) => write!(f, "Serialization | {}", inner), + Error::Deserialization(inner) => write!(f, "Deserialization | {}", inner), + Error::UnexpectedSignalType(inner) => write!(f, "Unexpected Signal Type | {}", inner), } } } - +#[allow(dead_code)] #[derive(Debug, Clone)] struct TOML { header: String, fields: HashMap, } +#[allow(dead_code)] fn naive_toml_parser(file_location: &str) -> TOML { let mut toml_file = File::open(file_location).unwrap(); let mut toml_ingredients = String::default(); diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs new file mode 100644 index 0000000..fa58b12 --- /dev/null +++ b/protocol/src/protocol.rs @@ -0,0 +1,88 @@ +use bincode::{Decode, Encode}; + +use crate::Error; + +const SIGNAL_DATA_LENGTH: usize = 4; +const NETWORK_DATA_LENGTH: usize = 6; + +type SignalBufferReturn = [u8; SIGNAL_DATA_LENGTH]; +type NetworkBufferReturn = [u8; NETWORK_DATA_LENGTH]; + +static BINCODE_CONFIG: bincode::config::Configuration = + bincode::config::standard().with_big_endian(); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)] +pub struct Speaker { + id: u8, +} + +impl Speaker { + pub fn new(id: u8) -> Self { + Self { id } + } + + pub fn get_id(&self) -> u8 { + self.id + } +} + +#[derive(Debug, Encode, Decode)] +pub enum SignalType { + AudioDatum, + SpeakerLeft, +} + +#[derive(Debug, Encode, Decode)] +pub struct Signal { + signal_type: SignalType, + speaker: Speaker, + data: [u8; SIGNAL_DATA_LENGTH], +} + +impl Signal { + pub fn unpack(data: NetworkBufferReturn) -> Result { + Ok(bincode::decode_from_slice::(&data, BINCODE_CONFIG) + .map_err(|inner| Error::Deserialization(inner.to_string()))? + .0) + } + + fn pack(self) -> NetworkBufferReturn { + let encoded = serialize(self); + + assert_eq!(encoded.len(), NETWORK_DATA_LENGTH); + + let mut buffer = NetworkBufferReturn::default(); + buffer.copy_from_slice(&encoded); + + buffer + } + + pub fn pack_audio_datum(speaker: Speaker, audio_datum: f32) -> NetworkBufferReturn { + let signal = Self { + signal_type: SignalType::AudioDatum, + speaker, + data: audio_datum.to_be_bytes(), + }; + + Self::pack(signal) + } + + pub fn pack_speaker_left(speaker: Speaker) -> NetworkBufferReturn { + let signal = Self { + signal_type: SignalType::SpeakerLeft, + speaker, + data: SignalBufferReturn::default(), + }; + + signal.pack() + } +} + +fn serialize(value: E) -> Vec +where + E: bincode::enc::Encode, +{ + bincode::encode_to_vec(value, BINCODE_CONFIG) + .map_err(|inner| Error::Serialization(inner.to_string())) + .unwrap() +} diff --git a/server/Cargo.toml b/server/Cargo.toml index e536fad..df09f6e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -5,8 +5,6 @@ edition = "2024" [dependencies] protocol = { path = "../protocol" } -serde = { workspace = true } -serde_json = { workspace = true } chrono = { workspace = true } tokio = { workspace = true } s2n-quic = { workspace = true } diff --git a/server/src/stream.rs b/server/src/stream.rs index 1935a9c..73ccb83 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -1,62 +1,91 @@ use std::{ path::Path, sync::{Arc, LazyLock}, - time::Duration, }; -use chrono::{DateTime, Utc}; +use protocol::protocol::{Signal, Speaker}; use s2n_quic::{ Connection, Server, stream::{ReceiveStream, SendStream}, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::RwLock, - time::sleep, + sync::{RwLock, broadcast}, }; use crate::ServerConfig; -const BUFFER_LENGTH: usize = 1024 * 13; -#[derive(Debug)] -struct TimedBuffer { - audio_buffer: [f32; BUFFER_LENGTH], - latest_update: DateTime, +const NEW_SPEAKER_LENGTH: usize = u8::MAX as usize; +const AUDIO_BUFFER_LENGTH: usize = 1024 * 16 * 16; + +#[derive(Debug, Clone, Copy)] +enum SpeakerAction { + Join, + Left, } -impl TimedBuffer { - fn new() -> Self { - let audio_buffer = [0.0 as f32; BUFFER_LENGTH]; - let latest_update = Utc::now(); - Self { - audio_buffer, - latest_update, +#[derive(Debug, Clone)] +struct SpeakerWithData { + speaker: Speaker, + speaker_action_sender: broadcast::Sender<(SpeakerWithData, SpeakerAction)>, + audio_data_sender: broadcast::Sender, +} + +impl SpeakerWithData { + async fn new( + speaker: Speaker, + ) -> ( + broadcast::Receiver<(SpeakerWithData, SpeakerAction)>, + broadcast::Sender, + ) { + let speaker_action_channel = broadcast::channel(NEW_SPEAKER_LENGTH); + let audio_data_sender = broadcast::channel(AUDIO_BUFFER_LENGTH).0; + + let speaker_with_data = Self { + speaker, + speaker_action_sender: speaker_action_channel.0, + audio_data_sender: audio_data_sender.clone(), + }; + + let mut online_speakers = ONLINE_SPEAKERS.write().await; + for online_speaker in online_speakers.iter() { + let _ = speaker_with_data + .speaker_action_sender + .send((online_speaker.clone(), SpeakerAction::Join)); + let _ = online_speaker + .speaker_action_sender + .send((speaker_with_data.clone(), SpeakerAction::Join)); + } + + online_speakers.push(speaker_with_data); + online_speakers.sort_by_key(|speaker| speaker.speaker.get_id()); + + drop(online_speakers); + + (speaker_action_channel.1, audio_data_sender) + } + + async fn remove(speaker_id: u8) { + let mut online_speakers = ONLINE_SPEAKERS.write().await; + + let speaker_index = + online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.speaker.get_id()); + + match speaker_index { + Ok(speaker_index) => { + let speaker = online_speakers.remove(speaker_index); + for online_speaker in online_speakers.iter() { + let _ = online_speaker + .speaker_action_sender + .send((speaker.clone(), SpeakerAction::Left)); + } + } + Err(_) => return, } } - fn update(&mut self, new_audio_buffer: [f32; BUFFER_LENGTH]) { - self.audio_buffer = new_audio_buffer; - self.latest_update = Utc::now(); - } } -#[derive(Debug)] -struct User { - user_id: u32, - timed_buffer: Arc>, -} - -impl User { - async fn new(user_id: u32) -> Arc> { - let timed_buffer = Arc::new(RwLock::new(TimedBuffer::new())); - let new_user = Self { - user_id, - timed_buffer: timed_buffer.clone(), - }; - ONLINE_USERS.write().await.push(new_user); - timed_buffer - } -} -static ONLINE_USERS: LazyLock>> = LazyLock::new(|| vec![].into()); +static ONLINE_SPEAKERS: LazyLock>> = LazyLock::new(|| vec![].into()); pub async fn start(server_config: ServerConfig) { let mut server = Server::builder() @@ -70,14 +99,22 @@ pub async fn start(server_config: ServerConfig) { .start() .unwrap(); - let mut user_id = 0; + let mut speaker_id = 0; while let Some(connection) = server.accept().await { - tokio::spawn(handle_client(user_id, connection)); - user_id += 1; + let speaker = Speaker::new(speaker_id); + tokio::spawn(handle_client(speaker, connection)); + + match speaker_id.checked_add(1) { + Some(next_speaker_id) => speaker_id = next_speaker_id, + None => { + println!("Warning: Room Is Full"); + return; + } + } } } -async fn handle_client(user_id: u32, mut connection: Connection) { +async fn handle_client(speaker: Speaker, mut connection: Connection) { println!( "Server Name = {}", connection.server_name().unwrap().unwrap().to_string() @@ -95,79 +132,68 @@ async fn handle_client(user_id: u32, mut connection: Connection) { .unwrap(); let (receive_stream, send_stream) = stream.split(); - let user_audio_buffer = User::new(user_id).await; + let speaker_id = speaker.get_id(); + let (speaker_action_receiver, audio_data_sender) = SpeakerWithData::new(speaker).await; - tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer)); - tokio::spawn(send_audio_data(user_id, send_stream)); + tokio::spawn(receive_audio_data( + receive_stream, + speaker_id, + audio_data_sender, + )); + tokio::spawn(send_audio_data( + send_stream, + speaker_id, + speaker_action_receiver, + )); } -async fn mixer(user_id: u32, latest_update: DateTime) -> Option<[f32; BUFFER_LENGTH]> { - let mut mixed_audio_buffer = [0.0 as f32; BUFFER_LENGTH]; - - for online_user in ONLINE_USERS.read().await.iter() { - let online_user_timed_buffer = online_user.timed_buffer.read().await; - - if online_user.user_id != user_id && online_user_timed_buffer.latest_update > latest_update - { - for (i, audio_data) in online_user_timed_buffer.audio_buffer.iter().enumerate() { - mixed_audio_buffer[i] = mixed_audio_buffer[i] + audio_data; - } - } - } - - if 0.0 as f32 == mixed_audio_buffer.iter().sum() { - None - } else { - Some(mixed_audio_buffer) - } -} - -async fn send_audio_data(user_id: u32, mut send_stream: SendStream) { - let mut latest_update = Utc::now(); - loop { - let mixed_audio_buffer = match mixer(user_id, latest_update).await { - Some(mixed_audio_buffer) => { - latest_update = Utc::now(); - mixed_audio_buffer - } - None => { - sleep(Duration::from_millis(150)).await; - continue; - } - }; - - for mixed_audio_data in mixed_audio_buffer { - if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await { - eprintln!("Error: Send Audio Data | Remote | {}", err_val); - return; - } - } - } -} - -async fn receiver_audio_data( - mut receive_stream: ReceiveStream, - audio_buffer: Arc>, +async fn send_audio_data( + send_stream: SendStream, + speaker_id: u8, + mut speaker_action_receiver: broadcast::Receiver<(SpeakerWithData, SpeakerAction)>, ) { - let mut inner_buffer = [0.0 as f32; BUFFER_LENGTH]; - let mut i = 0; + let send_stream = Arc::new(RwLock::new(send_stream)); + while let Ok((speaker_with_data, speaker_action)) = speaker_action_receiver.recv().await { + match speaker_action { + SpeakerAction::Join => { + let send_stream = send_stream.clone(); + tokio::spawn(async move { + let mut audio_data_receiver = speaker_with_data.audio_data_sender.subscribe(); + while let Ok(audio_datum) = audio_data_receiver.recv().await { + let data = Signal::pack_audio_datum(speaker_with_data.speaker, audio_datum); + if let Err(err_val) = send_stream.write().await.write_all(&data).await { + eprintln!("Error: Send Audio Data | Remote | {}", err_val); + SpeakerWithData::remove(speaker_id).await; + }; + } + }); + } + SpeakerAction::Left => { + let data = Signal::pack_speaker_left(speaker_with_data.speaker); + if let Err(err_val) = send_stream.write().await.write_all(&data).await { + eprintln!("Error: Send Speaker Left | Remote | {}", err_val); + SpeakerWithData::remove(speaker_id).await; + } + } + } + } +} + +async fn receive_audio_data( + mut receive_stream: ReceiveStream, + speaker_id: u8, + audio_data_sender: broadcast::Sender, +) { loop { match receive_stream.read_f32().await { - Ok(received_audio_data) => { - inner_buffer[i] = received_audio_data; + Ok(received_data) => { + let _ = audio_data_sender.send(received_data); } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); - return; + SpeakerWithData::remove(speaker_id).await; } } - - if i == BUFFER_LENGTH - 1 { - audio_buffer.write().await.update(inner_buffer); - i = 0; - } else { - i += 1; - } } }