diff --git a/back/Cargo.toml b/back/Cargo.toml index 55590ab..fcf741f 100644 --- a/back/Cargo.toml +++ b/back/Cargo.toml @@ -11,9 +11,11 @@ axum-server = { version = "0.6.0", features = ["tls-rustls"] } futures-util = "0.3.30" rand = "0.8.5" ringbuf = "0.3.3" +rustls-pemfile = "2.1.2" serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" tokio = { version = "1.36.0", features = ["full"] } -tokio-tungstenite = { version = "0.21.0", features = ["rustls"] } +tokio-rustls = "0.25.0" +tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } tokio-util = { version = "0.7.10", features = ["full"] } tower-http = { version = "0.5.2", features = ["full"] } diff --git a/back/configs/relay_configs.txt b/back/configs/relay_configs.txt new file mode 100644 index 0000000..adf6ca7 --- /dev/null +++ b/back/configs/relay_configs.txt @@ -0,0 +1,5 @@ +axum_address: 192.168.1.2:2323 +listener_address: 192.168.1.2:2424 +streamer_address: 192.168.1.2:2525 +latency: 50 +tls: false \ No newline at end of file diff --git a/back/src/lib.rs b/back/src/lib.rs index 497a913..c3bbcf5 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -6,6 +6,15 @@ pub mod routing; pub mod streaming; pub mod utils; +#[derive(Debug, Clone)] +pub struct Config { + pub axum_address: String, + pub listener_address: String, + pub streamer_address: String, + pub latency: u16, + pub tls: bool, +} + #[derive(Debug, Clone)] pub struct AppState {} diff --git a/back/src/main.rs b/back/src/main.rs index d7f9535..8a40574 100644 --- a/back/src/main.rs +++ b/back/src/main.rs @@ -1,28 +1,27 @@ use axum_server::tls_rustls::RustlsConfig; -use back::{routing, streaming, AppState}; -use std::{env, net::SocketAddr}; - -fn take_args() -> String { - let mut bind_address: String = String::new(); - for element in env::args() { - bind_address = element - } - println!("\n\n\tOn Air -> http://{}\n\n", bind_address); - bind_address -} +use back::{routing, streaming, utils::get_config, AppState}; +use std::net::SocketAddr; #[tokio::main] async fn main() { println!("Hello, world!"); - let config = + let relay_config = get_config().await; + let rustls_config = RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem") .await .unwrap(); let state = AppState {}; let app = routing::routing(axum::extract::State(state)).await; - let addr = SocketAddr::from(take_args().parse::().unwrap()); - tokio::spawn(streaming::start()); - axum_server::bind_rustls(addr, config) + let addr = SocketAddr::from( + relay_config + .axum_address + .clone() + .parse::() + .unwrap(), + ); + println!("\n\n\tOn Air -> http://{}\n\n", relay_config.axum_address.clone()); + tokio::spawn(streaming::start(relay_config)); + axum_server::bind_rustls(addr, rustls_config) .serve(app.into_make_service()) .await .unwrap(); diff --git a/back/src/routing.rs b/back/src/routing.rs index 701ca90..a90361e 100644 --- a/back/src/routing.rs +++ b/back/src/routing.rs @@ -1,4 +1,4 @@ -use crate::{streaming, AppState, CoinStatus, ServerStatus}; +use crate::{AppState, CoinStatus, ServerStatus}; use axum::{ body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router, @@ -42,7 +42,6 @@ async fn flip_coin() -> impl IntoResponse { #[axum::debug_handler] async fn stream() -> impl IntoResponse { println!("Stream"); - streaming::start().await; let file = File::open("audios/audio.mp3").await.unwrap(); let stream = ReaderStream::new(file); Body::from_stream(stream) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 6e13760..cf91ccd 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,47 +1,96 @@ -use std::time::Duration; +use std::{ + fs::File, + io::{self, BufReader}, + sync::Arc, + time::Duration, +}; use futures_util::{SinkExt, StreamExt}; +use rustls_pemfile::{certs, pkcs8_private_keys}; + use tokio::{ - net::{TcpListener, TcpStream}, + net::TcpListener, sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_rustls::{ + rustls::pki_types::{CertificateDer, PrivateKeyDer}, + TlsAcceptor, +}; +use tokio_tungstenite::tungstenite::{Error, Message}; -use crate::{Listener, Streamer}; +use crate::{Config, Listener, Streamer}; const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn start() { +pub async fn start(relay_configs: Config) { //need to move them for multi streamer - let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); + let listener_socket = TcpListener::bind(relay_configs.listener_address) + .await + .unwrap(); let (record_producer, record_consumer) = channel(BUFFER_LENGTH); - let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); + let streamer_socket = TcpListener::bind(relay_configs.streamer_address) + .await + .unwrap(); let timer = Instant::now(); + let fullchain: io::Result>> = certs(&mut BufReader::new( + File::open("certificates/fullchain.pem").unwrap(), + )) + .collect(); + let fullchain = fullchain.unwrap(); + let privkey: io::Result> = pkcs8_private_keys(&mut BufReader::new( + File::open("certificates/privkey.pem").unwrap(), + )) + .next() + .unwrap() + .map(Into::into); + let privkey = privkey.unwrap(); + + let server_tls_config = tokio_rustls::rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(fullchain, privkey) + .unwrap(); + let acceptor = TlsAcceptor::from(Arc::new(server_tls_config)); loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { - match tokio_tungstenite::accept_async(streamer_tcp).await { - Ok(ws_stream) => { - println!( - "New Streamer: {:#?} | {:#?}", - streamer_info, - timer.elapsed() - ); - let new_streamer = Streamer { - ip: streamer_info.ip(), - port: streamer_info.port(), - }; - tokio::spawn(streamer_stream( - new_streamer, - record_producer, - ws_stream, - timer, - )); - break; + let new_streamer = Streamer { + ip: streamer_info.ip(), + port: streamer_info.port(), + }; + println!( + "New Streamer: {:#?} | {:#?}", + streamer_info, + timer.elapsed() + ); + if relay_configs.tls { + let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); + match tokio_tungstenite::accept_async(streamer_tcp_tls).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer, + record_producer, + ws_stream, + timer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), + } + } else { + match tokio_tungstenite::accept_async(streamer_tcp).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer, + record_producer, + ws_stream, + timer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } - Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } } Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), @@ -49,21 +98,41 @@ pub async fn start() { } let (message_producer, message_consumer) = channel(BUFFER_LENGTH); let (buffered_producer, _) = channel(BUFFER_LENGTH); - tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); - tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); + tokio::spawn(message_organizer( + message_producer.clone(), + record_consumer, + relay_configs.latency, + )); + tokio::spawn(buffer_layer( + message_consumer, + buffered_producer.clone(), + relay_configs.latency, + )); tokio::spawn(status_checker(buffered_producer.clone(), timer)); - while let Ok((tcp_stream, listener_info)) = socket.accept().await { - let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); + while let Ok((tcp_stream, listener_info)) = listener_socket.accept().await { let new_listener = Listener { ip: listener_info.ip(), port: listener_info.port(), }; - tokio::spawn(stream( - new_listener, - ws_stream, - buffered_producer.subscribe(), - )); + if relay_configs.tls { + let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); + let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls) + .await + .unwrap(); + tokio::spawn(stream( + new_listener, + wss_stream, + buffered_producer.subscribe(), + )); + } else { + let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); + tokio::spawn(stream( + new_listener, + ws_stream, + buffered_producer.subscribe(), + )); + } + println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); } } async fn status_checker(buffered_producer: Sender, timer: Instant) { @@ -92,9 +161,13 @@ async fn status_checker(buffered_producer: Sender, timer: Instant) { } } } -async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { +async fn buffer_layer( + mut message_consumer: Receiver, + buffered_producer: Sender, + delay: u16, +) { loop { - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(delay.into())).await; while message_consumer.len() > 0 { match message_consumer.recv().await { Ok(message) => match buffered_producer.send(message) { @@ -106,10 +179,12 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer } } } -async fn streamer_stream( +async fn streamer_stream< + T: futures_util::Stream> + std::marker::Unpin, +>( streamer: Streamer, record_producer: Sender, - mut ws_stream: WebSocketStream, + mut ws_stream: T, timer: Instant, ) { loop { @@ -141,6 +216,7 @@ async fn streamer_stream( async fn message_organizer( message_producer: Sender, mut record_consumer: Receiver, + delay: u16, ) { loop { match record_consumer.recv().await { @@ -150,12 +226,12 @@ async fn message_organizer( }, Err(_) => {} } - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(delay.into())).await; } } -async fn stream( +async fn stream + std::marker::Unpin>( listener: Listener, - mut ws_stream: WebSocketStream, + mut ws_stream: T, mut buffered_consumer: Receiver, ) { while let Ok(message) = buffered_consumer.recv().await { diff --git a/back/src/utils.rs b/back/src/utils.rs index 8b13789..92ab30b 100644 --- a/back/src/utils.rs +++ b/back/src/utils.rs @@ -1 +1,27 @@ +use tokio::{fs::File, io::AsyncReadExt}; +use crate::Config; + +pub async fn get_config() -> Config { + let mut config_file = File::open("configs/relay_configs.txt").await.unwrap(); + let mut configs_unparsed = String::new(); + config_file + .read_to_string(&mut configs_unparsed) + .await + .unwrap(); + + let configs_parsed: Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let mut configs_cleaned: Vec<&str> = vec![]; + + for config in configs_parsed { + let dirty_configs: Vec<&str> = config.split(": ").collect(); + configs_cleaned.push(dirty_configs[1]); + } + Config { + axum_address: configs_cleaned[0].to_string(), + listener_address: configs_cleaned[1].to_string(), + streamer_address: configs_cleaned[2].to_string(), + latency: configs_cleaned[3].parse().unwrap(), + tls: configs_cleaned[4].parse().unwrap(), + } +} diff --git a/front/Cargo.toml b/front/Cargo.toml index 321e2a4..35745c9 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -9,13 +9,15 @@ edition = "2021" anyhow = "1.0.81" brotli = "5.0.0" cpal = { version = "0.15.3", features = ["wasm-bindgen"] } -dioxus = { version = "0.5.0", features = ["web"] } +dioxus = { version = "0.5.1", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } +#getrandom = { version = "0.2.14", features = ["js"] } log = "0.4.21" reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" +#rustls-platform-verifier = "0.2.0" serde = { version = "1.0.197", features = ["derive"] } -tokio-tungstenite-wasm = "0.3.1" +tokio-tungstenite-wasm = { version = "0.3.1", features = ["rustls-tls-webpki-roots"] } tokio_with_wasm = "0.4.3" wasm-logger = "0.2.0" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 95c85da..f1dfbf0 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,15 +1,12 @@ -use std::{io::Write, mem::MaybeUninit, sync::Arc}; - use brotli::DecompressorWriter; - use dioxus::{ prelude::spawn, signals::{Signal, Writable}, }; -use futures_util::{stream::SplitStream, SinkExt, StreamExt}; - +use futures_util::StreamExt; use ringbuf::{HeapRb, Producer, SharedRb}; -use tokio_tungstenite_wasm::{Message, WebSocketStream}; +use std::{io::Write, mem::MaybeUninit, sync::Arc}; + use crate::{listening::listen_podcast, BUFFER_LENGTH}; @@ -19,10 +16,15 @@ pub async fn start_listening( ) { if is_listening() { log::info!("Trying Sir"); - let connect_addr = "ws://192.168.1.2:2424"; - let (mut stream_producer, stream_consumer); - match tokio_tungstenite_wasm::connect(connect_addr).await { - Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(), + let connect_addr = "wss://tahinli.com.tr:2424"; + + let ws_stream: tokio_tungstenite_wasm::WebSocketStream; + match tokio_tungstenite_wasm::connect( + connect_addr, + ) + .await + { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected, Err(_) => { is_listening.set(false); return; @@ -33,7 +35,7 @@ pub async fn start_listening( let (producer, consumer) = ring.split(); let _sound_stream_task = spawn({ async move { - sound_stream(is_listening, stream_consumer, producer).await; + sound_stream(is_listening, ws_stream, producer).await; is_listening.set(false); is_maintaining.set((false, is_maintaining().1)); } @@ -43,7 +45,6 @@ pub async fn start_listening( listen_podcast(is_listening, consumer).await; is_listening.set(false); //stream_producer.send("Disconnect ME".into()).await.unwrap(); - stream_producer.close().await.unwrap(); is_maintaining.set((is_maintaining().0, false)); } }); @@ -52,19 +53,15 @@ pub async fn start_listening( pub async fn sound_stream( is_listening: Signal, - mut stream_consumer: SplitStream, + mut ws_stream: tokio_tungstenite_wasm::WebSocketStream, mut producer: Producer>>>>, ) { log::info!("Attention! We need cables"); - while let Some(message_with_question) = stream_consumer.next().await { + while let Some(message_with_question) = ws_stream.next().await { if is_listening() { //log::info!("{}", message_with_question.unwrap().len()); - let mut data: Vec = vec![]; - if let Message::Binary(message) = message_with_question.unwrap() { - data = message; - } - + let data: Vec = message_with_question.unwrap().into(); let mut decompression_writer = DecompressorWriter::new(vec![], BUFFER_LENGTH); if let Err(err_val) = decompression_writer.write_all(&data) { log::error!("Error: Decompression | {}", err_val); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 21d3768..dd8fc90 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -10,5 +10,8 @@ brotli = "5.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" +rustls-pemfile = "2.1.2" +rustls-platform-verifier = "0.2.0" tokio = { version = "1.36.0", features = ["full"] } -tokio-tungstenite = "0.21.0" +tokio-rustls = "0.25.0" +tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] } diff --git a/streamer/configs/streamer_configs.txt b/streamer/configs/streamer_configs.txt new file mode 100644 index 0000000..010b3bc --- /dev/null +++ b/streamer/configs/streamer_configs.txt @@ -0,0 +1,4 @@ +address: 192.168.1.2:2525 +quality: 6 +latency: 100 +tls: false \ No newline at end of file diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index 1dfe855..e424cc9 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,4 +1,12 @@ pub mod recording; pub mod streaming; +pub mod utils; pub const BUFFER_LENGTH: usize = 1000000; + +pub struct Config { + pub address: String, + pub quality: u8, + pub latency: u16, + pub tls: bool, +} diff --git a/streamer/src/main.rs b/streamer/src/main.rs index 64fee3f..019e531 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,15 +1,15 @@ use std::time::Duration; -use streamer::{recording::recording, streaming::start, BUFFER_LENGTH}; +use streamer::{recording::recording, streaming::start, utils::get_config, BUFFER_LENGTH}; use tokio::sync::broadcast::channel; #[tokio::main] async fn main() { println!("Hello, world!"); - + let streamer_config = get_config().await; let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); tokio::spawn(recording(sound_stream_producer)); - tokio::spawn(start(sound_stream_consumer)); + tokio::spawn(start(sound_stream_consumer, streamer_config)); loop { tokio::time::sleep(Duration::from_secs(1000000000)).await; } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index c7bb053..077fabb 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,30 +1,58 @@ -use std::{io::Write, time::Duration}; +use std::{io::Write, sync::Arc, time::Duration}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_tungstenite::tungstenite::Message; -use crate::BUFFER_LENGTH; +use crate::{Config, BUFFER_LENGTH}; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn start(sound_stream_consumer: Receiver) { - let connect_addr = "ws://192.168.1.2:2525"; +pub async fn start(sound_stream_consumer: Receiver, streamer_config:Config) { + let connect_addr = + match streamer_config.tls { + true => format!("wss://{}", streamer_config.address), + false => format!("ws://{}", streamer_config.address), + }; + let ws_stream; - match tokio_tungstenite::connect_async(connect_addr).await { - Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, + + match streamer_config.tls { + true => { + let tls_client_config = rustls_platform_verifier::tls_config(); + let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); + + match tokio_tungstenite::connect_async_tls_with_config( + connect_addr.clone(), + None, + false, + Some(tls_connector), + ) + .await + { + Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0, Err(_) => { return; } + } + }, + false => { + match tokio_tungstenite::connect_async(connect_addr.clone()).await { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, + Err(_) => { + return; + }, + } + }, } let (message_producer, message_consumer) = channel(BUFFER_LENGTH); println!("Connected to: {}", connect_addr); - tokio::spawn(message_organizer(message_producer, sound_stream_consumer)); + tokio::spawn(message_organizer(message_producer, sound_stream_consumer, streamer_config.quality, streamer_config.latency)); tokio::spawn(stream(ws_stream, message_consumer)); } -async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { +async fn message_organizer(message_producer: Sender, mut consumer: Receiver, quality: u8, latency:u16) { loop { let mut messages: Vec = Vec::new(); let mut iteration = consumer.len(); @@ -42,7 +70,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece let _zero = charred.remove(1); let _point = charred.remove(1); } - charred.truncate(4); + charred.truncate(quality.into()); let mut single_data_packet: Vec = vec![]; for char in charred { let char_packet = char.to_string().as_bytes().to_vec(); @@ -78,12 +106,12 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece // message_producer.receiver_count() // ); } - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(latency.into())).await; } } -async fn stream( - mut ws_stream: WebSocketStream>, +async fn stream + std::marker::Unpin>( + mut ws_stream: T, mut message_consumer: Receiver, ) { while let Ok(message) = message_consumer.recv().await { diff --git a/streamer/src/utils.rs b/streamer/src/utils.rs new file mode 100644 index 0000000..85c0dc3 --- /dev/null +++ b/streamer/src/utils.rs @@ -0,0 +1,23 @@ +use tokio::{fs::File, io::AsyncReadExt}; + +use crate::Config; + +pub async fn get_config() -> Config { + let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); + let mut configs_unparsed = String::new(); + config_file.read_to_string(&mut configs_unparsed).await.unwrap(); + + let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let mut configs_cleaned: Vec<&str> = vec![]; + + for config in configs_parsed { + let dirty_configs: Vec<&str> = config.split(": ").collect(); + configs_cleaned.push(dirty_configs[1]); + } + Config { + address: configs_cleaned[0].to_string(), + quality: configs_cleaned[1].parse().unwrap(), + latency: configs_cleaned[2].parse().unwrap(), + tls: configs_cleaned[3].parse().unwrap(), + } +} \ No newline at end of file