From 56d17084e15d3b06649873e717edb30400198c48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Thu, 25 Apr 2024 04:19:03 +0300 Subject: [PATCH] feat: :sparkles: always ready for a streamer at relay --- back/src/streaming.rs | 267 ++++++++++++++++++++++++----------------- front/Cargo.toml | 2 - front/src/streaming.rs | 2 +- 3 files changed, 159 insertions(+), 112 deletions(-) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 565bd0a..fb47532 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -18,21 +18,13 @@ use tokio_rustls::{ rustls::pki_types::{CertificateDer, PrivateKeyDer}, TlsAcceptor, }; -use tokio_tungstenite::tungstenite::{Error, Message}; +use tokio_tungstenite::tungstenite::{util::NonBlockingResult, Error, Message}; use crate::{Config, Listener, Streamer}; const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(relay_configs: Config) { - //need to move them for multi streamer - let listener_socket = TcpListener::bind(relay_configs.listener_address) - .await - .unwrap(); - let (record_producer, record_consumer) = channel(BUFFER_LENGTH); - let streamer_socket = TcpListener::bind(relay_configs.streamer_address) - .await - .unwrap(); let timer = Instant::now(); let fullchain: io::Result>> = certs(&mut BufReader::new( @@ -53,114 +45,159 @@ pub async fn start(relay_configs: Config) { .with_single_cert(fullchain, privkey) .unwrap(); let acceptor = TlsAcceptor::from(Arc::new(server_tls_config)); - - let (streamer_alive_producer, streamer_alive_receiver) = tokio::sync::oneshot::channel(); - let message_organizer_task: Option>; - let buffer_layer_task: Option>; - let (listener_stream_tasks_producer, listener_stream_tasks_receiver) = - tokio::sync::mpsc::channel(BUFFER_LENGTH); - let mut new_streamer = Streamer { - ip: "127.0.0.1".to_string().parse().unwrap(), - port: 0000, - }; loop { - match streamer_socket.accept().await { - Ok((streamer_tcp, streamer_info)) => { - new_streamer.ip = streamer_info.ip(); - new_streamer.port = streamer_info.port(); - println!( - "New Streamer: {:#?} | {:#?}", - streamer_info, - timer.elapsed() - ); - if relay_configs.tls { - let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); - match tokio_tungstenite::accept_async(streamer_tcp_tls).await { - Ok(ws_stream) => { - tokio::spawn(streamer_stream( - new_streamer.clone(), - record_producer, - ws_stream, - timer, - streamer_alive_producer, - )); - break; + //need to move them for multi streamer + let listener_socket = TcpListener::bind(relay_configs.listener_address.clone()) + .await + .unwrap(); + let (record_producer, record_consumer) = channel(BUFFER_LENGTH); + let streamer_socket = TcpListener::bind(relay_configs.streamer_address.clone()) + .await + .unwrap(); + + let (streamer_alive_producer, streamer_alive_receiver) = tokio::sync::oneshot::channel(); + let message_organizer_task: Option>; + let buffer_layer_task: Option>; + let (listener_stream_tasks_producer, listener_stream_tasks_receiver) = + tokio::sync::mpsc::channel(BUFFER_LENGTH); + let mut new_streamer = Streamer { + ip: "127.0.0.1".to_string().parse().unwrap(), + port: 0000, + }; + loop { + match streamer_socket.accept().await { + Ok((streamer_tcp, streamer_info)) => { + new_streamer.ip = streamer_info.ip(); + new_streamer.port = streamer_info.port(); + println!( + "New Streamer: {:#?} | {:#?}", + streamer_info, + timer.elapsed() + ); + if relay_configs.tls { + let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); + match tokio_tungstenite::accept_async(streamer_tcp_tls).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer.clone(), + record_producer, + ws_stream, + timer, + streamer_alive_producer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } - Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), - } - } else { - match tokio_tungstenite::accept_async(streamer_tcp).await { - Ok(ws_stream) => { - tokio::spawn(streamer_stream( - new_streamer.clone(), - record_producer, - ws_stream, - timer, - streamer_alive_producer, - )); - break; + } else { + match tokio_tungstenite::accept_async(streamer_tcp).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer.clone(), + record_producer, + ws_stream, + timer, + streamer_alive_producer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } - Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } } + Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), } - Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), + } + let (message_producer, message_consumer) = channel(BUFFER_LENGTH); + let (buffered_producer, _) = channel(BUFFER_LENGTH); + message_organizer_task = tokio::spawn(message_organizer( + message_producer.clone(), + record_consumer, + relay_configs.latency, + )) + .into(); + buffer_layer_task = tokio::spawn(buffer_layer( + message_consumer, + buffered_producer.clone(), + relay_configs.latency, + )) + .into(); + let (listener_socket_killer_producer, listener_socket_killer_receiver) = + tokio::sync::oneshot::channel(); + let listener_handler_task = tokio::spawn(listener_handler( + listener_socket, + acceptor.clone(), + relay_configs.tls, + buffered_producer.clone(), + listener_stream_tasks_producer, + timer, + listener_socket_killer_receiver, + )); + status_checker( + buffered_producer.clone(), + timer, + new_streamer, + streamer_alive_receiver, + message_organizer_task, + buffer_layer_task, + listener_stream_tasks_receiver, + listener_handler_task, + listener_socket_killer_producer, + relay_configs.listener_address.clone(), + ) + .await; + drop(streamer_socket); + } +} +async fn listener_handler( + listener_socket: TcpListener, + acceptor: TlsAcceptor, + is_tls: bool, + buffered_producer: Sender, + listener_stream_tasks_producer: tokio::sync::mpsc::Sender>, + timer: Instant, + mut listener_socket_killer_receiver: tokio::sync::oneshot::Receiver, +) { + while let Err(_) = listener_socket_killer_receiver.try_recv() { + match listener_socket.accept().await.no_block() { + Ok(accepted_request) => match accepted_request { + Some((tcp_stream, listener_info)) => { + let new_listener = Listener { + ip: listener_info.ip(), + port: listener_info.port(), + }; + if is_tls { + let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); + let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls) + .await + .unwrap(); + let listener_stream_task = tokio::spawn(stream( + new_listener, + wss_stream, + buffered_producer.subscribe(), + )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; + } else { + let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); + let listener_stream_task = tokio::spawn(stream( + new_listener, + ws_stream, + buffered_producer.subscribe(), + )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; + } + println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); + } + None => {} + }, + Err(_) => {} } } - let (message_producer, message_consumer) = channel(BUFFER_LENGTH); - let (buffered_producer, _) = channel(BUFFER_LENGTH); - message_organizer_task = tokio::spawn(message_organizer( - message_producer.clone(), - record_consumer, - relay_configs.latency, - )) - .into(); - buffer_layer_task = tokio::spawn(buffer_layer( - message_consumer, - buffered_producer.clone(), - relay_configs.latency, - )) - .into(); - tokio::spawn(status_checker( - buffered_producer.clone(), - timer, - new_streamer, - streamer_alive_receiver, - message_organizer_task, - buffer_layer_task, - listener_stream_tasks_receiver, - )); - while let Ok((tcp_stream, listener_info)) = listener_socket.accept().await { - let new_listener = Listener { - ip: listener_info.ip(), - port: listener_info.port(), - }; - if relay_configs.tls { - let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); - let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls) - .await - .unwrap(); - let listener_stream_task = tokio::spawn(stream( - new_listener, - wss_stream, - buffered_producer.subscribe(), - )); - let _ = listener_stream_tasks_producer - .send(listener_stream_task) - .await; - } else { - let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - let listener_stream_task = tokio::spawn(stream( - new_listener, - ws_stream, - buffered_producer.subscribe(), - )); - let _ = listener_stream_tasks_producer - .send(listener_stream_task) - .await; - } - println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); - } + drop(listener_socket); } async fn status_checker( buffered_producer: Sender, @@ -170,6 +207,9 @@ async fn status_checker( message_organizer_task: Option>, buffer_layer_task: Option>, mut listener_stream_tasks_receiver: tokio::sync::mpsc::Receiver>, + listener_handler_task: JoinHandle<()>, + listener_socket_killer_producer: tokio::sync::oneshot::Sender, + listener_address: String, ) { let mut listener_counter = buffered_producer.receiver_count(); let mut bottleneck_flag = false; @@ -185,6 +225,7 @@ async fn status_checker( let cleaning_timer = Instant::now(); message_organizer_task.as_ref().unwrap().abort(); buffer_layer_task.as_ref().unwrap().abort(); + listener_socket_killer_producer.send(true).unwrap(); let mut listener_task_counter = 0; while listener_stream_tasks_receiver.len() > 0 { match listener_stream_tasks_receiver.recv().await { @@ -195,6 +236,14 @@ async fn status_checker( None => {} } } + if !listener_handler_task.is_finished() { + listener_handler_task.abort(); + println!("Cleaning: Listener Handler Killed"); + } + while TcpListener::bind(listener_address.clone()).await.is_err() { + tokio::time::sleep(Duration::from_millis(1)).await; + } + println!("Cleaning: Listener Socket Killed | {}", listener_address); println!( "Cleaning Done: Streamer Disconnected | {} | Disconnected Listener(s) = {} | {:#?}", format!("{}:{}", streamer.ip, streamer.port), diff --git a/front/Cargo.toml b/front/Cargo.toml index 35745c9..7f60c83 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -12,11 +12,9 @@ cpal = { version = "0.15.3", features = ["wasm-bindgen"] } dioxus = { version = "0.5.1", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } -#getrandom = { version = "0.2.14", features = ["js"] } log = "0.4.21" reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" -#rustls-platform-verifier = "0.2.0" serde = { version = "1.0.197", features = ["derive"] } tokio-tungstenite-wasm = { version = "0.3.1", features = ["rustls-tls-webpki-roots"] } tokio_with_wasm = "0.4.3" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index f1dfbf0..84a59b2 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -16,7 +16,7 @@ pub async fn start_listening( ) { if is_listening() { log::info!("Trying Sir"); - let connect_addr = "wss://tahinli.com.tr:2424"; + let connect_addr = "ws://192.168.1.2:2424"; let ws_stream: tokio_tungstenite_wasm::WebSocketStream; match tokio_tungstenite_wasm::connect(