use std::{net::SocketAddr, path::Path, sync::Arc}; use protocol::{BUFFER_LENGTH, Error}; use s2n_quic::{Client, client::Connect}; use tokio::{ io::AsyncReadExt, sync::{broadcast, oneshot}, }; use crate::{ClientConfig, voice::play}; pub async fn connect( connection_signal: oneshot::Receiver, mut microphone_receiver: broadcast::Receiver, client_config: Arc, ) -> Result<(), Error> { let client = Client::builder() .with_io("0:0") .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))? .with_tls(Path::new(&client_config.certificate_path)) .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))? .start() .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; println!("Client Address = {}", client.local_addr().unwrap()); let connect = Connect::new( client_config .server_address .parse::() .map_err(|inner| Error::Connection(inner.to_string()))?, ) .with_server_name("localhost"); let mut connection = match client.connect(connect).await { Ok(connection) => connection, Err(err_val) => { eprintln!("Error: Client Connection | {}", err_val); return Err(Error::Connection(err_val.to_string())); } }; connection .keep_alive(true) .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; let stream = connection .open_bidirectional_stream() .await .map_err(|err_val| Error::ConnectionSetup(err_val.to_string()))?; let (mut receive_stream, mut send_stream) = stream.split(); let (speaker_sender, speaker_receiver) = broadcast::channel::(BUFFER_LENGTH); let (spearker_stop_signal_sender, speaker_stop_signal_receiver) = oneshot::channel::(); let play_task = tokio::spawn(play(speaker_receiver, speaker_stop_signal_receiver)); let receive_task = tokio::spawn(async move { while let Ok(data) = receive_stream.read_f32_le().await { speaker_sender .send(data) .map_err(|err_val| Error::Signal(err_val.to_string())) .unwrap(); } }); let send_task = tokio::spawn(async move { while let Ok(data) = microphone_receiver.recv().await { send_stream .send(data.to_le_bytes().to_vec().into()) .await .map_err(|err_val| Error::Send(err_val.to_string())) .unwrap(); } }); if let Ok(_) = connection_signal.await { println!("Connection Closing"); } spearker_stop_signal_sender .send(true) .map_err(|err_val| Error::Signal(err_val.to_string()))?; send_task.abort(); receive_task.abort(); play_task.abort(); Ok(()) }