feat: ✨ talk locally and listen from server
This commit is contained in:
parent
d930888abb
commit
1451e9ccfc
9 changed files with 320 additions and 80 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -3465,6 +3465,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"chrono",
|
||||
"protocol",
|
||||
"s2n-quic",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
|
|
|
@ -1,14 +1,20 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
use std::{
|
||||
sync::{Arc, RwLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use iced::{
|
||||
Alignment::Center,
|
||||
Element, Task, Theme,
|
||||
widget::{button, column, row},
|
||||
};
|
||||
use protocol::BUFFER_LENGTH;
|
||||
use tokio::sync::{
|
||||
use protocol::{BUFFER_LENGTH, Error};
|
||||
use tokio::{
|
||||
sync::{
|
||||
broadcast::{self},
|
||||
oneshot,
|
||||
},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
use crate::{ClientConfig, stream::connect, voice::record};
|
||||
|
@ -20,16 +26,40 @@ struct Signal {
|
|||
connection: Option<oneshot::Sender<bool>>,
|
||||
}
|
||||
|
||||
impl Signal {
|
||||
fn reset_microphone(&mut self) -> Result<(), Error> {
|
||||
if let Some(microphone_signal) = &self.microphone {
|
||||
if !microphone_signal.is_closed() {
|
||||
self.microphone.take().expect("Never").send(true).unwrap();
|
||||
self.microphone = None;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(Error::Signal("Reset".to_string()))
|
||||
}
|
||||
|
||||
fn reset_connection(&mut self) -> Result<(), Error> {
|
||||
if let Some(connection_signal) = &self.connection {
|
||||
if !connection_signal.is_closed() {
|
||||
self.connection.take().expect("Never").send(true).unwrap();
|
||||
self.connection = None;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(Error::Signal("Reset".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Channel {
|
||||
microphone: broadcast::Sender<f32>,
|
||||
microphone: Arc<broadcast::Sender<f32>>,
|
||||
// speaker: (broadcast::Sender<f32>, broadcast::Receiver<f32>),
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
microphone: broadcast::channel(BUFFER_LENGTH).0,
|
||||
microphone: broadcast::channel(BUFFER_LENGTH).0.into(),
|
||||
// speaker: broadcast::channel(BUFFER_LENGTH),
|
||||
}
|
||||
}
|
||||
|
@ -71,6 +101,18 @@ pub struct App {
|
|||
}
|
||||
|
||||
impl App {
|
||||
fn reset_microphone(&mut self) -> Result<(), Error> {
|
||||
self.signal.reset_microphone()?;
|
||||
self.gui_status.write().unwrap().microphone = State::Passive;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_connection(&mut self) -> Result<(), Error> {
|
||||
self.signal.reset_connection()?;
|
||||
self.gui_status.write().unwrap().room = State::Passive;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn theme(&self) -> Theme {
|
||||
Theme::Dark
|
||||
}
|
||||
|
@ -110,35 +152,37 @@ impl App {
|
|||
self.gui_status.write().unwrap().room = State::Loading;
|
||||
let client_config = self.client_config.clone();
|
||||
let gui_status = self.gui_status.clone();
|
||||
let microphone_receiver = self.channel.microphone.subscribe();
|
||||
let microphone_sender_for_producing_receiver = self.channel.microphone.clone();
|
||||
|
||||
let connection_signal = oneshot::channel();
|
||||
self.signal.connection = Some(connection_signal.0);
|
||||
let is_connection_started_signal = oneshot::channel();
|
||||
tokio::spawn(connect(
|
||||
connection_signal.1,
|
||||
is_connection_started_signal.0,
|
||||
microphone_sender_for_producing_receiver,
|
||||
client_config,
|
||||
));
|
||||
|
||||
let is_connection_started_task = tokio::spawn(is_connection_started_signal.1);
|
||||
Task::perform(
|
||||
connect(connection_signal.1, microphone_receiver, client_config),
|
||||
move |result| match result {
|
||||
async move {
|
||||
match is_connection_started_task.await {
|
||||
Ok(_) => gui_status.write().unwrap().room = State::Active,
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: Join Room | {}", err_val);
|
||||
gui_status.write().unwrap().room = State::Passive;
|
||||
eprintln!("Error: Connection Task | {}", err_val);
|
||||
}
|
||||
}
|
||||
},
|
||||
|_| {},
|
||||
)
|
||||
.map(|_| Message::None)
|
||||
}
|
||||
Message::LeaveRoom => {
|
||||
self.gui_status.write().unwrap().room = State::Loading;
|
||||
if let Some(connection_signal) = &self.signal.connection {
|
||||
if !connection_signal.is_closed() {
|
||||
self.signal
|
||||
.connection
|
||||
.take()
|
||||
.expect("Never")
|
||||
.send(true)
|
||||
.unwrap();
|
||||
self.signal.connection = None;
|
||||
self.gui_status.write().unwrap().room = State::Passive;
|
||||
}
|
||||
if let Err(err_val) = self.reset_connection() {
|
||||
eprintln!("Error: Leave Room | {}", err_val);
|
||||
}
|
||||
Task::none()
|
||||
}
|
||||
|
@ -165,17 +209,8 @@ impl App {
|
|||
}
|
||||
Message::MuteMicrophone => {
|
||||
self.gui_status.write().unwrap().microphone = State::Loading;
|
||||
if let Some(microphone_signal) = &self.signal.microphone {
|
||||
if !microphone_signal.is_closed() {
|
||||
self.signal
|
||||
.microphone
|
||||
.take()
|
||||
.expect("Never")
|
||||
.send(true)
|
||||
.unwrap();
|
||||
self.signal.microphone = None;
|
||||
self.gui_status.write().unwrap().microphone = State::Passive;
|
||||
}
|
||||
if let Err(err_val) = self.reset_microphone() {
|
||||
eprintln!("Error: Mute Microphone | {}", err_val);
|
||||
}
|
||||
Task::none()
|
||||
}
|
||||
|
|
|
@ -1,17 +1,22 @@
|
|||
use std::{net::SocketAddr, path::Path, sync::Arc};
|
||||
|
||||
use protocol::{BUFFER_LENGTH, Error};
|
||||
use s2n_quic::{Client, client::Connect};
|
||||
use s2n_quic::{
|
||||
Client,
|
||||
client::Connect,
|
||||
stream::{ReceiveStream, SendStream},
|
||||
};
|
||||
use tokio::{
|
||||
io::AsyncReadExt,
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::{broadcast, oneshot},
|
||||
};
|
||||
|
||||
use crate::{ClientConfig, voice::play};
|
||||
|
||||
pub async fn connect(
|
||||
connection_signal: oneshot::Receiver<bool>,
|
||||
mut microphone_receiver: broadcast::Receiver<f32>,
|
||||
connection_stop_signal_receiver: oneshot::Receiver<bool>,
|
||||
is_connection_started_signal: oneshot::Sender<bool>,
|
||||
microphone_sender_for_producing_receiver: Arc<broadcast::Sender<f32>>,
|
||||
client_config: Arc<ClientConfig>,
|
||||
) -> Result<(), Error> {
|
||||
let client = Client::builder()
|
||||
|
@ -45,40 +50,91 @@ pub async fn connect(
|
|||
.open_bidirectional_stream()
|
||||
.await
|
||||
.map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?;
|
||||
let (mut receive_stream, mut send_stream) = stream.split();
|
||||
let (receive_stream, send_stream) = stream.split();
|
||||
let (speaker_sender, speaker_receiver) = broadcast::channel(BUFFER_LENGTH);
|
||||
let (speaker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel();
|
||||
tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
|
||||
let receive_voice_data_task = tokio::spawn(receive_voice_data(receive_stream, speaker_sender));
|
||||
let send_voice_data_task = tokio::spawn(send_voice_data(
|
||||
send_stream,
|
||||
microphone_sender_for_producing_receiver,
|
||||
));
|
||||
|
||||
let (speaker_sender, speaker_receiver) = broadcast::channel::<f32>(BUFFER_LENGTH);
|
||||
|
||||
let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel::<bool>();
|
||||
|
||||
let play_task = tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver));
|
||||
let receive_task = tokio::spawn(async move {
|
||||
while let Ok(data) = receive_stream.read_f32_le().await {
|
||||
speaker_sender
|
||||
.send(data)
|
||||
.map_err(|err_val| Error::Signal(err_val.to_string()))
|
||||
.unwrap();
|
||||
if let Err(err_val) = is_connection_started_signal.send(true) {
|
||||
eprintln!(
|
||||
"Error: Is Connection Started | Local Channel | Send | {}",
|
||||
err_val
|
||||
);
|
||||
}
|
||||
});
|
||||
let send_task = tokio::spawn(async move {
|
||||
while let Ok(data) = microphone_receiver.recv().await {
|
||||
send_stream
|
||||
.send(data.to_le_bytes().to_vec().into())
|
||||
.await
|
||||
.map_err(|err_val| Error::Send(err_val.to_string()))
|
||||
.unwrap();
|
||||
}
|
||||
});
|
||||
if let Ok(_) = connection_signal.await {
|
||||
println!("Connection Closing");
|
||||
}
|
||||
spearker_stop_signal_sender
|
||||
.send(true)
|
||||
.map_err(|err_val| Error::Signal(err_val.to_string()))?;
|
||||
|
||||
send_task.abort();
|
||||
receive_task.abort();
|
||||
play_task.abort();
|
||||
if let Err(err_val) = connection_stop_signal_receiver.await {
|
||||
eprintln!(
|
||||
"Error: Connection Stop Signal | Local Channel | Receive | {}",
|
||||
err_val
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(err_val) = speaker_stop_signal_sender.send(true) {
|
||||
eprintln!(
|
||||
"Error: Speaker Stop Signal | Local Channel | Send | {}",
|
||||
err_val
|
||||
);
|
||||
}
|
||||
|
||||
println!("Connection Is Closing");
|
||||
|
||||
receive_voice_data_task.abort();
|
||||
send_voice_data_task.abort();
|
||||
|
||||
println!("Connection Is Closed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_voice_data(
|
||||
mut send_stream: SendStream,
|
||||
microphone_sender_for_producing_receiver: Arc<broadcast::Sender<f32>>,
|
||||
) {
|
||||
let mut microphone_receiver = microphone_sender_for_producing_receiver.subscribe();
|
||||
loop {
|
||||
match microphone_receiver.recv().await {
|
||||
Ok(microphone_data) => {
|
||||
if let Err(err_val) = send_stream.write_f32(microphone_data).await {
|
||||
eprintln!("Error: Send Microphone Data | {}", err_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!(
|
||||
"Error: Receive from Microphone | Local Channel | {}",
|
||||
err_val
|
||||
);
|
||||
match err_val {
|
||||
broadcast::error::RecvError::Closed => break,
|
||||
broadcast::error::RecvError::Lagged(_) => {
|
||||
microphone_receiver = microphone_receiver.resubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn receive_voice_data(
|
||||
mut receive_stream: ReceiveStream,
|
||||
speaker_sender: broadcast::Sender<f32>,
|
||||
) {
|
||||
loop {
|
||||
match receive_stream.read_f32().await {
|
||||
Ok(received_data) => {
|
||||
if let Err(err_val) = speaker_sender.send(received_data) {
|
||||
eprintln!("Error: Send to Speaker | Local Channel | {}", err_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: Receive Data | {}", err_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||
use tokio::sync::{broadcast, oneshot};
|
||||
|
||||
pub async fn record(
|
||||
microphone_sender: broadcast::Sender<f32>,
|
||||
microphone_sender: Arc<broadcast::Sender<f32>>,
|
||||
is_microphone_started_signal: oneshot::Sender<bool>,
|
||||
microphone_stop_signal: oneshot::Receiver<bool>,
|
||||
microphone_stop_signal_receiver: oneshot::Receiver<bool>,
|
||||
) {
|
||||
let host = cpal::default_host();
|
||||
let input_device = host.default_input_device().unwrap();
|
||||
|
@ -12,7 +14,7 @@ pub async fn record(
|
|||
|
||||
let input = move |data: &[f32], _: &cpal::InputCallbackInfo| {
|
||||
for &sample in data {
|
||||
if microphone_sender.receiver_count() > 0 {
|
||||
if microphone_sender.receiver_count() > 0 && sample != 0.0 {
|
||||
if let Err(err_val) = microphone_sender.send(sample) {
|
||||
eprintln!("Error: Microphone Send | {}", err_val);
|
||||
}
|
||||
|
@ -30,7 +32,12 @@ pub async fn record(
|
|||
}
|
||||
|
||||
tokio::task::block_in_place(|| {
|
||||
let _ = microphone_stop_signal.blocking_recv();
|
||||
if let Err(err_val) = microphone_stop_signal_receiver.blocking_recv() {
|
||||
eprintln!(
|
||||
"Error: Microphone Stop Signal | Local Channel | {}",
|
||||
err_val
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
input_stream.pause().unwrap();
|
||||
|
@ -38,7 +45,7 @@ pub async fn record(
|
|||
|
||||
pub async fn play(
|
||||
mut speaker_receiver: broadcast::Receiver<f32>,
|
||||
speaker_stop_signal: oneshot::Receiver<bool>,
|
||||
speaker_stop_signal_receiver: oneshot::Receiver<bool>,
|
||||
) {
|
||||
let host = cpal::default_host();
|
||||
let output_device = host.default_output_device().unwrap();
|
||||
|
@ -49,7 +56,19 @@ pub async fn play(
|
|||
if speaker_receiver.len() > 0 {
|
||||
match speaker_receiver.blocking_recv() {
|
||||
Ok(received_sample) => *sample = received_sample,
|
||||
Err(err_val) => eprintln!("Error: Speaker Receive | {}", err_val),
|
||||
Err(err_val) => match err_val {
|
||||
broadcast::error::RecvError::Closed => {
|
||||
eprintln!("Error: Speaker Receive | Local Channel | Channel Closed");
|
||||
return;
|
||||
}
|
||||
broadcast::error::RecvError::Lagged(lag_amount) => {
|
||||
eprintln!(
|
||||
"Error: Speaker Receive | Local Channel | Lagging by -> {}",
|
||||
lag_amount
|
||||
);
|
||||
speaker_receiver = speaker_receiver.resubscribe();
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
*sample = 0.0;
|
||||
|
@ -65,7 +84,12 @@ pub async fn play(
|
|||
println!("Playing Started");
|
||||
|
||||
tokio::task::block_in_place(|| {
|
||||
let _ = speaker_stop_signal.blocking_recv();
|
||||
if let Err(err_val) = speaker_stop_signal_receiver.blocking_recv() {
|
||||
eprintln!(
|
||||
"Error: Speaker Stop Signal Receive | Local Channel | {}",
|
||||
err_val
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
output_stream.pause().unwrap();
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const BUFFER_LENGTH: usize = 1024;
|
||||
pub const BUFFER_LENGTH: usize = 1024 * 8;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Error {
|
||||
|
|
|
@ -9,3 +9,4 @@ serde = { workspace = true }
|
|||
serde_json = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
s2n-quic = { workspace = true }
|
||||
|
|
|
@ -1,4 +1,18 @@
|
|||
pub mod stream;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ServerConfig {
|
||||
pub struct ServerConfig {
|
||||
certificate_path: String,
|
||||
certificate_key_path: String,
|
||||
server_address: String,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
certificate_path: "./server/certificates/cert.pem".to_string(),
|
||||
certificate_key_path: "./server/certificates/key.pem".to_string(),
|
||||
server_address: "127.0.0.1:4546".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
use server::{ServerConfig, stream::start};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
println!("Hello, world!");
|
||||
let server_config = ServerConfig::new();
|
||||
start(server_config).await;
|
||||
}
|
||||
|
|
105
server/src/stream.rs
Normal file
105
server/src/stream.rs
Normal file
|
@ -0,0 +1,105 @@
|
|||
use std::{path::Path, sync::LazyLock};
|
||||
|
||||
use protocol::BUFFER_LENGTH;
|
||||
use s2n_quic::{
|
||||
Connection, Server,
|
||||
stream::{ReceiveStream, SendStream},
|
||||
};
|
||||
use tokio::{
|
||||
io::AsyncReadExt,
|
||||
sync::{RwLock, broadcast},
|
||||
};
|
||||
|
||||
use crate::ServerConfig;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct User {
|
||||
sender_to_user: broadcast::Sender<f32>,
|
||||
}
|
||||
impl User {
|
||||
fn new() -> (Self, broadcast::Receiver<f32>) {
|
||||
let (sender, receiver) = broadcast::channel(BUFFER_LENGTH);
|
||||
let user = Self {
|
||||
sender_to_user: sender,
|
||||
};
|
||||
(user, receiver)
|
||||
}
|
||||
}
|
||||
|
||||
static ONLINE_USERS: LazyLock<RwLock<Vec<User>>> = LazyLock::new(|| RwLock::new(vec![]));
|
||||
|
||||
pub async fn start(server_config: ServerConfig) {
|
||||
let mut server = Server::builder()
|
||||
.with_io(server_config.server_address.as_str())
|
||||
.unwrap()
|
||||
.with_tls((
|
||||
Path::new(&server_config.certificate_path),
|
||||
Path::new(&server_config.certificate_key_path),
|
||||
))
|
||||
.unwrap()
|
||||
.start()
|
||||
.unwrap();
|
||||
|
||||
while let Some(connection) = server.accept().await {
|
||||
tokio::spawn(handle_client(connection));
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_client(mut connection: Connection) {
|
||||
println!(
|
||||
"Server Name = {}",
|
||||
connection.server_name().unwrap().unwrap().to_string()
|
||||
);
|
||||
|
||||
println!(
|
||||
"Client Address = {}",
|
||||
connection.remote_addr().unwrap().to_string()
|
||||
);
|
||||
|
||||
let stream = connection
|
||||
.accept_bidirectional_stream()
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let (receive_stream, send_stream) = stream.split();
|
||||
let (user, receiver) = User::new();
|
||||
ONLINE_USERS.write().await.push(user);
|
||||
tokio::spawn(receive_sound_data(receive_stream));
|
||||
tokio::spawn(send_sound_data(receiver, send_stream));
|
||||
}
|
||||
|
||||
async fn send_sound_data(mut receiver: broadcast::Receiver<f32>, mut send_stream: SendStream) {
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Ok(received_data) => {
|
||||
if let Err(err_val) = send_stream
|
||||
.send(received_data.to_be_bytes().to_vec().into())
|
||||
.await
|
||||
{
|
||||
eprintln!("Error: Send Sound Data | {}", err_val);
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: Receive Sound Data | Local Channel | {}", err_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn receive_sound_data(mut receive_stream: ReceiveStream) {
|
||||
loop {
|
||||
match receive_stream.read_f32().await {
|
||||
Ok(received_data) => {
|
||||
for online_user in ONLINE_USERS.read().await.iter() {
|
||||
if let Err(err_val) = online_user.sender_to_user.send(received_data) {
|
||||
eprintln!("Error: Send Sound Data to All | {}", err_val);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: Receive Sound Data | {} ", err_val);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue