feat: new protocol client implementation

This commit is contained in:
Ahmet Kaan Gümüş 2025-06-14 06:04:48 +03:00
parent 51c29f7921
commit 5ad6184f13
9 changed files with 497 additions and 267 deletions

View file

@ -3,7 +3,9 @@ use std::{
sync::{Arc, LazyLock},
};
use protocol::protocol::{Signal, Speaker};
use protocol::protocol::{
Signal, Speaker, SpeakerAction, SpeakerWithData, SpeakerWithDataAndAction,
};
use s2n_quic::{
Connection, Server,
stream::{ReceiveStream, SendStream},
@ -15,73 +17,45 @@ use tokio::{
use crate::ServerConfig;
const NEW_SPEAKER_LENGTH: usize = u8::MAX as usize;
const AUDIO_BUFFER_LENGTH: usize = 1024 * 16 * 16;
async fn add_speaker(
speaker_with_data: SpeakerWithData,
) -> (
broadcast::Receiver<SpeakerWithDataAndAction>,
Arc<broadcast::Sender<f32>>,
) {
// Do this first so receiver can keep track of later insertions, otherwise they just wastes
let speaker_action_receiver = speaker_with_data.subscribe_speaker_action_channel();
#[derive(Debug, Clone, Copy)]
enum SpeakerAction {
Join,
Left,
}
#[derive(Debug, Clone)]
struct SpeakerWithData {
speaker: Speaker,
speaker_action_sender: broadcast::Sender<(SpeakerWithData, SpeakerAction)>,
audio_data_sender: broadcast::Sender<f32>,
}
impl SpeakerWithData {
async fn new(
speaker: Speaker,
) -> (
broadcast::Receiver<(SpeakerWithData, SpeakerAction)>,
broadcast::Sender<f32>,
) {
let speaker_action_channel = broadcast::channel(NEW_SPEAKER_LENGTH);
let audio_data_sender = broadcast::channel(AUDIO_BUFFER_LENGTH).0;
let speaker_with_data = Self {
speaker,
speaker_action_sender: speaker_action_channel.0,
audio_data_sender: audio_data_sender.clone(),
};
let mut online_speakers = ONLINE_SPEAKERS.write().await;
for online_speaker in online_speakers.iter() {
let _ = speaker_with_data
.speaker_action_sender
.send((online_speaker.clone(), SpeakerAction::Join));
let _ = online_speaker
.speaker_action_sender
.send((speaker_with_data.clone(), SpeakerAction::Join));
}
online_speakers.push(speaker_with_data);
online_speakers.sort_by_key(|speaker| speaker.speaker.get_id());
drop(online_speakers);
(speaker_action_channel.1, audio_data_sender)
let mut online_speakers = ONLINE_SPEAKERS.write().await;
for online_speaker in online_speakers.iter() {
speaker_with_data.send_speaker_action((online_speaker.clone(), SpeakerAction::Join));
online_speaker.send_speaker_action((speaker_with_data.clone(), SpeakerAction::Join));
}
async fn remove(speaker_id: u8) {
let mut online_speakers = ONLINE_SPEAKERS.write().await;
let audio_data_sender = speaker_with_data.clone_audio_data_sender();
let speaker_index =
online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.speaker.get_id());
online_speakers.push(speaker_with_data);
online_speakers.sort_by_key(|speaker| speaker.get_speaker_id());
match speaker_index {
Ok(speaker_index) => {
let speaker = online_speakers.remove(speaker_index);
for online_speaker in online_speakers.iter() {
let _ = online_speaker
.speaker_action_sender
.send((speaker.clone(), SpeakerAction::Left));
}
drop(online_speakers);
(speaker_action_receiver, audio_data_sender)
}
async fn remove_speaker(speaker_id: u8) {
let mut online_speakers = ONLINE_SPEAKERS.write().await;
let speaker_index =
online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.get_speaker_id());
match speaker_index {
Ok(speaker_index) => {
let speaker = online_speakers.remove(speaker_index);
for online_speaker in online_speakers.iter() {
online_speaker.send_speaker_action((speaker.clone(), SpeakerAction::Left));
}
Err(_) => return,
}
Err(_) => return,
}
}
@ -133,7 +107,9 @@ async fn handle_client(speaker: Speaker, mut connection: Connection) {
let (receive_stream, send_stream) = stream.split();
let speaker_id = speaker.get_id();
let (speaker_action_receiver, audio_data_sender) = SpeakerWithData::new(speaker).await;
let speaker_with_data = SpeakerWithData::new(speaker);
let (speaker_action_receiver, audio_data_sender) = add_speaker(speaker_with_data).await;
tokio::spawn(receive_audio_data(
receive_stream,
@ -158,22 +134,25 @@ async fn send_audio_data(
SpeakerAction::Join => {
let send_stream = send_stream.clone();
tokio::spawn(async move {
let mut audio_data_receiver = speaker_with_data.audio_data_sender.subscribe();
let mut audio_data_receiver = speaker_with_data.subscribe_audio_data_channel();
while let Ok(audio_datum) = audio_data_receiver.recv().await {
let data = Signal::pack_audio_datum(speaker_with_data.speaker, audio_datum);
let data =
Signal::pack_audio_datum(speaker_with_data.get_speaker(), audio_datum);
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
};
}
});
}
SpeakerAction::Left => {
let data = Signal::pack_speaker_left(speaker_with_data.speaker);
let data = Signal::pack_speaker_left(speaker_with_data.get_speaker());
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
eprintln!("Error: Send Speaker Left | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
}
}
}
@ -183,7 +162,7 @@ async fn send_audio_data(
async fn receive_audio_data(
mut receive_stream: ReceiveStream,
speaker_id: u8,
audio_data_sender: broadcast::Sender<f32>,
audio_data_sender: Arc<broadcast::Sender<f32>>,
) {
loop {
match receive_stream.read_f32().await {
@ -192,7 +171,8 @@ async fn receive_audio_data(
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
SpeakerWithData::remove(speaker_id).await;
remove_speaker(speaker_id).await;
return;
}
}
}