feat: new protocol client implementation

This commit is contained in:
Ahmet Kaan Gümüş 2025-06-14 06:04:48 +03:00
parent 51c29f7921
commit ad0323da33
9 changed files with 522 additions and 206 deletions

324
Cargo.lock generated
View file

@ -457,7 +457,7 @@ dependencies = [
"bitflags 2.9.0",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"itertools",
"lazy_static",
"lazycell",
"log",
@ -473,18 +473,18 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.70.1"
version = "0.72.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
checksum = "4f72209734318d0b619a5e0f5129918b848c416e122a3c4ce054e03cb87b726f"
dependencies = [
"bitflags 2.9.0",
"cexpr",
"clang-sys",
"itertools 0.13.0",
"itertools",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"rustc-hash 2.1.1",
"shlex",
"syn",
]
@ -531,7 +531,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c132eebf10f5cad5289222520a4a058514204aed6d791f1cf4fe8088b82d15f"
dependencies = [
"objc2",
"objc2 0.5.2",
]
[[package]]
@ -675,15 +675,22 @@ dependencies = [
"libloading",
]
[[package]]
name = "claxon"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bfbf56724aa9eca8afa4fcfadeb479e722935bb2a0900c2d37e0cc477af0688"
[[package]]
name = "client"
version = "0.1.0"
dependencies = [
"chrono",
"cpal",
"cpal 0.16.0",
"fixed-resample",
"iced",
"protocol",
"rodio",
"s2n-quic",
"tokio",
]
@ -703,9 +710,9 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b7f4aaa047ba3c3630b080bb9860894732ff23e2aee290a418909aa6d5df38f"
dependencies = [
"objc2",
"objc2 0.5.2",
"objc2-app-kit",
"objc2-foundation",
"objc2-foundation 0.2.2",
]
[[package]]
@ -851,12 +858,26 @@ dependencies = [
]
[[package]]
name = "coreaudio-sys"
version = "0.2.16"
name = "coreaudio-rs"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ce857aa0b77d77287acc1ac3e37a05a8c95a2af3647d23b15f263bdaeb7562b"
checksum = "1aae284fbaf7d27aa0e292f7677dfbe26503b0d555026f702940805a630eac17"
dependencies = [
"bindgen 0.70.1",
"bitflags 1.3.2",
"libc",
"objc2-audio-toolbox",
"objc2-core-audio",
"objc2-core-audio-types",
"objc2-core-foundation",
]
[[package]]
name = "coreaudio-sys"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceec7a6067e62d6f931a2baf6f3a751f4a892595bcec1461a3c94ef9949864b6"
dependencies = [
"bindgen 0.72.0",
]
[[package]]
@ -890,7 +911,7 @@ checksum = "873dab07c8f743075e57f524c583985fbaf745602acbe916a01539364369a779"
dependencies = [
"alsa",
"core-foundation-sys",
"coreaudio-rs",
"coreaudio-rs 0.11.3",
"dasp_sample",
"jni",
"js-sys",
@ -905,6 +926,32 @@ dependencies = [
"windows 0.54.0",
]
[[package]]
name = "cpal"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbd307f43cc2a697e2d1f8bc7a1d824b5269e052209e28883e5bc04d095aaa3f"
dependencies = [
"alsa",
"coreaudio-rs 0.13.0",
"dasp_sample",
"jni",
"js-sys",
"libc",
"mach2",
"ndk 0.9.0",
"ndk-context",
"num-derive",
"num-traits",
"objc2-audio-toolbox",
"objc2-core-audio",
"objc2-core-audio-types",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"windows 0.54.0",
]
[[package]]
name = "crc32fast"
version = "1.4.2"
@ -969,8 +1016,8 @@ checksum = "18e1a09f280e29a8b00bc7e81eca5ac87dca0575639c9422a5fa25a07bb884b8"
dependencies = [
"ashpd",
"async-std",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"web-sys",
"winreg",
]
@ -987,6 +1034,16 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b"
[[package]]
name = "dispatch2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec"
dependencies = [
"bitflags 2.9.0",
"objc2 0.6.1",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -1078,6 +1135,15 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "endi"
version = "1.1.0"
@ -1644,6 +1710,12 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hound"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f"
[[package]]
name = "iana-time-zone"
version = "0.1.63"
@ -1978,15 +2050,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
[[package]]
name = "jni"
version = "0.21.1"
@ -2077,6 +2140,17 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lewton"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "777b48df9aaab155475a83a7df3070395ea1ac6902f5cd062b8f2b028075c030"
dependencies = [
"byteorder",
"ogg",
"tinyvec",
]
[[package]]
name = "libc"
version = "0.2.172"
@ -2461,6 +2535,15 @@ dependencies = [
"objc2-encode",
]
[[package]]
name = "objc2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88c6597e14493ab2e44ce58f2fdecf095a51f12ca57bec060a11c57332520551"
dependencies = [
"objc2-encode",
]
[[package]]
name = "objc2-app-kit"
version = "0.2.2"
@ -2470,13 +2553,28 @@ dependencies = [
"bitflags 2.9.0",
"block2",
"libc",
"objc2",
"objc2 0.5.2",
"objc2-core-data",
"objc2-core-image",
"objc2-foundation",
"objc2-foundation 0.2.2",
"objc2-quartz-core",
]
[[package]]
name = "objc2-audio-toolbox"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10cbe18d879e20a4aea544f8befe38bcf52255eb63d3f23eca2842f3319e4c07"
dependencies = [
"bitflags 2.9.0",
"libc",
"objc2 0.6.1",
"objc2-core-audio",
"objc2-core-audio-types",
"objc2-core-foundation",
"objc2-foundation 0.3.1",
]
[[package]]
name = "objc2-cloud-kit"
version = "0.2.2"
@ -2485,9 +2583,9 @@ checksum = "74dd3b56391c7a0596a295029734d3c1c5e7e510a4cb30245f8221ccea96b009"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2 0.5.2",
"objc2-core-location",
"objc2-foundation",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2497,8 +2595,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5ff520e9c33812fd374d8deecef01d4a840e7b41862d849513de77e44aa4889"
dependencies = [
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
name = "objc2-core-audio"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca44961e888e19313b808f23497073e3f6b3c22bb485056674c8b49f3b025c82"
dependencies = [
"dispatch2",
"objc2 0.6.1",
"objc2-core-audio-types",
"objc2-core-foundation",
]
[[package]]
name = "objc2-core-audio-types"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f1cc99bb07ad2ddb6527ddf83db6a15271bb036b3eb94b801cd44fdc666ee1"
dependencies = [
"bitflags 2.9.0",
"objc2 0.6.1",
]
[[package]]
@ -2509,8 +2629,19 @@ checksum = "617fbf49e071c178c0b24c080767db52958f716d9eabdf0890523aeae54773ef"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
name = "objc2-core-foundation"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166"
dependencies = [
"bitflags 2.9.0",
"dispatch2",
"objc2 0.6.1",
]
[[package]]
@ -2520,8 +2651,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55260963a527c99f1819c4f8e3b47fe04f9650694ef348ffd2227e8196d34c80"
dependencies = [
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"objc2-metal",
]
@ -2532,9 +2663,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "000cfee34e683244f284252ee206a27953279d370e309649dc3ee317b37e5781"
dependencies = [
"block2",
"objc2",
"objc2 0.5.2",
"objc2-contacts",
"objc2-foundation",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2553,7 +2684,16 @@ dependencies = [
"block2",
"dispatch",
"libc",
"objc2",
"objc2 0.5.2",
]
[[package]]
name = "objc2-foundation"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900831247d2fe1a09a683278e5384cfb8c80c79fe6b166f9d14bfdde0ea1b03c"
dependencies = [
"objc2 0.6.1",
]
[[package]]
@ -2563,9 +2703,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a1ae721c5e35be65f01a03b6d2ac13a54cb4fa70d8a5da293d7b0020261398"
dependencies = [
"block2",
"objc2",
"objc2 0.5.2",
"objc2-app-kit",
"objc2-foundation",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2576,8 +2716,8 @@ checksum = "dd0cba1276f6023976a406a14ffa85e1fdd19df6b0f737b063b95f6c8c7aadd6"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2588,8 +2728,8 @@ checksum = "e42bee7bff906b14b167da2bac5efe6b6a07e6f7c0a21a7308d40c960242dc7a"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"objc2-metal",
]
@ -2599,8 +2739,8 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a684efe3dec1b305badae1a28f6555f6ddd3bb2c2267896782858d5a78404dc"
dependencies = [
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2611,12 +2751,12 @@ checksum = "b8bb46798b20cd6b91cbd113524c490f1686f4c4e8f49502431415f3512e2b6f"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2 0.5.2",
"objc2-cloud-kit",
"objc2-core-data",
"objc2-core-image",
"objc2-core-location",
"objc2-foundation",
"objc2-foundation 0.2.2",
"objc2-link-presentation",
"objc2-quartz-core",
"objc2-symbols",
@ -2631,8 +2771,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fa5f9748dbfe1ca6c0b79ad20725a11eca7c2218bceb4b005cb1be26273bfe"
dependencies = [
"block2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2643,9 +2783,9 @@ checksum = "76cfcbf642358e8689af64cee815d139339f3ed8ad05103ed5eaf73db8d84cb3"
dependencies = [
"bitflags 2.9.0",
"block2",
"objc2",
"objc2 0.5.2",
"objc2-core-location",
"objc2-foundation",
"objc2-foundation 0.2.2",
]
[[package]]
@ -2680,6 +2820,15 @@ dependencies = [
"cc",
]
[[package]]
name = "ogg"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6951b4e8bf21c8193da321bcce9c9dd2e13c858fe078bf9054a288b419ae5d6e"
dependencies = [
"byteorder",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -2929,6 +3078,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"chrono",
"tokio",
]
[[package]]
@ -3170,6 +3320,19 @@ dependencies = [
"portable-atomic-util",
]
[[package]]
name = "rodio"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ceb6607dd738c99bc8cb28eff249b7cd5c8ec88b9db96c0608c1480d140fb1"
dependencies = [
"claxon",
"cpal 0.15.3",
"hound",
"lewton",
"symphonia",
]
[[package]]
name = "roxmltree"
version = "0.20.0"
@ -3694,8 +3857,8 @@ dependencies = [
"js-sys",
"log",
"memmap2",
"objc2",
"objc2-foundation",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"objc2-quartz-core",
"raw-window-handle",
"redox_syscall 0.5.12",
@ -3788,6 +3951,55 @@ dependencies = [
"zeno",
]
[[package]]
name = "symphonia"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "815c942ae7ee74737bb00f965fa5b5a2ac2ce7b6c01c0cc169bbeaf7abd5f5a9"
dependencies = [
"lazy_static",
"symphonia-bundle-mp3",
"symphonia-core",
"symphonia-metadata",
]
[[package]]
name = "symphonia-bundle-mp3"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c01c2aae70f0f1fb096b6f0ff112a930b1fb3626178fba3ae68b09dce71706d4"
dependencies = [
"lazy_static",
"log",
"symphonia-core",
"symphonia-metadata",
]
[[package]]
name = "symphonia-core"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "798306779e3dc7d5231bd5691f5a813496dc79d3f56bf82e25789f2094e022c3"
dependencies = [
"arrayvec",
"bitflags 1.3.2",
"bytemuck",
"lazy_static",
"log",
]
[[package]]
name = "symphonia-metadata"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc622b9841a10089c5b18e99eb904f4341615d5aa55bbf4eedde1be721a4023c"
dependencies = [
"encoding_rs",
"lazy_static",
"log",
"symphonia-core",
]
[[package]]
name = "syn"
version = "2.0.101"
@ -5038,9 +5250,9 @@ dependencies = [
"libc",
"memmap2",
"ndk 0.9.0",
"objc2",
"objc2 0.5.2",
"objc2-app-kit",
"objc2-foundation",
"objc2-foundation 0.2.2",
"objc2-ui-kit",
"orbclient",
"percent-encoding",

View file

@ -8,6 +8,7 @@ protocol = { path = "../protocol" }
chrono = { workspace = true }
tokio = { workspace = true }
s2n-quic = { workspace = true }
cpal = "0.15.3"
cpal = "0.16.0"
iced = { features = ["tokio"], git = "https://github.com/iced-rs/iced", rev = "d39022432c778a8cda455f40b9c12245db86ce45" }
fixed-resample = "0.8.0"
rodio = "0.20.1"

View file

@ -18,13 +18,13 @@ use crate::{
};
#[derive(Debug)]
struct Signal {
struct Controller {
record_control: mpsc::Sender<State>,
play_control: mpsc::Sender<State>,
connection_stop_sender: RwLock<Option<oneshot::Sender<bool>>>,
}
impl Signal {
impl Controller {
fn reset_connection(&self) -> Result<(), Error> {
let connection_stop_sender = self.connection_stop_sender.write().unwrap().take();
match connection_stop_sender {
@ -43,7 +43,7 @@ impl Signal {
#[derive(Debug)]
struct Channel {
microphone: Arc<broadcast::Sender<f32>>,
speaker: Arc<broadcast::Sender<f32>>,
speaker: Arc<broadcast::Sender<protocol::protocol::Signal>>,
}
#[derive(Debug, Clone, Copy)]
@ -90,12 +90,12 @@ pub struct App {
client_config: Arc<ClientConfig>,
gui_status: Arc<RwLock<GUIStatus>>,
channel: Channel,
signal: Arc<Signal>,
controller: Arc<Controller>,
}
impl App {
fn reset_connection(&mut self) -> Result<(), Error> {
self.signal.reset_connection()?;
self.controller.reset_connection()?;
self.gui_status.write().unwrap().room = State::Passive;
Ok(())
}
@ -122,7 +122,7 @@ impl App {
microphone: record_sender,
speaker: play_sender,
},
signal: Signal {
controller: Controller {
record_control: record_control.0,
play_control: play_control.0,
connection_stop_sender: None.into(),
@ -168,7 +168,7 @@ impl App {
let speaker_sender = self.channel.speaker.clone();
let (connection_stop_sender, connection_stop_receiver) = oneshot::channel();
*self.signal.connection_stop_sender.write().unwrap() = Some(connection_stop_sender);
*self.controller.connection_stop_sender.write().unwrap() = Some(connection_stop_sender);
Task::perform(
async move {
@ -211,7 +211,7 @@ impl App {
self.gui_status.write().unwrap().microphone = State::Loading;
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
let signal = self.controller.clone();
Task::perform(
async move { signal.record_control.send(State::Active).await },
move |result| {
@ -232,7 +232,7 @@ impl App {
self.gui_status.write().unwrap().microphone = State::Loading;
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
let signal = self.controller.clone();
Task::perform(
async move { signal.record_control.send(State::Passive).await },
move |result| {
@ -253,7 +253,7 @@ impl App {
self.gui_status.write().unwrap().speaker = State::Loading;
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
let signal = self.controller.clone();
Task::perform(
async move { signal.play_control.send(State::Active).await },
move |result| {
@ -274,7 +274,7 @@ impl App {
self.gui_status.write().unwrap().speaker = State::Loading;
let gui_status = self.gui_status.clone();
let signal = self.signal.clone();
let signal = self.controller.clone();
Task::perform(
async move { signal.play_control.send(State::Passive).await },
move |result| {

View file

@ -1,3 +1,6 @@
use protocol::protocol::Speaker;
use tokio::sync::broadcast;
pub mod gui;
pub mod stream;
pub mod voice;
@ -5,6 +8,21 @@ pub mod voice;
const MICROPHONE_BUFFER_LENGHT: usize = 1024 * 4;
const SPEAKER_BUFFER_LENGHT: usize = 1024 * 16 * 16;
#[derive(Debug, Clone)]
pub struct SpeakerWithData {
speaker: Speaker,
audio_sender: broadcast::Sender<f32>,
}
impl SpeakerWithData {
pub fn get_speaker_id(&self) -> u8 {
self.speaker.get_id()
}
pub fn subscribe(&self) -> broadcast::Receiver<f32> {
self.audio_sender.subscribe()
}
}
#[derive(Debug)]
pub struct ClientConfig {
certificate_path: String,

View file

@ -1,6 +1,9 @@
use std::{net::SocketAddr, path::Path, sync::Arc};
use protocol::{Error, NETWORK_BUFFER_LENGTH, Signal, SignalType, SignedAudioDatum};
use protocol::{
Error,
protocol::{NETWORK_DATA_LENGTH, Signal},
};
use s2n_quic::{
Client,
client::Connect,
@ -22,7 +25,7 @@ pub struct ConnectReturn {
pub async fn connect(
microphone_receiver: broadcast::Receiver<f32>,
speaker_sender: Arc<broadcast::Sender<f32>>,
speaker_sender: Arc<broadcast::Sender<Signal>>,
client_config: Arc<ClientConfig>,
) -> Result<ConnectReturn, Error> {
let client = Client::builder()
@ -58,10 +61,8 @@ pub async fn connect(
.map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?;
let (receive_stream, send_stream) = stream.split();
let ready_signal = broadcast::channel(3);
let send_signals_task = tokio::spawn(send_signals(send_stream, microphone_receiver));
let speaker_clone = speaker_sender.clone();
let receive_signals_task = tokio::spawn(receive_signals(receive_stream, speaker_sender));
Ok(ConnectReturn {
@ -111,25 +112,15 @@ async fn send_signals(
async fn receive_signals(
mut receive_stream: ReceiveStream,
speaker_sender: Arc<broadcast::Sender<SignedAudioDatum>>,
speaker_sender: Arc<broadcast::Sender<Signal>>,
) {
let mut network_buffer = [0; NETWORK_BUFFER_LENGTH];
let mut network_buffer = [0; NETWORK_DATA_LENGTH];
loop {
match receive_stream.read_exact(&mut network_buffer).await {
Ok(_) => match Signal::unpack_signal(&network_buffer) {
Ok(received_signal) => match received_signal.signal_type {
SignalType::AudioDatum => match received_signal.unpack_audio() {
Ok(signed_audio_datum) => {
let _ = speaker_sender.send(signed_audio_datum);
}
Err(err_val) => {
eprintln!("Error: Unpack Audio | {}", err_val);
println!("Warning: Illegal Operation");
return;
}
},
SignalType::SpeakerLeft => todo!(),
},
Ok(_) => match Signal::unpack(network_buffer) {
Ok(signal) => {
let _ = speaker_sender.send(signal);
}
Err(err_val) => {
eprintln!("Error: Unpack Signal | {}", err_val);
}

View file

@ -1,11 +1,30 @@
use std::sync::Arc;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use protocol::Error;
use protocol::{
Error,
protocol::{DEFAULT_SAMPLE_RATE, Signal, SignalType},
};
use rodio::{OutputStream, OutputStreamHandle, Sink, buffer::SamplesBuffer};
use tokio::sync::{broadcast, mpsc};
use crate::gui::State;
struct SpeakerSink {
speaker_id: u8,
sink: Sink,
}
impl SpeakerSink {
fn new(speaker_id: u8, output_stream_handle: &OutputStreamHandle) -> Self {
let sink = Sink::try_new(&output_stream_handle).unwrap();
Self { speaker_id, sink }
}
}
pub async fn record(
mut record_control: mpsc::Receiver<State>,
record_sender: Arc<broadcast::Sender<f32>>,
@ -61,66 +80,85 @@ pub async fn record(
})
}
pub async fn play(
mut play_control: mpsc::Receiver<State>,
mut play_receiver: broadcast::Receiver<f32>,
) -> Result<(), Error> {
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config = output_device.default_output_config().unwrap().into();
println!("Speaker Stream Config = {:#?}", config);
async fn mixer(
output_stream_handle: OutputStreamHandle,
mut play_receiver: broadcast::Receiver<Signal>,
play_pause: Arc<AtomicBool>,
) {
let mut speaker_list = vec![];
let output = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {
match play_receiver.try_recv() {
Ok(received_sample) => *sample = received_sample,
Err(err_val) => match err_val {
broadcast::error::TryRecvError::Empty => *sample = 0.0,
broadcast::error::TryRecvError::Closed => {
eprintln!("Error: Speaker Receive | Local Channel | Channel Closed");
return;
while let Ok(signal) = play_receiver.recv().await {
match signal.get_signal_type() {
SignalType::AudioDatum => {
let data = if play_pause.load(std::sync::atomic::Ordering::Relaxed) {
[signal.get_audio_datum()]
} else {
[0.0]
};
let source = SamplesBuffer::new(2, DEFAULT_SAMPLE_RATE, data);
match speaker_list.binary_search_by(|speaker_sink: &SpeakerSink| {
speaker_sink.speaker_id.cmp(&signal.get_speaker_id())
}) {
Ok(speaker_sink_index) => {
let speaker_sink = speaker_list.get(speaker_sink_index).expect("Never");
speaker_sink.sink.append(source);
}
broadcast::error::TryRecvError::Lagged(lag_amount) => {
eprintln!(
"Error: Speaker Receive | Local Channel | Lagging by -> {}",
lag_amount
);
play_receiver = play_receiver.resubscribe();
Err(_) => {
let speaker_sink =
SpeakerSink::new(signal.get_speaker_id(), &output_stream_handle);
speaker_sink.sink.append(source);
speaker_list.push(speaker_sink);
speaker_list.sort_by(|x, y| x.speaker_id.cmp(&y.speaker_id));
}
},
}
}
SignalType::SpeakerLeft => {
speaker_list
.retain(|speaker_sink| speaker_sink.speaker_id != signal.get_speaker_id());
}
}
};
}
}
let output_stream = output_device
.build_output_stream(&config, output, voice_error, None)
.unwrap();
pub async fn play(
mut play_control: mpsc::Receiver<State>,
play_receiver: broadcast::Receiver<Signal>,
) {
let (_output_stream, output_stream_handle) = OutputStream::try_default().unwrap();
let play_pause = Arc::new(AtomicBool::new(true));
output_stream
.play()
.map_err(|inner| Error::Play(inner.to_string()))?;
let mixer = tokio::spawn(mixer(
output_stream_handle,
play_receiver,
play_pause.clone(),
));
tokio::task::block_in_place(|| {
loop {
match play_control.blocking_recv() {
Some(message) => match message {
State::Active => output_stream
.play()
.map_err(|inner| Error::Play(inner.to_string()))?,
State::Passive => output_stream
.pause()
.map_err(|inner| Error::Play(inner.to_string()))?,
Some(requested_state) => match requested_state {
State::Active => {
play_pause.store(true, Ordering::Relaxed);
}
State::Passive => {
play_pause.store(false, Ordering::Relaxed);
}
State::Loading => {}
},
None => {
output_stream
.pause()
.map_err(|inner| Error::Play(inner.to_string()))?;
return Ok(());
mixer.abort();
return;
}
}
}
})
});
mixer.abort();
}
fn voice_error(err_val: cpal::StreamError) {

View file

@ -6,5 +6,6 @@ edition = "2024"
[dependencies]
# serde = { workspace = true }
# serde_json = { workspace = true }
tokio = { workspace = true }
chrono = { workspace = true }
bincode = { workspace = true }

View file

@ -1,10 +1,17 @@
use std::sync::Arc;
use bincode::{Decode, Encode};
use tokio::sync::broadcast;
use crate::Error;
const SIGNAL_DATA_LENGTH: usize = 4;
const NETWORK_DATA_LENGTH: usize = 6;
pub const NETWORK_DATA_LENGTH: usize = 6;
pub const SPEAKER_ACTION_CHANNEL_LENGTH: usize = 1024;
pub const AUDIO_DATA_SENDER_CHANNEL_LENGTH: usize = 1024 * 16 * 4;
pub const DEFAULT_SAMPLE_RATE: u32 = 44100;
pub type SpeakerWithDataAndAction = (SpeakerWithData, SpeakerAction);
type SignalBufferReturn = [u8; SIGNAL_DATA_LENGTH];
type NetworkBufferReturn = [u8; NETWORK_DATA_LENGTH];
@ -26,20 +33,88 @@ impl Speaker {
}
}
#[derive(Debug, Encode, Decode)]
#[derive(Debug, Clone, Copy)]
pub enum SpeakerAction {
Join,
Left,
}
#[derive(Debug, Clone)]
pub struct SpeakerWithData {
speaker: Speaker,
speaker_action_sender: Arc<broadcast::Sender<(SpeakerWithData, SpeakerAction)>>,
audio_data_sender: Arc<broadcast::Sender<f32>>,
}
impl SpeakerWithData {
pub fn new(speaker: Speaker) -> Self {
let speaker_action_sender = broadcast::channel(SPEAKER_ACTION_CHANNEL_LENGTH).0.into();
let audio_data_sender = broadcast::channel(AUDIO_DATA_SENDER_CHANNEL_LENGTH)
.0
.into();
Self {
speaker,
speaker_action_sender,
audio_data_sender,
}
}
pub fn get_speaker_id(&self) -> u8 {
self.speaker.get_id()
}
pub fn get_speaker(&self) -> Speaker {
self.speaker
}
pub fn send_speaker_action(&self, speaker_with_data_and_action: SpeakerWithDataAndAction) {
let _ = self
.speaker_action_sender
.send(speaker_with_data_and_action);
}
pub fn clone_audio_data_sender(&self) -> Arc<broadcast::Sender<f32>> {
self.audio_data_sender.clone()
}
pub fn subscribe_speaker_action_channel(
&self,
) -> broadcast::Receiver<(SpeakerWithData, SpeakerAction)> {
self.speaker_action_sender.subscribe()
}
pub fn subscribe_audio_data_channel(&self) -> broadcast::Receiver<f32> {
self.audio_data_sender.subscribe()
}
}
#[derive(Debug, Clone, Copy, Encode, Decode)]
pub enum SignalType {
AudioDatum,
SpeakerLeft,
}
#[derive(Debug, Encode, Decode)]
#[derive(Debug, Clone, Copy, Encode, Decode)]
pub struct Signal {
signal_type: SignalType,
speaker: Speaker,
data: [u8; SIGNAL_DATA_LENGTH],
data: SignalBufferReturn,
}
impl Signal {
pub fn get_signal_type(&self) -> SignalType {
self.signal_type
}
pub fn get_speaker_id(&self) -> u8 {
self.speaker.get_id()
}
pub fn get_audio_datum(&self) -> f32 {
f32::from_be_bytes(self.data)
}
pub fn unpack(data: NetworkBufferReturn) -> Result<Self, Error> {
Ok(bincode::decode_from_slice::<Self, _>(&data, BINCODE_CONFIG)
.map_err(|inner| Error::Deserialization(inner.to_string()))?

View file

@ -3,7 +3,9 @@ use std::{
sync::{Arc, LazyLock},
};
use protocol::protocol::{Signal, Speaker};
use protocol::protocol::{
Signal, Speaker, SpeakerAction, SpeakerWithData, SpeakerWithDataAndAction,
};
use s2n_quic::{
Connection, Server,
stream::{ReceiveStream, SendStream},
@ -15,73 +17,45 @@ use tokio::{
use crate::ServerConfig;
const NEW_SPEAKER_LENGTH: usize = u8::MAX as usize;
const AUDIO_BUFFER_LENGTH: usize = 1024 * 16 * 16;
async fn add_speaker(
speaker_with_data: SpeakerWithData,
) -> (
broadcast::Receiver<SpeakerWithDataAndAction>,
Arc<broadcast::Sender<f32>>,
) {
// Do this first so receiver can keep track of later insertions, otherwise they just wastes
let speaker_action_receiver = speaker_with_data.subscribe_speaker_action_channel();
#[derive(Debug, Clone, Copy)]
enum SpeakerAction {
Join,
Left,
}
#[derive(Debug, Clone)]
struct SpeakerWithData {
speaker: Speaker,
speaker_action_sender: broadcast::Sender<(SpeakerWithData, SpeakerAction)>,
audio_data_sender: broadcast::Sender<f32>,
}
impl SpeakerWithData {
async fn new(
speaker: Speaker,
) -> (
broadcast::Receiver<(SpeakerWithData, SpeakerAction)>,
broadcast::Sender<f32>,
) {
let speaker_action_channel = broadcast::channel(NEW_SPEAKER_LENGTH);
let audio_data_sender = broadcast::channel(AUDIO_BUFFER_LENGTH).0;
let speaker_with_data = Self {
speaker,
speaker_action_sender: speaker_action_channel.0,
audio_data_sender: audio_data_sender.clone(),
};
let mut online_speakers = ONLINE_SPEAKERS.write().await;
for online_speaker in online_speakers.iter() {
let _ = speaker_with_data
.speaker_action_sender
.send((online_speaker.clone(), SpeakerAction::Join));
let _ = online_speaker
.speaker_action_sender
.send((speaker_with_data.clone(), SpeakerAction::Join));
}
online_speakers.push(speaker_with_data);
online_speakers.sort_by_key(|speaker| speaker.speaker.get_id());
drop(online_speakers);
(speaker_action_channel.1, audio_data_sender)
let mut online_speakers = ONLINE_SPEAKERS.write().await;
for online_speaker in online_speakers.iter() {
speaker_with_data.send_speaker_action((online_speaker.clone(), SpeakerAction::Join));
online_speaker.send_speaker_action((speaker_with_data.clone(), SpeakerAction::Join));
}
async fn remove(speaker_id: u8) {
let mut online_speakers = ONLINE_SPEAKERS.write().await;
let audio_data_sender = speaker_with_data.clone_audio_data_sender();
let speaker_index =
online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.speaker.get_id());
online_speakers.push(speaker_with_data);
online_speakers.sort_by_key(|speaker| speaker.get_speaker_id());
match speaker_index {
Ok(speaker_index) => {
let speaker = online_speakers.remove(speaker_index);
for online_speaker in online_speakers.iter() {
let _ = online_speaker
.speaker_action_sender
.send((speaker.clone(), SpeakerAction::Left));
}
drop(online_speakers);
(speaker_action_receiver, audio_data_sender)
}
async fn remove_speaker(speaker_id: u8) {
let mut online_speakers = ONLINE_SPEAKERS.write().await;
let speaker_index =
online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.get_speaker_id());
match speaker_index {
Ok(speaker_index) => {
let speaker = online_speakers.remove(speaker_index);
for online_speaker in online_speakers.iter() {
online_speaker.send_speaker_action((speaker.clone(), SpeakerAction::Left));
}
Err(_) => return,
}
Err(_) => return,
}
}
@ -133,7 +107,9 @@ async fn handle_client(speaker: Speaker, mut connection: Connection) {
let (receive_stream, send_stream) = stream.split();
let speaker_id = speaker.get_id();
let (speaker_action_receiver, audio_data_sender) = SpeakerWithData::new(speaker).await;
let speaker_with_data = SpeakerWithData::new(speaker);
let (speaker_action_receiver, audio_data_sender) = add_speaker(speaker_with_data).await;
tokio::spawn(receive_audio_data(
receive_stream,
@ -158,22 +134,25 @@ async fn send_audio_data(
SpeakerAction::Join => {
let send_stream = send_stream.clone();
tokio::spawn(async move {
let mut audio_data_receiver = speaker_with_data.audio_data_sender.subscribe();
let mut audio_data_receiver = speaker_with_data.subscribe_audio_data_channel();
while let Ok(audio_datum) = audio_data_receiver.recv().await {
let data = Signal::pack_audio_datum(speaker_with_data.speaker, audio_datum);
let data =
Signal::pack_audio_datum(speaker_with_data.get_speaker(), audio_datum);
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
};
}
});
}
SpeakerAction::Left => {
let data = Signal::pack_speaker_left(speaker_with_data.speaker);
let data = Signal::pack_speaker_left(speaker_with_data.get_speaker());
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
eprintln!("Error: Send Speaker Left | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
}
}
}
@ -183,7 +162,7 @@ async fn send_audio_data(
async fn receive_audio_data(
mut receive_stream: ReceiveStream,
speaker_id: u8,
audio_data_sender: broadcast::Sender<f32>,
audio_data_sender: Arc<broadcast::Sender<f32>>,
) {
loop {
match receive_stream.read_f32().await {
@ -192,7 +171,8 @@ async fn receive_audio_data(
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
}
}
}