feat: remote mixer part 3

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-20 00:59:34 +03:00
parent 0c187b166f
commit 50917c09d5
2 changed files with 175 additions and 140 deletions

View file

@ -54,31 +54,25 @@ pub async fn connect(
let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH);
let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel();
tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
let receive_voice_data_task = tokio::spawn(receive_voice_data(receive_stream, speaker_sender));
let send_voice_data_task = tokio::spawn(send_voice_data(
let receive_voice_data_task = tokio::spawn(receive_audio_data(receive_stream, speaker_sender));
let send_voice_data_task = tokio::spawn(send_audio_data(
send_stream,
microphone_sender_for_producing_receiver,
));
if let Err(err_val) = is_connection_started_signal.send(true) {
eprintln!(
"Error: Is Connection Started | Local Channel | Send | {}",
err_val
);
eprintln!("Error: Is Connection Started | Local | Send | {}", err_val);
}
if let Err(err_val) = connection_stop_signal_receiver.await {
eprintln!(
"Error: Connection Stop Signal | Local Channel | Receive | {}",
"Error: Connection Stop Signal | Local | Receive | {}",
err_val
);
}
if let Err(err_val) = speaker_stop_signal_sender.send(true) {
eprintln!(
"Error: Speaker Stop Signal | Local Channel | Send | {}",
err_val
);
eprintln!("Error: Speaker Stop Signal | Local | Send | {}", err_val);
}
println!("Connection Is Closing");
@ -91,7 +85,7 @@ pub async fn connect(
Ok(())
}
async fn send_voice_data(
async fn send_audio_data(
mut send_stream: SendStream,
microphone_sender_for_producing_receiver: Arc<broadcast::Sender<f32>>,
) {
@ -100,15 +94,12 @@ async fn send_voice_data(
match microphone_receiver.recv().await {
Ok(microphone_data) => {
if let Err(err_val) = send_stream.write_f32(microphone_data).await {
eprintln!("Error: Send Microphone Data | {}", err_val);
eprintln!("Error: Send Microphone Data | Remote | {}", err_val);
break;
}
}
Err(err_val) => {
eprintln!(
"Error: Receive from Microphone | Local Channel | {}",
err_val
);
eprintln!("Error: Receive from Microphone | Local | {}", err_val);
match err_val {
broadcast::error::RecvError::Closed => break,
broadcast::error::RecvError::Lagged(_) => {
@ -119,7 +110,7 @@ async fn send_voice_data(
}
}
}
async fn receive_voice_data(
async fn receive_audio_data(
mut receive_stream: ReceiveStream,
speaker_sender: broadcast::Sender<f32>,
) {
@ -127,12 +118,12 @@ async fn receive_voice_data(
match receive_stream.read_f32().await {
Ok(received_data) => {
if let Err(err_val) = speaker_sender.send(received_data) {
eprintln!("Error: Send to Speaker | Local Channel | {}", err_val);
eprintln!("Error: Send to Speaker | Local | {}", err_val);
break;
}
}
Err(err_val) => {
eprintln!("Error: Receive Data | {}", err_val);
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
break;
}
}

View file

@ -1,7 +1,10 @@
use std::{path::Path, process::exit, sync::LazyLock, time::Duration};
use std::{
collections::VecDeque,
path::Path,
sync::{Arc, LazyLock},
time::Duration,
};
use chrono::Utc;
use protocol::BUFFER_LENGTH;
use s2n_quic::{
Connection, Server,
stream::{ReceiveStream, SendStream},
@ -13,36 +16,28 @@ use tokio::{
};
use crate::ServerConfig;
const LOCAL_AUDIO_BUFFERING_TIMEOUT: Duration = Duration::from_millis(10);
const BUFFER_LENGTH: usize = protocol::BUFFER_LENGTH;
#[derive(Debug)]
struct User {
user_id: u64,
local_audio_data_sender: broadcast::Sender<UseredAudioData>,
sender: broadcast::Sender<f32>,
}
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| RwLock::new(vec![]));
#[derive(Debug, Clone, Copy)]
struct UseredAudioData {
user_id: u64,
audio_data: f32,
}
impl User {
async fn new(user_id: u64) -> broadcast::Receiver<UseredAudioData> {
let (local_audio_data_sender, local_audio_data_receiver) =
broadcast::channel(BUFFER_LENGTH * 16);
let new_user = User {
user_id,
local_audio_data_sender,
async fn new() -> broadcast::Sender<f32> {
let (sender, _) = broadcast::channel(BUFFER_LENGTH);
let new_user = Self {
sender: sender.clone(),
};
ONLINE_USERS.write().await.push(new_user);
local_audio_data_receiver
if let Err(err_val) = NEW_USER_NOTIFIER.send(sender.clone()) {
eprintln!("Error: Send New User | Local | {}", err_val);
}
sender.clone()
}
}
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| vec![].into());
static NEW_USER_NOTIFIER: LazyLock<broadcast::Sender<broadcast::Sender<f32>>> =
LazyLock::new(|| broadcast::channel(BUFFER_LENGTH).0);
pub async fn start(server_config: ServerConfig) {
let mut server = Server::builder()
@ -55,14 +50,13 @@ pub async fn start(server_config: ServerConfig) {
.unwrap()
.start()
.unwrap();
let mut user_id = 0;
while let Some(connection) = server.accept().await {
tokio::spawn(handle_client(user_id, connection));
user_id += 1;
tokio::spawn(handle_client(connection));
}
}
async fn handle_client(user_id: u64, mut connection: Connection) {
async fn handle_client(mut connection: Connection) {
println!(
"Server Name = {}",
connection.server_name().unwrap().unwrap().to_string()
@ -79,108 +73,158 @@ async fn handle_client(user_id: u64, mut connection: Connection) {
.unwrap()
.unwrap();
let (receive_stream, send_stream) = stream.split();
let local_audio_receiver = User::new(user_id).await;
tokio::spawn(receive_audio_stream(user_id, receive_stream));
tokio::spawn(send_audio_stream(
user_id,
send_stream,
local_audio_receiver,
));
let new_user_sender = User::new().await;
tokio::spawn(receiver_audio_data(receive_stream, new_user_sender));
tokio::spawn(send_audio_data(send_stream));
}
async fn receive_audio_stream(user_id: u64, mut receive_stream: ReceiveStream) {
async fn mixer_with_buffer(
unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
) -> Vec<f32> {
let mut mixed_audio_buffer = vec![];
for audio_buffer in unmixed_audio_data.read().await.iter() {
let mut inner_audio_buffer = vec![];
while let Some(audio_data) = audio_buffer.write().await.pop_front() {
inner_audio_buffer.push(audio_data);
}
for (i, audio_data) in inner_audio_buffer.iter().enumerate() {
match mixed_audio_buffer.get(i) {
Some(current_audio_data) => mixed_audio_buffer[i] = current_audio_data + audio_data,
None => mixed_audio_buffer.push(*audio_data),
}
}
}
mixed_audio_buffer
}
// async fn mixer_with_channel(
// unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
// sender: broadcast::Sender<f32>,
// ) {
// loop {
// sleep(Duration::from_nanos(10)).await;
// let mut mixed_audio_buffer = vec![];
// for audio_buffer in unmixed_audio_data.read().await.iter() {
// let mut inner_audio_buffer = vec![];
//
// while let Some(audio_data) = audio_buffer.write().await.pop_front() {
// inner_audio_buffer.push(audio_data);
// }
//
// for (i, audio_data) in inner_audio_buffer.iter().enumerate() {
// match mixed_audio_buffer.get(i) {
// Some(current_audio_data) => {
// mixed_audio_buffer[i] = current_audio_data + audio_data
// }
// None => mixed_audio_buffer.push(*audio_data),
// }
// }
// }
// for audio_data in mixed_audio_buffer {
// if let Err(err_val) = sender.send(audio_data) {
// eprintln!("Error: Send Mixed Audio | Local | {}", err_val);
// return;
// }
// }
// }
// }
async fn new_user_handler(
mut receiver: broadcast::Receiver<f32>,
unmixed_audio_data: Arc<RwLock<VecDeque<Arc<RwLock<VecDeque<f32>>>>>>,
) {
let local_audio_buffer = Arc::new(RwLock::new(VecDeque::new()));
unmixed_audio_data
.write()
.await
.push_back(local_audio_buffer.clone());
loop {
match receiver.recv().await {
Ok(received_audio_data) => {
local_audio_buffer
.write()
.await
.push_back(received_audio_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Local | {}", err_val);
return;
}
}
}
}
async fn send_audio_data(mut send_stream: SendStream) {
let unmixed_audio_data = Arc::new(RwLock::new(VecDeque::new()));
for online_user in ONLINE_USERS.read().await.iter() {
let receiver = online_user.sender.subscribe();
let unmixed_audio_data = unmixed_audio_data.clone();
tokio::spawn(new_user_handler(receiver, unmixed_audio_data));
}
let unmixed_audio_data_for_handling_new_user = unmixed_audio_data.clone();
tokio::spawn(async move {
loop {
match NEW_USER_NOTIFIER.subscribe().recv().await {
Ok(new_user_sender) => {
let receiver = new_user_sender.subscribe();
let unmixed_audio_data = unmixed_audio_data_for_handling_new_user.clone();
tokio::spawn(new_user_handler(receiver, unmixed_audio_data));
}
Err(err_val) => {
eprintln!("Error: Receive New User | Local | {}", err_val);
return;
}
}
}
});
// let (mixer_sender, mut mixer_receiver) = broadcast::channel(BUFFER_LENGTH);
// tokio::spawn(mixer_with_channel(unmixed_audio_data, mixer_sender));
// loop {
// match mixer_receiver.recv().await {
// Ok(mixed_audio_data) => {
// if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
// eprintln!("Error: Send Audio Data | Remote | {}", err_val);
// return;
// }
// }
// Err(err_val) => {
// eprintln!("Error: Receive Mixed Audio Data | Local | {}", err_val);
// return;
// }
// }
// }
loop {
sleep(Duration::from_millis(50)).await;
let mixed_audio_buffer = mixer_with_buffer(unmixed_audio_data.clone()).await;
for mixed_audio_data in mixed_audio_buffer {
if let Err(err_val) = send_stream.write_f32(mixed_audio_data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
return;
}
}
}
}
async fn receiver_audio_data(mut receive_stream: ReceiveStream, sender: broadcast::Sender<f32>) {
loop {
match receive_stream.read_f32().await {
Ok(received_audio_data) => {
for online_user in ONLINE_USERS.read().await.iter() {
let usered_audio_data = UseredAudioData {
user_id,
audio_data: received_audio_data,
};
if let Err(err_val) =
online_user.local_audio_data_sender.send(usered_audio_data)
{
if let Err(err_val) = sender.send(received_audio_data) {
eprintln!("Error: Send Audio Data | Local | {}", err_val);
exit(0);
}
return;
}
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Remote | {}", err_val);
break;
return;
}
}
}
}
async fn send_audio_stream(
user_id: u64,
mut send_stream: SendStream,
mut local_audio_receiver: broadcast::Receiver<UseredAudioData>,
) {
let mut usered_audio_buffer = vec![];
let mut time = Utc::now();
loop {
if time + LOCAL_AUDIO_BUFFERING_TIMEOUT > Utc::now() {
match local_audio_receiver.recv().await {
Ok(usered_audio_data) => {
usered_audio_buffer.push(usered_audio_data);
}
Err(err_val) => {
eprintln!("Error: Receive Audio Data | Local | {}", err_val);
break;
}
}
} else {
let mixed_buffer = mixer(user_id, &mut usered_audio_buffer).await;
for mixed_buffer_data in mixed_buffer {
if let Err(err_val) = send_stream.write_f32(mixed_buffer_data).await {
eprintln!("Error: Send Audio Data | Remote | {}", err_val);
}
}
usered_audio_buffer.clear();
time = Utc::now();
}
}
}
async fn mixer(user_id: u64, usered_audio_buffer: &mut Vec<UseredAudioData>) -> Vec<f32> {
usered_audio_buffer.sort_by(|first, second| first.user_id.cmp(&second.user_id));
let mut parsed_audio_data = vec![];
let mut inner = vec![];
let mut current_user = user_id;
while let Some(usered_audio) = usered_audio_buffer.pop() {
if usered_audio.user_id == user_id {
continue;
} else {
if current_user == usered_audio.user_id {
inner.push(usered_audio.audio_data);
} else {
parsed_audio_data.push(inner.clone());
inner.clear();
current_user = usered_audio.user_id;
}
}
}
parsed_audio_data.push(inner.clone());
let mut mixed_buffer = vec![];
for parsed_audio_single_user_buffer in parsed_audio_data.iter() {
for (i, parsed_audio_single_user_buffer_data) in
parsed_audio_single_user_buffer.iter().enumerate()
{
match mixed_buffer.get(i) {
Some(current_data) => {
mixed_buffer[i] = current_data + parsed_audio_single_user_buffer_data
}
None => mixed_buffer.push(*parsed_audio_single_user_buffer_data),
}
}
}
mixed_buffer
}