feat: ✨ remote mixer part 5
This commit is contained in:
parent
b0bb6c6a90
commit
d6e5389743
1 changed files with 85 additions and 58 deletions
|
@ -1,70 +1,62 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{Arc, LazyLock},
|
sync::{Arc, LazyLock},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use s2n_quic::{
|
use s2n_quic::{
|
||||||
Connection, Server,
|
Connection, Server,
|
||||||
stream::{ReceiveStream, SendStream},
|
stream::{ReceiveStream, SendStream},
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
sync::{RwLock, broadcast},
|
sync::RwLock,
|
||||||
time::sleep,
|
time::sleep,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::ServerConfig;
|
use crate::ServerConfig;
|
||||||
const BUFFER_LENGTH: usize = 1024 * 16 * 2;
|
const BUFFER_LENGTH: usize = 1024 * 13;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct TimedBuffer {
|
||||||
|
audio_buffer: [f32; BUFFER_LENGTH],
|
||||||
|
latest_update: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
impl TimedBuffer {
|
||||||
|
fn new() -> Self {
|
||||||
|
let audio_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||||
|
let latest_update = Utc::now();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
audio_buffer,
|
||||||
|
latest_update,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn update(&mut self, new_audio_buffer: [f32; BUFFER_LENGTH]) {
|
||||||
|
self.audio_buffer = new_audio_buffer;
|
||||||
|
self.latest_update = Utc::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct User {
|
struct User {
|
||||||
audio_buffer: Arc<RwLock<VecDeque<f32>>>,
|
user_id: u32,
|
||||||
|
timed_buffer: Arc<RwLock<TimedBuffer>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
async fn new() -> Arc<RwLock<VecDeque<f32>>> {
|
async fn new(user_id: u32) -> Arc<RwLock<TimedBuffer>> {
|
||||||
let audio_buffer = Arc::new(RwLock::new(VecDeque::new()));
|
let timed_buffer = Arc::new(RwLock::new(TimedBuffer::new()));
|
||||||
let new_user = Self {
|
let new_user = Self {
|
||||||
audio_buffer: audio_buffer.clone(),
|
user_id,
|
||||||
|
timed_buffer: timed_buffer.clone(),
|
||||||
};
|
};
|
||||||
ONLINE_USERS.write().await.push(new_user);
|
ONLINE_USERS.write().await.push(new_user);
|
||||||
audio_buffer
|
timed_buffer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| vec![].into());
|
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| vec![].into());
|
||||||
static GLOBAL_MIXER: LazyLock<broadcast::Sender<f32>> =
|
|
||||||
LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0);
|
|
||||||
|
|
||||||
async fn global_mixer() {
|
|
||||||
let global_mixer_sender = GLOBAL_MIXER.clone();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
|
||||||
let mut mixed_audio_buffer = VecDeque::new();
|
|
||||||
for online_user in ONLINE_USERS.read().await.iter() {
|
|
||||||
let mut inner_buffer = vec![];
|
|
||||||
while let Some(audio_data) = online_user.audio_buffer.write().await.pop_front() {
|
|
||||||
inner_buffer.push(audio_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i, audio_data) in inner_buffer.iter().enumerate() {
|
|
||||||
match mixed_audio_buffer.get(i) {
|
|
||||||
Some(original_value) => mixed_audio_buffer[i] = original_value + audio_data,
|
|
||||||
None => mixed_audio_buffer.push_back(*audio_data),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mixed_audio_buffer.truncate(mixed_audio_buffer.len() * 85 / 100);
|
|
||||||
while let Some(audio_data) = mixed_audio_buffer.pop_front() {
|
|
||||||
if let Err(err_val) = global_mixer_sender.send(audio_data) {
|
|
||||||
eprintln!("Error: Send Mixed Audio Data | Local | {}", err_val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start(server_config: ServerConfig) {
|
pub async fn start(server_config: ServerConfig) {
|
||||||
let mut server = Server::builder()
|
let mut server = Server::builder()
|
||||||
|
@ -78,14 +70,14 @@ pub async fn start(server_config: ServerConfig) {
|
||||||
.start()
|
.start()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
tokio::spawn(global_mixer());
|
let mut user_id = 0;
|
||||||
|
|
||||||
while let Some(connection) = server.accept().await {
|
while let Some(connection) = server.accept().await {
|
||||||
tokio::spawn(handle_client(connection));
|
tokio::spawn(handle_client(user_id, connection));
|
||||||
|
user_id += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_client(mut connection: Connection) {
|
async fn handle_client(user_id: u32, mut connection: Connection) {
|
||||||
println!(
|
println!(
|
||||||
"Server Name = {}",
|
"Server Name = {}",
|
||||||
connection.server_name().unwrap().unwrap().to_string()
|
connection.server_name().unwrap().unwrap().to_string()
|
||||||
|
@ -103,25 +95,50 @@ async fn handle_client(mut connection: Connection) {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (receive_stream, send_stream) = stream.split();
|
let (receive_stream, send_stream) = stream.split();
|
||||||
|
|
||||||
let user_audio_buffer = User::new().await;
|
let user_audio_buffer = User::new(user_id).await;
|
||||||
|
|
||||||
tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer));
|
tokio::spawn(receiver_audio_data(receive_stream, user_audio_buffer));
|
||||||
tokio::spawn(send_audio_data(send_stream));
|
tokio::spawn(send_audio_data(user_id, send_stream));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_audio_data(mut send_stream: SendStream) {
|
async fn mixer(user_id: u32, latest_update: DateTime<Utc>) -> Option<[f32; BUFFER_LENGTH]> {
|
||||||
let mut global_mixer_receiver = GLOBAL_MIXER.subscribe();
|
let mut mixed_audio_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||||
|
|
||||||
loop {
|
for online_user in ONLINE_USERS.read().await.iter() {
|
||||||
match global_mixer_receiver.recv().await {
|
let online_user_timed_buffer = online_user.timed_buffer.read().await;
|
||||||
Ok(received_audio_data) => {
|
|
||||||
if let Err(err_val) = send_stream.write_f32(received_audio_data).await {
|
if online_user.user_id != user_id && online_user_timed_buffer.latest_update > latest_update
|
||||||
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
|
{
|
||||||
return;
|
for (i, audio_data) in online_user_timed_buffer.audio_buffer.iter().enumerate() {
|
||||||
}
|
mixed_audio_buffer[i] = mixed_audio_buffer[i] + audio_data;
|
||||||
}
|
}
|
||||||
Err(err_val) => {
|
}
|
||||||
eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val);
|
}
|
||||||
|
|
||||||
|
if 0.0 as f32 == mixed_audio_buffer.iter().sum() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(mixed_audio_buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_audio_data(user_id: u32, mut send_stream: SendStream) {
|
||||||
|
let mut latest_update = Utc::now();
|
||||||
|
loop {
|
||||||
|
let mixed_audio_buffer = match mixer(user_id, latest_update).await {
|
||||||
|
Some(mixed_audio_buffer) => {
|
||||||
|
latest_update = Utc::now();
|
||||||
|
mixed_audio_buffer
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for mixed_audio_data in mixed_audio_buffer {
|
||||||
|
if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
|
||||||
|
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,17 +147,27 @@ async fn send_audio_data(mut send_stream: SendStream) {
|
||||||
|
|
||||||
async fn receiver_audio_data(
|
async fn receiver_audio_data(
|
||||||
mut receive_stream: ReceiveStream,
|
mut receive_stream: ReceiveStream,
|
||||||
audio_buffer: Arc<RwLock<VecDeque<f32>>>,
|
audio_buffer: Arc<RwLock<TimedBuffer>>,
|
||||||
) {
|
) {
|
||||||
|
let mut inner_buffer = [0.0 as f32; BUFFER_LENGTH];
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match receive_stream.read_f32().await {
|
match receive_stream.read_f32().await {
|
||||||
Ok(received_audio_data) => {
|
Ok(received_audio_data) => {
|
||||||
audio_buffer.write().await.push_back(received_audio_data);
|
inner_buffer[i] = received_audio_data;
|
||||||
}
|
}
|
||||||
Err(err_val) => {
|
Err(err_val) => {
|
||||||
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
|
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if i == BUFFER_LENGTH - 1 {
|
||||||
|
audio_buffer.write().await.update(inner_buffer);
|
||||||
|
i = 0;
|
||||||
|
} else {
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue