diff --git a/Cargo.lock b/Cargo.lock index ca991cc..11eb2f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,7 +457,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools", "lazy_static", "lazycell", "log", @@ -473,18 +473,18 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.70.1" +version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +checksum = "4f72209734318d0b619a5e0f5129918b848c416e122a3c4ce054e03cb87b726f" dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools", "proc-macro2", "quote", "regex", - "rustc-hash 1.1.0", + "rustc-hash 2.1.1", "shlex", "syn", ] @@ -531,7 +531,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c132eebf10f5cad5289222520a4a058514204aed6d791f1cf4fe8088b82d15f" dependencies = [ - "objc2", + "objc2 0.5.2", ] [[package]] @@ -675,15 +675,22 @@ dependencies = [ "libloading", ] +[[package]] +name = "claxon" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bfbf56724aa9eca8afa4fcfadeb479e722935bb2a0900c2d37e0cc477af0688" + [[package]] name = "client" version = "0.1.0" dependencies = [ "chrono", - "cpal", + "cpal 0.16.0", "fixed-resample", "iced", "protocol", + "rodio", "s2n-quic", "tokio", ] @@ -703,9 +710,9 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7f4aaa047ba3c3630b080bb9860894732ff23e2aee290a418909aa6d5df38f" dependencies = [ - "objc2", + "objc2 0.5.2", "objc2-app-kit", - "objc2-foundation", + "objc2-foundation 0.2.2", ] [[package]] @@ -851,12 +858,26 @@ dependencies = [ ] [[package]] -name = "coreaudio-sys" -version = "0.2.16" +name = "coreaudio-rs" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ce857aa0b77d77287acc1ac3e37a05a8c95a2af3647d23b15f263bdaeb7562b" +checksum = "1aae284fbaf7d27aa0e292f7677dfbe26503b0d555026f702940805a630eac17" dependencies = [ - "bindgen 0.70.1", + "bitflags 1.3.2", + "libc", + "objc2-audio-toolbox", + "objc2-core-audio", + "objc2-core-audio-types", + "objc2-core-foundation", +] + +[[package]] +name = "coreaudio-sys" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceec7a6067e62d6f931a2baf6f3a751f4a892595bcec1461a3c94ef9949864b6" +dependencies = [ + "bindgen 0.72.0", ] [[package]] @@ -890,7 +911,7 @@ checksum = "873dab07c8f743075e57f524c583985fbaf745602acbe916a01539364369a779" dependencies = [ "alsa", "core-foundation-sys", - "coreaudio-rs", + "coreaudio-rs 0.11.3", "dasp_sample", "jni", "js-sys", @@ -905,6 +926,32 @@ dependencies = [ "windows 0.54.0", ] +[[package]] +name = "cpal" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbd307f43cc2a697e2d1f8bc7a1d824b5269e052209e28883e5bc04d095aaa3f" +dependencies = [ + "alsa", + "coreaudio-rs 0.13.0", + "dasp_sample", + "jni", + "js-sys", + "libc", + "mach2", + "ndk 0.9.0", + "ndk-context", + "num-derive", + "num-traits", + "objc2-audio-toolbox", + "objc2-core-audio", + "objc2-core-audio-types", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows 0.54.0", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -969,8 +1016,8 @@ checksum = "18e1a09f280e29a8b00bc7e81eca5ac87dca0575639c9422a5fa25a07bb884b8" dependencies = [ "ashpd", "async-std", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", "web-sys", "winreg", ] @@ -987,6 +1034,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b" +[[package]] +name = "dispatch2" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" +dependencies = [ + "bitflags 2.9.0", + "objc2 0.6.1", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1078,6 +1135,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endi" version = "1.1.0" @@ -1644,6 +1710,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hound" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f" + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -1978,15 +2050,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "jni" version = "0.21.1" @@ -2077,6 +2140,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lewton" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "777b48df9aaab155475a83a7df3070395ea1ac6902f5cd062b8f2b028075c030" +dependencies = [ + "byteorder", + "ogg", + "tinyvec", +] + [[package]] name = "libc" version = "0.2.172" @@ -2461,6 +2535,15 @@ dependencies = [ "objc2-encode", ] +[[package]] +name = "objc2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88c6597e14493ab2e44ce58f2fdecf095a51f12ca57bec060a11c57332520551" +dependencies = [ + "objc2-encode", +] + [[package]] name = "objc2-app-kit" version = "0.2.2" @@ -2470,13 +2553,28 @@ dependencies = [ "bitflags 2.9.0", "block2", "libc", - "objc2", + "objc2 0.5.2", "objc2-core-data", "objc2-core-image", - "objc2-foundation", + "objc2-foundation 0.2.2", "objc2-quartz-core", ] +[[package]] +name = "objc2-audio-toolbox" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cbe18d879e20a4aea544f8befe38bcf52255eb63d3f23eca2842f3319e4c07" +dependencies = [ + "bitflags 2.9.0", + "libc", + "objc2 0.6.1", + "objc2-core-audio", + "objc2-core-audio-types", + "objc2-core-foundation", + "objc2-foundation 0.3.1", +] + [[package]] name = "objc2-cloud-kit" version = "0.2.2" @@ -2485,9 +2583,9 @@ checksum = "74dd3b56391c7a0596a295029734d3c1c5e7e510a4cb30245f8221ccea96b009" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", + "objc2 0.5.2", "objc2-core-location", - "objc2-foundation", + "objc2-foundation 0.2.2", ] [[package]] @@ -2497,8 +2595,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ff520e9c33812fd374d8deecef01d4a840e7b41862d849513de77e44aa4889" dependencies = [ "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", +] + +[[package]] +name = "objc2-core-audio" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca44961e888e19313b808f23497073e3f6b3c22bb485056674c8b49f3b025c82" +dependencies = [ + "dispatch2", + "objc2 0.6.1", + "objc2-core-audio-types", + "objc2-core-foundation", +] + +[[package]] +name = "objc2-core-audio-types" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f1cc99bb07ad2ddb6527ddf83db6a15271bb036b3eb94b801cd44fdc666ee1" +dependencies = [ + "bitflags 2.9.0", + "objc2 0.6.1", ] [[package]] @@ -2509,8 +2629,19 @@ checksum = "617fbf49e071c178c0b24c080767db52958f716d9eabdf0890523aeae54773ef" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", +] + +[[package]] +name = "objc2-core-foundation" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166" +dependencies = [ + "bitflags 2.9.0", + "dispatch2", + "objc2 0.6.1", ] [[package]] @@ -2520,8 +2651,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55260963a527c99f1819c4f8e3b47fe04f9650694ef348ffd2227e8196d34c80" dependencies = [ "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", "objc2-metal", ] @@ -2532,9 +2663,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "000cfee34e683244f284252ee206a27953279d370e309649dc3ee317b37e5781" dependencies = [ "block2", - "objc2", + "objc2 0.5.2", "objc2-contacts", - "objc2-foundation", + "objc2-foundation 0.2.2", ] [[package]] @@ -2553,7 +2684,16 @@ dependencies = [ "block2", "dispatch", "libc", - "objc2", + "objc2 0.5.2", +] + +[[package]] +name = "objc2-foundation" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900831247d2fe1a09a683278e5384cfb8c80c79fe6b166f9d14bfdde0ea1b03c" +dependencies = [ + "objc2 0.6.1", ] [[package]] @@ -2563,9 +2703,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1a1ae721c5e35be65f01a03b6d2ac13a54cb4fa70d8a5da293d7b0020261398" dependencies = [ "block2", - "objc2", + "objc2 0.5.2", "objc2-app-kit", - "objc2-foundation", + "objc2-foundation 0.2.2", ] [[package]] @@ -2576,8 +2716,8 @@ checksum = "dd0cba1276f6023976a406a14ffa85e1fdd19df6b0f737b063b95f6c8c7aadd6" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", ] [[package]] @@ -2588,8 +2728,8 @@ checksum = "e42bee7bff906b14b167da2bac5efe6b6a07e6f7c0a21a7308d40c960242dc7a" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", "objc2-metal", ] @@ -2599,8 +2739,8 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a684efe3dec1b305badae1a28f6555f6ddd3bb2c2267896782858d5a78404dc" dependencies = [ - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", ] [[package]] @@ -2611,12 +2751,12 @@ checksum = "b8bb46798b20cd6b91cbd113524c490f1686f4c4e8f49502431415f3512e2b6f" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", + "objc2 0.5.2", "objc2-cloud-kit", "objc2-core-data", "objc2-core-image", "objc2-core-location", - "objc2-foundation", + "objc2-foundation 0.2.2", "objc2-link-presentation", "objc2-quartz-core", "objc2-symbols", @@ -2631,8 +2771,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fa5f9748dbfe1ca6c0b79ad20725a11eca7c2218bceb4b005cb1be26273bfe" dependencies = [ "block2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", ] [[package]] @@ -2643,9 +2783,9 @@ checksum = "76cfcbf642358e8689af64cee815d139339f3ed8ad05103ed5eaf73db8d84cb3" dependencies = [ "bitflags 2.9.0", "block2", - "objc2", + "objc2 0.5.2", "objc2-core-location", - "objc2-foundation", + "objc2-foundation 0.2.2", ] [[package]] @@ -2680,6 +2820,15 @@ dependencies = [ "cc", ] +[[package]] +name = "ogg" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6951b4e8bf21c8193da321bcce9c9dd2e13c858fe078bf9054a288b419ae5d6e" +dependencies = [ + "byteorder", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -2929,6 +3078,7 @@ version = "0.1.0" dependencies = [ "bincode", "chrono", + "tokio", ] [[package]] @@ -3170,6 +3320,19 @@ dependencies = [ "portable-atomic-util", ] +[[package]] +name = "rodio" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ceb6607dd738c99bc8cb28eff249b7cd5c8ec88b9db96c0608c1480d140fb1" +dependencies = [ + "claxon", + "cpal 0.15.3", + "hound", + "lewton", + "symphonia", +] + [[package]] name = "roxmltree" version = "0.20.0" @@ -3694,8 +3857,8 @@ dependencies = [ "js-sys", "log", "memmap2", - "objc2", - "objc2-foundation", + "objc2 0.5.2", + "objc2-foundation 0.2.2", "objc2-quartz-core", "raw-window-handle", "redox_syscall 0.5.12", @@ -3788,6 +3951,55 @@ dependencies = [ "zeno", ] +[[package]] +name = "symphonia" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "815c942ae7ee74737bb00f965fa5b5a2ac2ce7b6c01c0cc169bbeaf7abd5f5a9" +dependencies = [ + "lazy_static", + "symphonia-bundle-mp3", + "symphonia-core", + "symphonia-metadata", +] + +[[package]] +name = "symphonia-bundle-mp3" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01c2aae70f0f1fb096b6f0ff112a930b1fb3626178fba3ae68b09dce71706d4" +dependencies = [ + "lazy_static", + "log", + "symphonia-core", + "symphonia-metadata", +] + +[[package]] +name = "symphonia-core" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "798306779e3dc7d5231bd5691f5a813496dc79d3f56bf82e25789f2094e022c3" +dependencies = [ + "arrayvec", + "bitflags 1.3.2", + "bytemuck", + "lazy_static", + "log", +] + +[[package]] +name = "symphonia-metadata" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc622b9841a10089c5b18e99eb904f4341615d5aa55bbf4eedde1be721a4023c" +dependencies = [ + "encoding_rs", + "lazy_static", + "log", + "symphonia-core", +] + [[package]] name = "syn" version = "2.0.101" @@ -5038,9 +5250,9 @@ dependencies = [ "libc", "memmap2", "ndk 0.9.0", - "objc2", + "objc2 0.5.2", "objc2-app-kit", - "objc2-foundation", + "objc2-foundation 0.2.2", "objc2-ui-kit", "orbclient", "percent-encoding", diff --git a/client/Cargo.toml b/client/Cargo.toml index f606956..148a6f9 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -8,6 +8,7 @@ protocol = { path = "../protocol" } chrono = { workspace = true } tokio = { workspace = true } s2n-quic = { workspace = true } -cpal = "0.15.3" +cpal = "0.16.0" iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" } fixed-resample = "0.8.0" +rodio = "0.20.1" diff --git a/client/src/gui.rs b/client/src/gui.rs index e87e415..9e97b07 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -18,13 +18,13 @@ use crate::{ }; #[derive(Debug)] -struct Signal { +struct Controller { record_control: mpsc::Sender, play_control: mpsc::Sender, connection_stop_sender: RwLock>>, } -impl Signal { +impl Controller { fn reset_connection(&self) -> Result<(), Error> { let connection_stop_sender = self.connection_stop_sender.write().unwrap().take(); match connection_stop_sender { @@ -43,7 +43,7 @@ impl Signal { #[derive(Debug)] struct Channel { microphone: Arc>, - speaker: Arc>, + speaker: Arc>, } #[derive(Debug, Clone, Copy)] @@ -90,12 +90,12 @@ pub struct App { client_config: Arc, gui_status: Arc>, channel: Channel, - signal: Arc, + controller: Arc, } impl App { fn reset_connection(&mut self) -> Result<(), Error> { - self.signal.reset_connection()?; + self.controller.reset_connection()?; self.gui_status.write().unwrap().room = State::Passive; Ok(()) } @@ -122,7 +122,7 @@ impl App { microphone: record_sender, speaker: play_sender, }, - signal: Signal { + controller: Controller { record_control: record_control.0, play_control: play_control.0, connection_stop_sender: None.into(), @@ -168,7 +168,7 @@ impl App { let speaker_sender = self.channel.speaker.clone(); let (connection_stop_sender, connection_stop_receiver) = oneshot::channel(); - *self.signal.connection_stop_sender.write().unwrap() = Some(connection_stop_sender); + *self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender); Task::perform( async move { @@ -211,7 +211,7 @@ impl App { self.gui_status.write().unwrap().microphone = State::Loading; let gui_status = self.gui_status.clone(); - let signal = self.signal.clone(); + let signal = self.controller.clone(); Task::perform( async move { signal.record_control.send(State::Active).await }, move |result| { @@ -232,7 +232,7 @@ impl App { self.gui_status.write().unwrap().microphone = State::Loading; let gui_status = self.gui_status.clone(); - let signal = self.signal.clone(); + let signal = self.controller.clone(); Task::perform( async move { signal.record_control.send(State::Passive).await }, move |result| { @@ -253,7 +253,7 @@ impl App { self.gui_status.write().unwrap().speaker = State::Loading; let gui_status = self.gui_status.clone(); - let signal = self.signal.clone(); + let signal = self.controller.clone(); Task::perform( async move { signal.play_control.send(State::Active).await }, move |result| { @@ -274,7 +274,7 @@ impl App { self.gui_status.write().unwrap().speaker = State::Loading; let gui_status = self.gui_status.clone(); - let signal = self.signal.clone(); + let signal = self.controller.clone(); Task::perform( async move { signal.play_control.send(State::Passive).await }, move |result| { diff --git a/client/src/lib.rs b/client/src/lib.rs index f4e6ab0..ce37796 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,3 +1,6 @@ +use protocol::protocol::Speaker; +use tokio::sync::broadcast; + pub mod gui; pub mod stream; pub mod voice; @@ -5,6 +8,21 @@ pub mod voice; const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4; const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 16; +#[derive(Debug, Clone)] +pub struct SpeakerWithData { + speaker: Speaker, + audio_sender: broadcast::Sender, +} +impl SpeakerWithData { + pub fn get_speaker_id(&self) -> u8 { + self.speaker.get_id() + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.audio_sender.subscribe() + } +} + #[derive(Debug)] pub struct ClientConfig { certificate_path: String, diff --git a/client/src/stream.rs b/client/src/stream.rs index 016ae3a..0344dc0 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -1,6 +1,9 @@ use std::{net::SocketAddr, path::Path, sync::Arc}; -use protocol::{Error, NETWORK_BUFFER_LENGTH, Signal, SignalType, SignedAudioDatum}; +use protocol::{ + Error, + protocol::{NETWORK_DATA_LENGTH, Signal}, +}; use s2n_quic::{ Client, client::Connect, @@ -22,7 +25,7 @@ pub struct ConnectReturn { pub async fn connect( microphone_receiver: broadcast::Receiver, - speaker_sender: Arc>, + speaker_sender: Arc>, client_config: Arc, ) -> Result { let client = Client::builder() @@ -58,10 +61,8 @@ pub async fn connect( .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; let (receive_stream, send_stream) = stream.split(); - let ready_signal = broadcast::channel(3); let send_signals_task = tokio::spawn(send_signals(send_stream, microphone_receiver)); - let speaker_clone = speaker_sender.clone(); let receive_signals_task = tokio::spawn(receive_signals(receive_stream, speaker_sender)); Ok(ConnectReturn { @@ -111,25 +112,15 @@ async fn send_signals( async fn receive_signals( mut receive_stream: ReceiveStream, - speaker_sender: Arc>, + speaker_sender: Arc>, ) { - let mut network_buffer = [0; NETWORK_BUFFER_LENGTH]; + let mut network_buffer = [0; NETWORK_DATA_LENGTH]; loop { match receive_stream.read_exact(&mut network_buffer).await { - Ok(_) => match Signal::unpack_signal(&network_buffer) { - Ok(received_signal) => match received_signal.signal_type { - SignalType::AudioDatum => match received_signal.unpack_audio() { - Ok(signed_audio_datum) => { - let _ = speaker_sender.send(signed_audio_datum); - } - Err(err_val) => { - eprintln!("Error: Unpack Audio | {}", err_val); - println!("Warning: Illegal Operation"); - return; - } - }, - SignalType::SpeakerLeft => todo!(), - }, + Ok(_) => match Signal::unpack(network_buffer) { + Ok(signal) => { + let _ = speaker_sender.send(signal); + } Err(err_val) => { eprintln!("Error: Unpack Signal | {}", err_val); } diff --git a/client/src/voice.rs b/client/src/voice.rs index 5452b4f..b696dc8 100644 --- a/client/src/voice.rs +++ b/client/src/voice.rs @@ -1,11 +1,30 @@ -use std::sync::Arc; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use protocol::Error; +use protocol::{ + Error, + protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType}, +}; +use rodio::{OutputStream, OutputStreamHandle, Sink, buffer::SamplesBuffer}; use tokio::sync::{broadcast, mpsc}; use crate::gui::State; +struct SpeakerSink { + speaker_id: u8, + sink: Sink, +} + +impl SpeakerSink { + fn new(speaker_id: u8, output_stream_handle: &OutputStreamHandle) -> Self { + let sink = Sink::try_new(&output_stream_handle).unwrap(); + Self { speaker_id, sink } + } +} + pub async fn record( mut record_control: mpsc::Receiver, record_sender: Arc>, @@ -61,66 +80,85 @@ pub async fn record( }) } -pub async fn play( - mut play_control: mpsc::Receiver, - mut play_receiver: broadcast::Receiver, -) -> Result<(), Error> { - let host = cpal::default_host(); - let output_device = host.default_output_device().unwrap(); - let config = output_device.default_output_config().unwrap().into(); - println!("Speaker Stream Config = {:#?}", config); +async fn mixer( + output_stream_handle: OutputStreamHandle, + mut play_receiver: broadcast::Receiver, + play_pause: Arc, +) { + let mut speaker_list = vec![]; - let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - for sample in data { - match play_receiver.try_recv() { - Ok(received_sample) => *sample = received_sample, - Err(err_val) => match err_val { - broadcast::error::TryRecvError::Empty => *sample = 0.0, - broadcast::error::TryRecvError::Closed => { - eprintln!("Error: Speaker Receive | Local Channel | Channel Closed"); - return; + while let Ok(signal) = play_receiver.recv().await { + match signal.get_signal_type() { + SignalType::AudioDatum => { + let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) { + [signal.get_audio_datum()] + } else { + [0.0] + }; + + let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data); + + match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| { + speaker_sink.speaker_id.cmp(&signal.get_speaker_id()) + }) { + Ok(speaker_sink_index) => { + let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never"); + + speaker_sink.sink.append(source); } - broadcast::error::TryRecvError::Lagged(lag_amount) => { - eprintln!( - "Error: Speaker Receive | Local Channel | Lagging by -> {}", - lag_amount - ); - play_receiver = play_receiver.resubscribe(); + Err(_) => { + let speaker_sink = + SpeakerSink::new(signal.get_speaker_id(), &output_stream_handle); + + speaker_sink.sink.append(source); + + speaker_list.push(speaker_sink); + speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id)); } - }, + } + } + SignalType::SpeakerLeft => { + speaker_list + .retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id()); } } - }; + } +} - let output_stream = output_device - .build_output_stream(&config, output, voice_error, None) - .unwrap(); +pub async fn play( + mut play_control: mpsc::Receiver, + play_receiver: broadcast::Receiver, +) { + let (_output_stream, output_stream_handle) = OutputStream::try_default().unwrap(); + let play_pause = Arc::new(AtomicBool::new(true)); - output_stream - .play() - .map_err(|inner| Error::Play(inner.to_string()))?; + let mixer = tokio::spawn(mixer( + output_stream_handle, + play_receiver, + play_pause.clone(), + )); tokio::task::block_in_place(|| { loop { match play_control.blocking_recv() { - Some(message) => match message { - State::Active => output_stream - .play() - .map_err(|inner| Error::Play(inner.to_string()))?, - State::Passive => output_stream - .pause() - .map_err(|inner| Error::Play(inner.to_string()))?, + Some(requested_state) => match requested_state { + State::Active => { + play_pause.store(true, Ordering::Relaxed); + } + State::Passive => { + play_pause.store(false, Ordering::Relaxed); + } State::Loading => {} }, None => { - output_stream - .pause() - .map_err(|inner| Error::Play(inner.to_string()))?; - return Ok(()); + mixer.abort(); + return; } } } - }) + }); + + mixer.abort(); } fn voice_error(err_val: cpal::StreamError) { diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 2013a18..bfcf56a 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -6,5 +6,6 @@ edition = "2024" [dependencies] # serde = { workspace = true } # serde_json = { workspace = true } +tokio = { workspace = true } chrono = { workspace = true } bincode = { workspace = true } diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index fa58b12..5d7dc47 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -1,10 +1,17 @@ +use std::sync::Arc; + use bincode::{Decode, Encode}; +use tokio::sync::broadcast; use crate::Error; const SIGNAL_DATA_LENGTH: usize = 4; -const NETWORK_DATA_LENGTH: usize = 6; +pub const NETWORK_DATA_LENGTH: usize = 6; +pub const SPEAKER_ACTION_CHANNEL_LENGTH: usize = 1024; +pub const AUDIO_DATA_SENDER_CHANNEL_LENGTH: usize = 1024 * 16 * 4; +pub const DEFAULT_SAMPLE_RATE: u32 = 44100; +pub type SpeakerWithDataAndAction = (SpeakerWithData, SpeakerAction); type SignalBufferReturn = [u8; SIGNAL_DATA_LENGTH]; type NetworkBufferReturn = [u8; NETWORK_DATA_LENGTH]; @@ -26,20 +33,88 @@ impl Speaker { } } -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Clone, Copy)] +pub enum SpeakerAction { + Join, + Left, +} + +#[derive(Debug, Clone)] +pub struct SpeakerWithData { + speaker: Speaker, + speaker_action_sender: Arc>, + audio_data_sender: Arc>, +} + +impl SpeakerWithData { + pub fn new(speaker: Speaker) -> Self { + let speaker_action_sender = broadcast::channel(SPEAKER_ACTION_CHANNEL_LENGTH).0.into(); + let audio_data_sender = broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH) + .0 + .into(); + + Self { + speaker, + speaker_action_sender, + audio_data_sender, + } + } + + pub fn get_speaker_id(&self) -> u8 { + self.speaker.get_id() + } + + pub fn get_speaker(&self) -> Speaker { + self.speaker + } + + pub fn send_speaker_action(&self, speaker_with_data_and_action: SpeakerWithDataAndAction) { + let _ = self + .speaker_action_sender + .send(speaker_with_data_and_action); + } + + pub fn clone_audio_data_sender(&self) -> Arc> { + self.audio_data_sender.clone() + } + + pub fn subscribe_speaker_action_channel( + &self, + ) -> broadcast::Receiver<(SpeakerWithData, SpeakerAction)> { + self.speaker_action_sender.subscribe() + } + + pub fn subscribe_audio_data_channel(&self) -> broadcast::Receiver { + self.audio_data_sender.subscribe() + } +} + +#[derive(Debug, Clone, Copy, Encode, Decode)] pub enum SignalType { AudioDatum, SpeakerLeft, } -#[derive(Debug, Encode, Decode)] +#[derive(Debug, Clone, Copy, Encode, Decode)] pub struct Signal { signal_type: SignalType, speaker: Speaker, - data: [u8; SIGNAL_DATA_LENGTH], + data: SignalBufferReturn, } impl Signal { + pub fn get_signal_type(&self) -> SignalType { + self.signal_type + } + + pub fn get_speaker_id(&self) -> u8 { + self.speaker.get_id() + } + + pub fn get_audio_datum(&self) -> f32 { + f32::from_be_bytes(self.data) + } + pub fn unpack(data: NetworkBufferReturn) -> Result { Ok(bincode::decode_from_slice::(&data, BINCODE_CONFIG) .map_err(|inner| Error::Deserialization(inner.to_string()))? diff --git a/server/src/stream.rs b/server/src/stream.rs index 73ccb83..82e1fe0 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -3,7 +3,9 @@ use std::{ sync::{Arc, LazyLock}, }; -use protocol::protocol::{Signal, Speaker}; +use protocol::protocol::{ + Signal, Speaker, SpeakerAction, SpeakerWithData, SpeakerWithDataAndAction, +}; use s2n_quic::{ Connection, Server, stream::{ReceiveStream, SendStream}, @@ -15,73 +17,45 @@ use tokio::{ use crate::ServerConfig; -const NEW_SPEAKER_LENGTH: usize = u8::MAX as usize; -const AUDIO_BUFFER_LENGTH: usize = 1024 * 16 * 16; +async fn add_speaker( + speaker_with_data: SpeakerWithData, +) -> ( + broadcast::Receiver, + Arc>, +) { + // Do this first so receiver can keep track of later insertions, otherwise they just wastes + let speaker_action_receiver = speaker_with_data.subscribe_speaker_action_channel(); -#[derive(Debug, Clone, Copy)] -enum SpeakerAction { - Join, - Left, -} - -#[derive(Debug, Clone)] -struct SpeakerWithData { - speaker: Speaker, - speaker_action_sender: broadcast::Sender<(SpeakerWithData, SpeakerAction)>, - audio_data_sender: broadcast::Sender, -} - -impl SpeakerWithData { - async fn new( - speaker: Speaker, - ) -> ( - broadcast::Receiver<(SpeakerWithData, SpeakerAction)>, - broadcast::Sender, - ) { - let speaker_action_channel = broadcast::channel(NEW_SPEAKER_LENGTH); - let audio_data_sender = broadcast::channel(AUDIO_BUFFER_LENGTH).0; - - let speaker_with_data = Self { - speaker, - speaker_action_sender: speaker_action_channel.0, - audio_data_sender: audio_data_sender.clone(), - }; - - let mut online_speakers = ONLINE_SPEAKERS.write().await; - for online_speaker in online_speakers.iter() { - let _ = speaker_with_data - .speaker_action_sender - .send((online_speaker.clone(), SpeakerAction::Join)); - let _ = online_speaker - .speaker_action_sender - .send((speaker_with_data.clone(), SpeakerAction::Join)); - } - - online_speakers.push(speaker_with_data); - online_speakers.sort_by_key(|speaker| speaker.speaker.get_id()); - - drop(online_speakers); - - (speaker_action_channel.1, audio_data_sender) + let mut online_speakers = ONLINE_SPEAKERS.write().await; + for online_speaker in online_speakers.iter() { + speaker_with_data.send_speaker_action((online_speaker.clone(), SpeakerAction::Join)); + online_speaker.send_speaker_action((speaker_with_data.clone(), SpeakerAction::Join)); } - async fn remove(speaker_id: u8) { - let mut online_speakers = ONLINE_SPEAKERS.write().await; + let audio_data_sender = speaker_with_data.clone_audio_data_sender(); - let speaker_index = - online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.speaker.get_id()); + online_speakers.push(speaker_with_data); + online_speakers.sort_by_key(|speaker| speaker.get_speaker_id()); - match speaker_index { - Ok(speaker_index) => { - let speaker = online_speakers.remove(speaker_index); - for online_speaker in online_speakers.iter() { - let _ = online_speaker - .speaker_action_sender - .send((speaker.clone(), SpeakerAction::Left)); - } + drop(online_speakers); + + (speaker_action_receiver, audio_data_sender) +} + +async fn remove_speaker(speaker_id: u8) { + let mut online_speakers = ONLINE_SPEAKERS.write().await; + + let speaker_index = + online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.get_speaker_id()); + + match speaker_index { + Ok(speaker_index) => { + let speaker = online_speakers.remove(speaker_index); + for online_speaker in online_speakers.iter() { + online_speaker.send_speaker_action((speaker.clone(), SpeakerAction::Left)); } - Err(_) => return, } + Err(_) => return, } } @@ -133,7 +107,9 @@ async fn handle_client(speaker: Speaker, mut connection: Connection) { let (receive_stream, send_stream) = stream.split(); let speaker_id = speaker.get_id(); - let (speaker_action_receiver, audio_data_sender) = SpeakerWithData::new(speaker).await; + + let speaker_with_data = SpeakerWithData::new(speaker); + let (speaker_action_receiver, audio_data_sender) = add_speaker(speaker_with_data).await; tokio::spawn(receive_audio_data( receive_stream, @@ -158,22 +134,25 @@ async fn send_audio_data( SpeakerAction::Join => { let send_stream = send_stream.clone(); tokio::spawn(async move { - let mut audio_data_receiver = speaker_with_data.audio_data_sender.subscribe(); + let mut audio_data_receiver = speaker_with_data.subscribe_audio_data_channel(); while let Ok(audio_datum) = audio_data_receiver.recv().await { - let data = Signal::pack_audio_datum(speaker_with_data.speaker, audio_datum); + let data = + Signal::pack_audio_datum(speaker_with_data.get_speaker(), audio_datum); if let Err(err_val) = send_stream.write().await.write_all(&data).await { eprintln!("Error: Send Audio Data | Remote | {}", err_val); - SpeakerWithData::remove(speaker_id).await; + remove_speaker(speaker_id).await; + return; }; } }); } SpeakerAction::Left => { - let data = Signal::pack_speaker_left(speaker_with_data.speaker); + let data = Signal::pack_speaker_left(speaker_with_data.get_speaker()); if let Err(err_val) = send_stream.write().await.write_all(&data).await { eprintln!("Error: Send Speaker Left | Remote | {}", err_val); - SpeakerWithData::remove(speaker_id).await; + remove_speaker(speaker_id).await; + return; } } } @@ -183,7 +162,7 @@ async fn send_audio_data( async fn receive_audio_data( mut receive_stream: ReceiveStream, speaker_id: u8, - audio_data_sender: broadcast::Sender, + audio_data_sender: Arc>, ) { loop { match receive_stream.read_f32().await { @@ -192,7 +171,8 @@ async fn receive_audio_data( } Err(err_val) => { eprintln!("Error: Receive Audio Data | Remote | {}", err_val); - SpeakerWithData::remove(speaker_id).await; + remove_speaker(speaker_id).await; + return; } } }