From 9da3fd6998f691a0a7bcebd4ef23964dc9efc7e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Tue, 4 Mar 2025 18:10:18 +0300 Subject: [PATCH] feat: :sparkles: bidirectional voice transmission --- src/client.rs | 47 +++++++++++++++++++++++++++-------------------- src/server.rs | 43 ++++++++++++++++++++++++++++++++----------- src/voice.rs | 2 ++ 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/src/client.rs b/src/client.rs index cc04841..c321e62 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,8 @@ -use std::{net::SocketAddr, path::Path}; +use std::{io, net::SocketAddr, path::Path}; use s2n_quic::{Client, client::Connect}; use tokio::{ - io::AsyncWriteExt, + io::AsyncReadExt, sync::{broadcast, oneshot}, }; @@ -32,26 +32,33 @@ pub async fn run<'a>(client_config: ClientConfig<'a>) { }; connection.keep_alive(true).unwrap(); - let (microphone_sender, mut microphone_receiver) = broadcast::channel::(BUFFER_LENGTH); - let (stop_signal_sender, stop_signal_receiver) = oneshot::channel::(); - let stream = connection.open_bidirectional_stream().await.unwrap(); + let (mut receive_stream, mut send_stream) = stream.split(); - tokio::spawn(record(microphone_sender, stop_signal_receiver)); + let (microphone_sender, mut microphone_receiver) = broadcast::channel::(BUFFER_LENGTH); + let (speaker_sender, speaker_receiver) = broadcast::channel::(BUFFER_LENGTH); - let (receive_stream, mut send_stream) = stream.split(); - while let Ok(sample) = microphone_receiver.recv().await { - send_stream - .send(sample.to_le_bytes().to_vec().into()) - .await - .unwrap(); - } + let (microphone_stop_signal_sender, microphone_stop_signal_receiver) = + oneshot::channel::(); + let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel::(); - if let Err(err_val) = send_stream.close().await { - eprintln!("Error: Stream Close | {}", err_val); - } - - if let Err(err_val) = send_stream.shutdown().await { - eprintln!("Error: Stream Shutdown| {}", err_val); - } + tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); + tokio::spawn(record(microphone_sender, microphone_stop_signal_receiver)); + tokio::spawn(async move { + while let Ok(data) = receive_stream.read_f32_le().await { + speaker_sender.send(data).unwrap(); + } + }); + tokio::spawn(async move { + while let Ok(data) = microphone_receiver.recv().await { + send_stream + .send(data.to_le_bytes().to_vec().into()) + .await + .unwrap(); + } + }); + let mut read_buffer = String::default(); + io::stdin().read_line(&mut read_buffer).unwrap(); + microphone_stop_signal_sender.send(true).unwrap(); + spearker_stop_signal_sender.send(true).unwrap(); } diff --git a/src/server.rs b/src/server.rs index e11afe4..06c21bf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,15 @@ -use std::path::Path; +use std::{io, path::Path}; use s2n_quic::Server; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::AsyncReadExt, sync::{broadcast, oneshot}, }; -use crate::{BUFFER_LENGTH, ServerConfig, voice::play}; +use crate::{ + BUFFER_LENGTH, ServerConfig, + voice::{play, record}, +}; pub async fn run<'a>(server_config: ServerConfig<'a>) { let mut server = Server::builder() @@ -29,22 +32,40 @@ pub async fn run<'a>(server_config: ServerConfig<'a>) { "Client Address = {}", connection.remote_addr().unwrap().to_string() ); - let (speaker_sender, speaker_receiver) = broadcast::channel::(BUFFER_LENGTH); - let (stop_signal_sender, stop_signal_receiver) = oneshot::channel::(); let stream = connection .accept_bidirectional_stream() .await .unwrap() .unwrap(); + let (mut receive_stream, mut send_stream) = stream.split(); - tokio::spawn(play(speaker_receiver, stop_signal_receiver)); + let (microphone_sender, mut microphone_receiver) = broadcast::channel::(BUFFER_LENGTH); + let (speaker_sender, speaker_receiver) = broadcast::channel::(BUFFER_LENGTH); - let (mut receive_stream, send_stream) = stream.split(); - while let Ok(data) = receive_stream.read_f32_le().await { - speaker_sender.send(data).unwrap(); - } + let (microphone_stop_signal_sender, microphone_stop_signal_receiver) = + oneshot::channel::(); + let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = + oneshot::channel::(); - return; + tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); + tokio::spawn(record(microphone_sender, microphone_stop_signal_receiver)); + tokio::spawn(async move { + while let Ok(data) = receive_stream.read_f32_le().await { + speaker_sender.send(data).unwrap(); + } + }); + tokio::spawn(async move { + while let Ok(data) = microphone_receiver.recv().await { + send_stream + .send(data.to_le_bytes().to_vec().into()) + .await + .unwrap(); + } + }); + let mut read_buffer = String::default(); + io::stdin().read_line(&mut read_buffer).unwrap(); + microphone_stop_signal_sender.send(true).unwrap(); + spearker_stop_signal_sender.send(true).unwrap(); } } diff --git a/src/voice.rs b/src/voice.rs index 50f805b..592585a 100644 --- a/src/voice.rs +++ b/src/voice.rs @@ -47,6 +47,8 @@ pub async fn play( Ok(received_sample) => *sample = received_sample, Err(err_val) => eprintln!("Error: Speaker Receive | {}", err_val), } + } else { + *sample = 0.0; } } };