feat: bidirectional voice transmission

This commit is contained in:
Ahmet Kaan Gümüş 2025-03-04 18:10:18 +03:00
parent c280c5226c
commit 9da3fd6998
3 changed files with 61 additions and 31 deletions

View file

@ -1,8 +1,8 @@
use std::{net::SocketAddr, path::Path}; use std::{io, net::SocketAddr, path::Path};
use s2n_quic::{Client, client::Connect}; use s2n_quic::{Client, client::Connect};
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncReadExt,
sync::{broadcast, oneshot}, sync::{broadcast, oneshot},
}; };
@ -32,26 +32,33 @@ pub async fn run<'a>(client_config: ClientConfig<'a>) {
}; };
connection.keep_alive(true).unwrap(); connection.keep_alive(true).unwrap();
let (microphone_sender, mut microphone_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel::<bool>();
let stream = connection.open_bidirectional_stream().await.unwrap(); let stream = connection.open_bidirectional_stream().await.unwrap();
let (mut receive_stream, mut send_stream) = stream.split();
tokio::spawn(record(microphone_sender, stop_signal_receiver)); let (microphone_sender, mut microphone_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (speaker_sender, speaker_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (receive_stream, mut send_stream) = stream.split(); let (microphone_stop_signal_sender, microphone_stop_signal_receiver) =
while let Ok(sample) = microphone_receiver.recv().await { oneshot::channel::<bool>();
send_stream let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel::<bool>();
.send(sample.to_le_bytes().to_vec().into())
.await
.unwrap();
}
if let Err(err_val) = send_stream.close().await { tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
eprintln!("Error: Stream Close | {}", err_val); tokio::spawn(record(microphone_sender, microphone_stop_signal_receiver));
} tokio::spawn(async move {
while let Ok(data) = receive_stream.read_f32_le().await {
if let Err(err_val) = send_stream.shutdown().await { speaker_sender.send(data).unwrap();
eprintln!("Error: Stream Shutdown| {}", err_val); }
} });
tokio::spawn(async move {
while let Ok(data) = microphone_receiver.recv().await {
send_stream
.send(data.to_le_bytes().to_vec().into())
.await
.unwrap();
}
});
let mut read_buffer = String::default();
io::stdin().read_line(&mut read_buffer).unwrap();
microphone_stop_signal_sender.send(true).unwrap();
spearker_stop_signal_sender.send(true).unwrap();
} }

View file

@ -1,12 +1,15 @@
use std::path::Path; use std::{io, path::Path};
use s2n_quic::Server; use s2n_quic::Server;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::AsyncReadExt,
sync::{broadcast, oneshot}, sync::{broadcast, oneshot},
}; };
use crate::{BUFFER_LENGTH, ServerConfig, voice::play}; use crate::{
BUFFER_LENGTH, ServerConfig,
voice::{play, record},
};
pub async fn run<'a>(server_config: ServerConfig<'a>) { pub async fn run<'a>(server_config: ServerConfig<'a>) {
let mut server = Server::builder() let mut server = Server::builder()
@ -29,22 +32,40 @@ pub async fn run<'a>(server_config: ServerConfig<'a>) {
"Client Address = {}", "Client Address = {}",
connection.remote_addr().unwrap().to_string() connection.remote_addr().unwrap().to_string()
); );
let (speaker_sender, speaker_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel::<bool>();
let stream = connection let stream = connection
.accept_bidirectional_stream() .accept_bidirectional_stream()
.await .await
.unwrap() .unwrap()
.unwrap(); .unwrap();
let (mut receive_stream, mut send_stream) = stream.split();
tokio::spawn(play(speaker_receiver, stop_signal_receiver)); let (microphone_sender, mut microphone_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (speaker_sender, speaker_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
let (mut receive_stream, send_stream) = stream.split(); let (microphone_stop_signal_sender, microphone_stop_signal_receiver) =
while let Ok(data) = receive_stream.read_f32_le().await { oneshot::channel::<bool>();
speaker_sender.send(data).unwrap(); let (spearker_stop_signal_sender, speaker_stop_signal_receiver) =
} oneshot::channel::<bool>();
return; tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
tokio::spawn(record(microphone_sender, microphone_stop_signal_receiver));
tokio::spawn(async move {
while let Ok(data) = receive_stream.read_f32_le().await {
speaker_sender.send(data).unwrap();
}
});
tokio::spawn(async move {
while let Ok(data) = microphone_receiver.recv().await {
send_stream
.send(data.to_le_bytes().to_vec().into())
.await
.unwrap();
}
});
let mut read_buffer = String::default();
io::stdin().read_line(&mut read_buffer).unwrap();
microphone_stop_signal_sender.send(true).unwrap();
spearker_stop_signal_sender.send(true).unwrap();
} }
} }

View file

@ -47,6 +47,8 @@ pub async fn play(
Ok(received_sample) => *sample = received_sample, Ok(received_sample) => *sample = received_sample,
Err(err_val) => eprintln!("Error: Speaker Receive | {}", err_val), Err(err_val) => eprintln!("Error: Speaker Receive | {}", err_val),
} }
} else {
*sample = 0.0;
} }
} }
}; };