From 3432ba68de803702b844a694bab8127a1d32a518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 18 May 2024 01:36:56 +0300 Subject: [PATCH] fix: :bug: waiting for certificates even if tls is off --- back/src/main.rs | 32 +++++++++------ back/src/streaming.rs | 92 ++++++++++++++++++++++--------------------- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/back/src/main.rs b/back/src/main.rs index ac298d6..a524e71 100644 --- a/back/src/main.rs +++ b/back/src/main.rs @@ -5,11 +5,9 @@ use std::net::SocketAddr; #[tokio::main] async fn main() { println!("Hello, world!"); + let relay_config = get_config().await; - let rustls_config = - RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem") - .await - .unwrap(); + let state = AppState {}; let app = routing::routing(axum::extract::State(state)).await; let addr = SocketAddr::from( @@ -19,13 +17,21 @@ async fn main() { .parse::() .unwrap(), ); - println!( - "\n\n\tOn Air -> http://{}\n\n", - relay_config.axum_address.clone() - ); - tokio::spawn(streaming::start(relay_config)); - axum_server::bind_rustls(addr, rustls_config) - .serve(app.into_make_service()) - .await - .unwrap(); + tokio::spawn(streaming::start(relay_config.clone())); + if relay_config.tls { + let rustls_config = + RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem") + .await + .unwrap(); + + println!("\n\n\tOn Air -> https://{}\n\n", relay_config.axum_address); + axum_server::bind_rustls(addr, rustls_config) + .serve(app.into_make_service()) + .await + .unwrap(); + } else { + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + println!("\n\n\tOn Air -> http://{}\n\n", relay_config.axum_address); + axum::serve(listener, app).await.unwrap(); + } } diff --git a/back/src/streaming.rs b/back/src/streaming.rs index b7d96dd..69476ad 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -26,7 +26,7 @@ const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(relay_configs: Config) { let timer = Instant::now(); - let acceptor = tls_configurator().await; + let acceptor = None; loop { //need to move them for multi streamer let (record_producer, record_consumer) = channel(BUFFER_LENGTH); @@ -54,6 +54,7 @@ pub async fn start(relay_configs: Config) { timer.elapsed() ); if relay_configs.tls { + let acceptor = tls_configurator().await; match acceptor.accept(streamer_tcp).await { Ok(streamer_tcp_tls) => { match tokio_tungstenite::accept_async(streamer_tcp_tls).await { @@ -118,7 +119,6 @@ pub async fn start(relay_configs: Config) { let listener_handler_task = tokio::spawn(listener_handler( listener_socket, acceptor.clone(), - relay_configs.tls, buffered_producer.clone(), listener_stream_tasks_producer, timer, @@ -164,8 +164,7 @@ async fn tls_configurator() -> TlsAcceptor { } async fn listener_handler( listener_socket: TcpListener, - acceptor: TlsAcceptor, - is_tls: bool, + acceptor: Option, buffered_producer: Sender, listener_stream_tasks_producer: tokio::sync::mpsc::Sender>, timer: Instant, @@ -179,51 +178,54 @@ async fn listener_handler( ip: listener_info.ip(), port: listener_info.port(), }; - if is_tls { - match acceptor.accept(tcp_stream).await { - Ok(listener_tcp_tls) => { - match tokio_tungstenite::accept_async(listener_tcp_tls).await { - Ok(wss_stream) => { - let listener_stream_task = tokio::spawn(stream( - new_listener, - wss_stream, - buffered_producer.subscribe(), - )); - let _ = listener_stream_tasks_producer - .send(listener_stream_task) - .await; - } - Err(err_val) => { - eprintln!("Error: TCP WSS Listener | {}", err_val); - drop(listener_socket); - return; + match acceptor { + Some(ref acceptor) => { + match acceptor.accept(tcp_stream).await { + Ok(listener_tcp_tls) => { + match tokio_tungstenite::accept_async(listener_tcp_tls).await { + Ok(wss_stream) => { + let listener_stream_task = tokio::spawn(stream( + new_listener, + wss_stream, + buffered_producer.subscribe(), + )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; + } + Err(err_val) => { + eprintln!("Error: TCP WSS Listener | {}", err_val); + drop(listener_socket); + return; + } } } + Err(err_val) => { + eprintln!("Error: TCP TLS Listener | {}", err_val); + drop(listener_socket); + return; + } } - Err(err_val) => { - eprintln!("Error: TCP TLS Listener | {}", err_val); - drop(listener_socket); - return; + }, + None => { + match tokio_tungstenite::accept_async(tcp_stream).await { + Ok(ws_stream) => { + let listener_stream_task = tokio::spawn(stream( + new_listener, + ws_stream, + buffered_producer.subscribe(), + )); + let _ = listener_stream_tasks_producer + .send(listener_stream_task) + .await; + } + Err(err_val) => { + eprintln!("Error: TCP WS Listener | {}", err_val); + drop(listener_socket); + return; + } } - } - } else { - match tokio_tungstenite::accept_async(tcp_stream).await { - Ok(ws_stream) => { - let listener_stream_task = tokio::spawn(stream( - new_listener, - ws_stream, - buffered_producer.subscribe(), - )); - let _ = listener_stream_tasks_producer - .send(listener_stream_task) - .await; - } - Err(err_val) => { - eprintln!("Error: TCP WS Listener | {}", err_val); - drop(listener_socket); - return; - } - } + }, } println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); }