feat: ✨ new protocol and server implementation
This commit is contained in:
parent
d6e5389743
commit
51c29f7921
11 changed files with 445 additions and 198 deletions
|
@ -5,8 +5,6 @@ edition = "2024"
|
|||
|
||||
[dependencies]
|
||||
protocol = { path = "../protocol" }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
s2n-quic = { workspace = true }
|
||||
|
|
|
@ -1,62 +1,91 @@
|
|||
use std::{
|
||||
path::Path,
|
||||
sync::{Arc, LazyLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use protocol::protocol::{Signal, Speaker};
|
||||
use s2n_quic::{
|
||||
Connection, Server,
|
||||
stream::{ReceiveStream, SendStream},
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::RwLock,
|
||||
time::sleep,
|
||||
sync::{RwLock, broadcast},
|
||||
};
|
||||
|
||||
use crate::ServerConfig;
|
||||
const BUFFER_LENGTH: usize = 1024 * 13;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TimedBuffer {
|
||||
audio_buffer: [f32; BUFFER_LENGTH],
|
||||
latest_update: DateTime<Utc>,
|
||||
const NEW_SPEAKER_LENGTH: usize = u8::MAX as usize;
|
||||
const AUDIO_BUFFER_LENGTH: usize = 1024 * 16 * 16;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum SpeakerAction {
|
||||
Join,
|
||||
Left,
|
||||
}
|
||||
impl TimedBuffer {
|
||||
fn new() -> Self {
|
||||
let audio_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||
let latest_update = Utc::now();
|
||||
|
||||
Self {
|
||||
audio_buffer,
|
||||
latest_update,
|
||||
#[derive(Debug, Clone)]
|
||||
struct SpeakerWithData {
|
||||
speaker: Speaker,
|
||||
speaker_action_sender: broadcast::Sender<(SpeakerWithData, SpeakerAction)>,
|
||||
audio_data_sender: broadcast::Sender<f32>,
|
||||
}
|
||||
|
||||
impl SpeakerWithData {
|
||||
async fn new(
|
||||
speaker: Speaker,
|
||||
) -> (
|
||||
broadcast::Receiver<(SpeakerWithData, SpeakerAction)>,
|
||||
broadcast::Sender<f32>,
|
||||
) {
|
||||
let speaker_action_channel = broadcast::channel(NEW_SPEAKER_LENGTH);
|
||||
let audio_data_sender = broadcast::channel(AUDIO_BUFFER_LENGTH).0;
|
||||
|
||||
let speaker_with_data = Self {
|
||||
speaker,
|
||||
speaker_action_sender: speaker_action_channel.0,
|
||||
audio_data_sender: audio_data_sender.clone(),
|
||||
};
|
||||
|
||||
let mut online_speakers = ONLINE_SPEAKERS.write().await;
|
||||
for online_speaker in online_speakers.iter() {
|
||||
let _ = speaker_with_data
|
||||
.speaker_action_sender
|
||||
.send((online_speaker.clone(), SpeakerAction::Join));
|
||||
let _ = online_speaker
|
||||
.speaker_action_sender
|
||||
.send((speaker_with_data.clone(), SpeakerAction::Join));
|
||||
}
|
||||
|
||||
online_speakers.push(speaker_with_data);
|
||||
online_speakers.sort_by_key(|speaker| speaker.speaker.get_id());
|
||||
|
||||
drop(online_speakers);
|
||||
|
||||
(speaker_action_channel.1, audio_data_sender)
|
||||
}
|
||||
|
||||
async fn remove(speaker_id: u8) {
|
||||
let mut online_speakers = ONLINE_SPEAKERS.write().await;
|
||||
|
||||
let speaker_index =
|
||||
online_speakers.binary_search_by_key(&speaker_id, |speaker| speaker.speaker.get_id());
|
||||
|
||||
match speaker_index {
|
||||
Ok(speaker_index) => {
|
||||
let speaker = online_speakers.remove(speaker_index);
|
||||
for online_speaker in online_speakers.iter() {
|
||||
let _ = online_speaker
|
||||
.speaker_action_sender
|
||||
.send((speaker.clone(), SpeakerAction::Left));
|
||||
}
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
fn update(&mut self, new_audio_buffer: [f32; BUFFER_LENGTH]) {
|
||||
self.audio_buffer = new_audio_buffer;
|
||||
self.latest_update = Utc::now();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct User {
|
||||
user_id: u32,
|
||||
timed_buffer: Arc<RwLock<TimedBuffer>>,
|
||||
}
|
||||
|
||||
impl User {
|
||||
async fn new(user_id: u32) -> Arc<RwLock<TimedBuffer>> {
|
||||
let timed_buffer = Arc::new(RwLock::new(TimedBuffer::new()));
|
||||
let new_user = Self {
|
||||
user_id,
|
||||
timed_buffer: timed_buffer.clone(),
|
||||
};
|
||||
ONLINE_USERS.write().await.push(new_user);
|
||||
timed_buffer
|
||||
}
|
||||
}
|
||||
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| vec![].into());
|
||||
static ONLINE_SPEAKERS: LazyLock<RwLock<Vec<SpeakerWithData>>> = LazyLock::new(|| vec![].into());
|
||||
|
||||
pub async fn start(server_config: ServerConfig) {
|
||||
let mut server = Server::builder()
|
||||
|
@ -70,14 +99,22 @@ pub async fn start(server_config: ServerConfig) {
|
|||
.start()
|
||||
.unwrap();
|
||||
|
||||
let mut user_id = 0;
|
||||
let mut speaker_id = 0;
|
||||
while let Some(connection) = server.accept().await {
|
||||
tokio::spawn(handle_client(user_id, connection));
|
||||
user_id += 1;
|
||||
let speaker = Speaker::new(speaker_id);
|
||||
tokio::spawn(handle_client(speaker, connection));
|
||||
|
||||
match speaker_id.checked_add(1) {
|
||||
Some(next_speaker_id) => speaker_id = next_speaker_id,
|
||||
None => {
|
||||
println!("Warning: Room Is Full");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_client(user_id: u32, mut connection: Connection) {
|
||||
async fn handle_client(speaker: Speaker, mut connection: Connection) {
|
||||
println!(
|
||||
"Server Name = {}",
|
||||
connection.server_name().unwrap().unwrap().to_string()
|
||||
|
@ -95,79 +132,68 @@ async fn handle_client(user_id: u32, mut connection: Connection) {
|
|||
.unwrap();
|
||||
let (receive_stream, send_stream) = stream.split();
|
||||
|
||||
let user_audio_buffer = User::new(user_id).await;
|
||||
let speaker_id = speaker.get_id();
|
||||
let (speaker_action_receiver, audio_data_sender) = SpeakerWithData::new(speaker).await;
|
||||
|
||||
tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer));
|
||||
tokio::spawn(send_audio_data(user_id, send_stream));
|
||||
tokio::spawn(receive_audio_data(
|
||||
receive_stream,
|
||||
speaker_id,
|
||||
audio_data_sender,
|
||||
));
|
||||
tokio::spawn(send_audio_data(
|
||||
send_stream,
|
||||
speaker_id,
|
||||
speaker_action_receiver,
|
||||
));
|
||||
}
|
||||
|
||||
async fn mixer(user_id: u32, latest_update: DateTime<Utc>) -> Option<[f32; BUFFER_LENGTH]> {
|
||||
let mut mixed_audio_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||
|
||||
for online_user in ONLINE_USERS.read().await.iter() {
|
||||
let online_user_timed_buffer = online_user.timed_buffer.read().await;
|
||||
|
||||
if online_user.user_id != user_id && online_user_timed_buffer.latest_update > latest_update
|
||||
{
|
||||
for (i, audio_data) in online_user_timed_buffer.audio_buffer.iter().enumerate() {
|
||||
mixed_audio_buffer[i] = mixed_audio_buffer[i] + audio_data;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if 0.0 as f32 == mixed_audio_buffer.iter().sum() {
|
||||
None
|
||||
} else {
|
||||
Some(mixed_audio_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_audio_data(user_id: u32, mut send_stream: SendStream) {
|
||||
let mut latest_update = Utc::now();
|
||||
loop {
|
||||
let mixed_audio_buffer = match mixer(user_id, latest_update).await {
|
||||
Some(mixed_audio_buffer) => {
|
||||
latest_update = Utc::now();
|
||||
mixed_audio_buffer
|
||||
}
|
||||
None => {
|
||||
sleep(Duration::from_millis(150)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for mixed_audio_data in mixed_audio_buffer {
|
||||
if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
|
||||
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn receiver_audio_data(
|
||||
mut receive_stream: ReceiveStream,
|
||||
audio_buffer: Arc<RwLock<TimedBuffer>>,
|
||||
async fn send_audio_data(
|
||||
send_stream: SendStream,
|
||||
speaker_id: u8,
|
||||
mut speaker_action_receiver: broadcast::Receiver<(SpeakerWithData, SpeakerAction)>,
|
||||
) {
|
||||
let mut inner_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||
let mut i = 0;
|
||||
let send_stream = Arc::new(RwLock::new(send_stream));
|
||||
while let Ok((speaker_with_data, speaker_action)) = speaker_action_receiver.recv().await {
|
||||
match speaker_action {
|
||||
SpeakerAction::Join => {
|
||||
let send_stream = send_stream.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut audio_data_receiver = speaker_with_data.audio_data_sender.subscribe();
|
||||
|
||||
while let Ok(audio_datum) = audio_data_receiver.recv().await {
|
||||
let data = Signal::pack_audio_datum(speaker_with_data.speaker, audio_datum);
|
||||
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
|
||||
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
|
||||
SpeakerWithData::remove(speaker_id).await;
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
SpeakerAction::Left => {
|
||||
let data = Signal::pack_speaker_left(speaker_with_data.speaker);
|
||||
if let Err(err_val) = send_stream.write().await.write_all(&data).await {
|
||||
eprintln!("Error: Send Speaker Left | Remote | {}", err_val);
|
||||
SpeakerWithData::remove(speaker_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn receive_audio_data(
|
||||
mut receive_stream: ReceiveStream,
|
||||
speaker_id: u8,
|
||||
audio_data_sender: broadcast::Sender<f32>,
|
||||
) {
|
||||
loop {
|
||||
match receive_stream.read_f32().await {
|
||||
Ok(received_audio_data) => {
|
||||
inner_buffer[i] = received_audio_data;
|
||||
Ok(received_data) => {
|
||||
let _ = audio_data_sender.send(received_data);
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
|
||||
return;
|
||||
SpeakerWithData::remove(speaker_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
if i == BUFFER_LENGTH - 1 {
|
||||
audio_buffer.write().await.update(inner_buffer);
|
||||
i = 0;
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue