From 68a280fc806483f11effac16a32640f375dc9a32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Mon, 15 Apr 2024 03:38:03 +0300 Subject: [PATCH 1/9] feat: :alembic: experiment: rustls --- back/Cargo.toml | 4 +++- back/src/streaming.rs | 42 ++++++++++++++++++++++++++++++++++++--- front/Cargo.toml | 3 ++- front/src/streaming.rs | 1 + streamer/Cargo.toml | 4 +++- streamer/src/streaming.rs | 38 +++++++++++++++++++++++++++++++---- 6 files changed, 82 insertions(+), 10 deletions(-) diff --git a/back/Cargo.toml b/back/Cargo.toml index 55590ab..fcf741f 100644 --- a/back/Cargo.toml +++ b/back/Cargo.toml @@ -11,9 +11,11 @@ axum-server = { version = "0.6.0", features = ["tls-rustls"] } futures-util = "0.3.30" rand = "0.8.5" ringbuf = "0.3.3" +rustls-pemfile = "2.1.2" serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" tokio = { version = "1.36.0", features = ["full"] } -tokio-tungstenite = { version = "0.21.0", features = ["rustls"] } +tokio-rustls = "0.25.0" +tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } tokio-util = { version = "0.7.10", features = ["full"] } tower-http = { version = "0.5.2", features = ["full"] } diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 6e13760..afd43d5 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,11 +1,28 @@ -use std::time::Duration; +use std::{ + fs::File, + io::{self, BufReader}, + sync::Arc, + time::Duration, +}; use futures_util::{SinkExt, StreamExt}; +use rustls_pemfile::{certs, pkcs8_private_keys, private_key, rsa_private_keys}; + use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; +use tokio_rustls::{ + rustls::{ + client::danger::DangerousClientConfig, + internal::msgs::handshake::CertificateChain, + pki_types::{CertificateDer, PrivateKeyDer}, + ClientConfig, + }, + server::TlsStream, + TlsAcceptor, +}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::{Listener, Streamer}; @@ -19,10 +36,29 @@ pub async fn start() { let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); let timer = Instant::now(); + let fullchain: io::Result>> = certs(&mut BufReader::new( + File::open("certificates/fullchain.pem").unwrap(), + )) + .collect(); + let fullchain = fullchain.unwrap(); + let privkey: io::Result> = pkcs8_private_keys(&mut BufReader::new( + File::open("certificates/privkey.pem").unwrap(), + )) + .next() + .unwrap() + .map(Into::into); + let privkey = privkey.unwrap(); + + let config = tokio_rustls::rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(fullchain, privkey) + .unwrap(); + let acceptor = TlsAcceptor::from(Arc::new(config)); loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { - match tokio_tungstenite::accept_async(streamer_tcp).await { + let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); + match tokio_tungstenite::accept_async(streamer_tcp_tls).await { Ok(ws_stream) => { println!( "New Streamer: {:#?} | {:#?}", @@ -109,7 +145,7 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer async fn streamer_stream( streamer: Streamer, record_producer: Sender, - mut ws_stream: WebSocketStream, + mut ws_stream: WebSocketStream>, timer: Instant, ) { loop { diff --git a/front/Cargo.toml b/front/Cargo.toml index 321e2a4..fefac4b 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -9,13 +9,14 @@ edition = "2021" anyhow = "1.0.81" brotli = "5.0.0" cpal = { version = "0.15.3", features = ["wasm-bindgen"] } -dioxus = { version = "0.5.0", features = ["web"] } +dioxus = { version = "0.5.1", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } log = "0.4.21" reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" serde = { version = "1.0.197", features = ["derive"] } +tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } tokio-tungstenite-wasm = "0.3.1" tokio_with_wasm = "0.4.3" wasm-logger = "0.2.0" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 95c85da..0eb91de 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -20,6 +20,7 @@ pub async fn start_listening( if is_listening() { log::info!("Trying Sir"); let connect_addr = "ws://192.168.1.2:2424"; + let (mut stream_producer, stream_consumer); match tokio_tungstenite_wasm::connect(connect_addr).await { Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(), diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 21d3768..bffcc48 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -10,5 +10,7 @@ brotli = "5.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" +rustls-pemfile = "2.1.2" tokio = { version = "1.36.0", features = ["full"] } -tokio-tungstenite = "0.21.0" +tokio-rustls = "0.25.0" +tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index c7bb053..b9dc127 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,18 +1,48 @@ -use std::{io::Write, time::Duration}; +use std::{ + fs::File, + io::{self, BufReader, Write}, + sync::Arc, + time::Duration, +}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_rustls::rustls::{pki_types::CertificateDer, ClientConfig, RootCertStore}; +use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream}; use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { - let connect_addr = "ws://192.168.1.2:2525"; + let connect_addr = "wss://192.168.1.2:2525"; + + let certs: io::Result>> = rustls_pemfile::certs( + &mut BufReader::new(File::open("certificates/cert.pem").unwrap()), + ) + .collect(); + let certs = certs.unwrap(); + let mut root_cert_store = RootCertStore::empty(); + for cert in certs { + root_cert_store.add(cert).unwrap(); + } + + let config = ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + + let connector = Connector::Rustls(Arc::new(config)); + let ws_stream; - match tokio_tungstenite::connect_async(connect_addr).await { + match tokio_tungstenite::connect_async_tls_with_config( + connect_addr, + None, + false, + Some(connector), + ) + .await + { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return; From 5713bec4aeb91cf95785e2910eb9a9e3e6679cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Tue, 16 Apr 2024 23:31:17 +0300 Subject: [PATCH 2/9] refactor: :fire: removed wrong implemented tls in streamer --- back/src/streaming.rs | 9 ++------- streamer/src/streaming.rs | 35 +++-------------------------------- 2 files changed, 5 insertions(+), 39 deletions(-) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index afd43d5..86ee9c5 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -6,7 +6,7 @@ use std::{ }; use futures_util::{SinkExt, StreamExt}; -use rustls_pemfile::{certs, pkcs8_private_keys, private_key, rsa_private_keys}; +use rustls_pemfile::{certs, pkcs8_private_keys}; use tokio::{ net::{TcpListener, TcpStream}, @@ -14,12 +14,7 @@ use tokio::{ time::Instant, }; use tokio_rustls::{ - rustls::{ - client::danger::DangerousClientConfig, - internal::msgs::handshake::CertificateChain, - pki_types::{CertificateDer, PrivateKeyDer}, - ClientConfig, - }, + rustls::pki_types::{CertificateDer, PrivateKeyDer}, server::TlsStream, TlsAcceptor, }; diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index b9dc127..95b579f 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,16 +1,10 @@ -use std::{ - fs::File, - io::{self, BufReader, Write}, - sync::Arc, - time::Duration, -}; +use std::{io::Write, time::Duration}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_rustls::rustls::{pki_types::CertificateDer, ClientConfig, RootCertStore}; -use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream}; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; @@ -18,31 +12,8 @@ const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { let connect_addr = "wss://192.168.1.2:2525"; - let certs: io::Result>> = rustls_pemfile::certs( - &mut BufReader::new(File::open("certificates/cert.pem").unwrap()), - ) - .collect(); - let certs = certs.unwrap(); - let mut root_cert_store = RootCertStore::empty(); - for cert in certs { - root_cert_store.add(cert).unwrap(); - } - - let config = ClientConfig::builder() - .with_root_certificates(root_cert_store) - .with_no_client_auth(); - - let connector = Connector::Rustls(Arc::new(config)); - let ws_stream; - match tokio_tungstenite::connect_async_tls_with_config( - connect_addr, - None, - false, - Some(connector), - ) - .await - { + match tokio_tungstenite::connect_async(connect_addr).await { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return; From 799ab335665f122f0891b9c50ab9318395ce9ce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Tue, 16 Apr 2024 23:39:29 +0300 Subject: [PATCH 3/9] chore: :see_no_evil: configs --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index cacc1a2..9e02892 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ target/ dist/ certificates/ audios/ +configs/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html From b20302e363d873f4e997aa43e3a52b1cdb243fb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Fri, 19 Apr 2024 23:12:14 +0300 Subject: [PATCH 4/9] chore: :heavy_plus_sign: add rustls-platform-verifier dependency --- streamer/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index bffcc48..1501eec 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -11,6 +11,7 @@ cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" rustls-pemfile = "2.1.2" +rustls-platform-verifier = "0.2.0" tokio = { version = "1.36.0", features = ["full"] } tokio-rustls = "0.25.0" tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } From 7bc208b0e4fa3849504e36b374720c7f329da915 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 20 Apr 2024 00:37:22 +0300 Subject: [PATCH 5/9] feat: :alembic: experiment: tls --- streamer/src/streaming.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 95b579f..0b7b28a 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,19 +1,23 @@ -use std::{io::Write, time::Duration}; +use std::{io::Write, sync::Arc, time::Duration}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream}; use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { let connect_addr = "wss://192.168.1.2:2525"; - + let config = rustls_platform_verifier::tls_config(); + + + let connector = tokio_tungstenite::Connector::Rustls(Arc::new(config)); let ws_stream; - match tokio_tungstenite::connect_async(connect_addr).await { + + match tokio_tungstenite::connect_async_tls_with_config(connect_addr, None, false, Some(connector)).await { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return; From f693e25503f6effdb049c0ac96cf49dd834cb68d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 20 Apr 2024 02:06:25 +0300 Subject: [PATCH 6/9] feat: :sparkles: tls support for streamer --- streamer/src/streaming.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 0b7b28a..3bb9ed1 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -10,11 +10,11 @@ use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { - let connect_addr = "wss://192.168.1.2:2525"; + let connect_addr = "wss://tahinli.com.tr:2525"; let config = rustls_platform_verifier::tls_config(); - let connector = tokio_tungstenite::Connector::Rustls(Arc::new(config)); + let connector = Connector::Rustls(Arc::new(config)); let ws_stream; match tokio_tungstenite::connect_async_tls_with_config(connect_addr, None, false, Some(connector)).await { From 25022e8634be16834ef1edf8431805304db4744c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 20 Apr 2024 15:25:45 +0300 Subject: [PATCH 7/9] feat: :alembic: experiment: listener side tsl feat: :sparkles: tls for listener in the back fix: :bug: wrong dependency feature corrected --- back/src/streaming.rs | 5 +++-- front/Cargo.toml | 5 +++-- front/src/streaming.rs | 36 ++++++++++++++++-------------------- streamer/Cargo.toml | 2 +- streamer/src/streaming.rs | 20 +++++++++++++------- 5 files changed, 36 insertions(+), 32 deletions(-) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 86ee9c5..4baa747 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -84,7 +84,8 @@ pub async fn start() { tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); tokio::spawn(status_checker(buffered_producer.clone(), timer)); while let Ok((tcp_stream, listener_info)) = socket.accept().await { - let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); + let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); + let ws_stream = tokio_tungstenite::accept_async(streamer_tcp_tls).await.unwrap(); println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); let new_listener = Listener { ip: listener_info.ip(), @@ -186,7 +187,7 @@ async fn message_organizer( } async fn stream( listener: Listener, - mut ws_stream: WebSocketStream, + mut ws_stream: WebSocketStream>, mut buffered_consumer: Receiver, ) { while let Ok(message) = buffered_consumer.recv().await { diff --git a/front/Cargo.toml b/front/Cargo.toml index fefac4b..35745c9 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -12,11 +12,12 @@ cpal = { version = "0.15.3", features = ["wasm-bindgen"] } dioxus = { version = "0.5.1", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } +#getrandom = { version = "0.2.14", features = ["js"] } log = "0.4.21" reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" +#rustls-platform-verifier = "0.2.0" serde = { version = "1.0.197", features = ["derive"] } -tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } -tokio-tungstenite-wasm = "0.3.1" +tokio-tungstenite-wasm = { version = "0.3.1", features = ["rustls-tls-webpki-roots"] } tokio_with_wasm = "0.4.3" wasm-logger = "0.2.0" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 0eb91de..f1dfbf0 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,15 +1,12 @@ -use std::{io::Write, mem::MaybeUninit, sync::Arc}; - use brotli::DecompressorWriter; - use dioxus::{ prelude::spawn, signals::{Signal, Writable}, }; -use futures_util::{stream::SplitStream, SinkExt, StreamExt}; - +use futures_util::StreamExt; use ringbuf::{HeapRb, Producer, SharedRb}; -use tokio_tungstenite_wasm::{Message, WebSocketStream}; +use std::{io::Write, mem::MaybeUninit, sync::Arc}; + use crate::{listening::listen_podcast, BUFFER_LENGTH}; @@ -19,11 +16,15 @@ pub async fn start_listening( ) { if is_listening() { log::info!("Trying Sir"); - let connect_addr = "ws://192.168.1.2:2424"; - - let (mut stream_producer, stream_consumer); - match tokio_tungstenite_wasm::connect(connect_addr).await { - Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(), + let connect_addr = "wss://tahinli.com.tr:2424"; + + let ws_stream: tokio_tungstenite_wasm::WebSocketStream; + match tokio_tungstenite_wasm::connect( + connect_addr, + ) + .await + { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected, Err(_) => { is_listening.set(false); return; @@ -34,7 +35,7 @@ pub async fn start_listening( let (producer, consumer) = ring.split(); let _sound_stream_task = spawn({ async move { - sound_stream(is_listening, stream_consumer, producer).await; + sound_stream(is_listening, ws_stream, producer).await; is_listening.set(false); is_maintaining.set((false, is_maintaining().1)); } @@ -44,7 +45,6 @@ pub async fn start_listening( listen_podcast(is_listening, consumer).await; is_listening.set(false); //stream_producer.send("Disconnect ME".into()).await.unwrap(); - stream_producer.close().await.unwrap(); is_maintaining.set((is_maintaining().0, false)); } }); @@ -53,19 +53,15 @@ pub async fn start_listening( pub async fn sound_stream( is_listening: Signal, - mut stream_consumer: SplitStream, + mut ws_stream: tokio_tungstenite_wasm::WebSocketStream, mut producer: Producer>>>>, ) { log::info!("Attention! We need cables"); - while let Some(message_with_question) = stream_consumer.next().await { + while let Some(message_with_question) = ws_stream.next().await { if is_listening() { //log::info!("{}", message_with_question.unwrap().len()); - let mut data: Vec = vec![]; - if let Message::Binary(message) = message_with_question.unwrap() { - data = message; - } - + let data: Vec = message_with_question.unwrap().into(); let mut decompression_writer = DecompressorWriter::new(vec![], BUFFER_LENGTH); if let Err(err_val) = decompression_writer.write_all(&data) { log::error!("Error: Decompression | {}", err_val); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 1501eec..dd8fc90 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -14,4 +14,4 @@ rustls-pemfile = "2.1.2" rustls-platform-verifier = "0.2.0" tokio = { version = "1.36.0", features = ["full"] } tokio-rustls = "0.25.0" -tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } +tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 3bb9ed1..69158e1 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -4,20 +4,26 @@ use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream}; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { let connect_addr = "wss://tahinli.com.tr:2525"; - let config = rustls_platform_verifier::tls_config(); - - - let connector = Connector::Rustls(Arc::new(config)); + + let tls_client_config = rustls_platform_verifier::tls_config(); + let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); + let ws_stream; - - match tokio_tungstenite::connect_async_tls_with_config(connect_addr, None, false, Some(connector)).await { + match tokio_tungstenite::connect_async_tls_with_config( + connect_addr, + None, + false, + Some(tls_connector), + ) + .await + { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return; From 13500464535b9395734e9ac3df77b7571776d588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 20 Apr 2024 23:33:19 +0300 Subject: [PATCH 8/9] feat: :art: config --- .gitignore | 1 - streamer/configs/streamer_configs.txt | 3 +++ streamer/src/lib.rs | 7 +++++++ streamer/src/main.rs | 28 +++++++++++++++++++++++---- 4 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 streamer/configs/streamer_configs.txt diff --git a/.gitignore b/.gitignore index 9e02892..cacc1a2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,6 @@ target/ dist/ certificates/ audios/ -configs/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html diff --git a/streamer/configs/streamer_configs.txt b/streamer/configs/streamer_configs.txt new file mode 100644 index 0000000..f62f6e0 --- /dev/null +++ b/streamer/configs/streamer_configs.txt @@ -0,0 +1,3 @@ +address: tahinli.com.tr:2525 +quality: 6 +latency: 100 \ No newline at end of file diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index 1dfe855..0399b4b 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -2,3 +2,10 @@ pub mod recording; pub mod streaming; pub const BUFFER_LENGTH: usize = 1000000; + +pub struct Config { + pub address: String, + pub quality: u8, + pub latency: u16, + pub tls: bool, +} diff --git a/streamer/src/main.rs b/streamer/src/main.rs index 64fee3f..ce307d8 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,16 +1,36 @@ use std::time::Duration; -use streamer::{recording::recording, streaming::start, BUFFER_LENGTH}; -use tokio::sync::broadcast::channel; +use streamer::{recording::recording, streaming::start, Config, BUFFER_LENGTH}; +use tokio::{fs::File, io::AsyncReadExt, sync::broadcast::channel}; #[tokio::main] async fn main() { println!("Hello, world!"); - + let streamer_config = get_config().await; let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); tokio::spawn(recording(sound_stream_producer)); - tokio::spawn(start(sound_stream_consumer)); + tokio::spawn(start(sound_stream_consumer, streamer_config)); loop { tokio::time::sleep(Duration::from_secs(1000000000)).await; } } + +async fn get_config() -> Config { + let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); + let mut configs_unparsed = String::new(); + config_file.read_to_string(&mut configs_unparsed).await.unwrap(); + + let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let mut configs_cleaned: Vec<&str> = vec![]; + + for config in configs_parsed { + let dirty_configs: Vec<&str> = config.split(": ").collect(); + configs_cleaned.push(dirty_configs[1]); + } + Config { + address: configs_cleaned[0].to_string(), + quality: configs_cleaned[1].parse().unwrap(), + latency: configs_cleaned[2].parse().unwrap(), + tls: configs_cleaned[3].parse().unwrap(), + } +} From 9e8cc84f665ddfee58a164d4220c0369274fd776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sun, 21 Apr 2024 04:29:17 +0300 Subject: [PATCH 9/9] feat: :wrench: new structure for both tls and non-tls with config files and generics --- back/configs/relay_configs.txt | 5 + back/src/lib.rs | 9 ++ back/src/main.rs | 29 +++--- back/src/routing.rs | 3 +- back/src/streaming.rs | 138 +++++++++++++++++--------- back/src/utils.rs | 26 +++++ streamer/configs/streamer_configs.txt | 5 +- streamer/src/lib.rs | 1 + streamer/src/main.rs | 24 +---- streamer/src/streaming.rs | 47 ++++++--- streamer/src/utils.rs | 23 +++++ 11 files changed, 207 insertions(+), 103 deletions(-) create mode 100644 back/configs/relay_configs.txt create mode 100644 streamer/src/utils.rs diff --git a/back/configs/relay_configs.txt b/back/configs/relay_configs.txt new file mode 100644 index 0000000..adf6ca7 --- /dev/null +++ b/back/configs/relay_configs.txt @@ -0,0 +1,5 @@ +axum_address: 192.168.1.2:2323 +listener_address: 192.168.1.2:2424 +streamer_address: 192.168.1.2:2525 +latency: 50 +tls: false \ No newline at end of file diff --git a/back/src/lib.rs b/back/src/lib.rs index 497a913..c3bbcf5 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -6,6 +6,15 @@ pub mod routing; pub mod streaming; pub mod utils; +#[derive(Debug, Clone)] +pub struct Config { + pub axum_address: String, + pub listener_address: String, + pub streamer_address: String, + pub latency: u16, + pub tls: bool, +} + #[derive(Debug, Clone)] pub struct AppState {} diff --git a/back/src/main.rs b/back/src/main.rs index d7f9535..8a40574 100644 --- a/back/src/main.rs +++ b/back/src/main.rs @@ -1,28 +1,27 @@ use axum_server::tls_rustls::RustlsConfig; -use back::{routing, streaming, AppState}; -use std::{env, net::SocketAddr}; - -fn take_args() -> String { - let mut bind_address: String = String::new(); - for element in env::args() { - bind_address = element - } - println!("\n\n\tOn Air -> http://{}\n\n", bind_address); - bind_address -} +use back::{routing, streaming, utils::get_config, AppState}; +use std::net::SocketAddr; #[tokio::main] async fn main() { println!("Hello, world!"); - let config = + let relay_config = get_config().await; + let rustls_config = RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem") .await .unwrap(); let state = AppState {}; let app = routing::routing(axum::extract::State(state)).await; - let addr = SocketAddr::from(take_args().parse::().unwrap()); - tokio::spawn(streaming::start()); - axum_server::bind_rustls(addr, config) + let addr = SocketAddr::from( + relay_config + .axum_address + .clone() + .parse::() + .unwrap(), + ); + println!("\n\n\tOn Air -> http://{}\n\n", relay_config.axum_address.clone()); + tokio::spawn(streaming::start(relay_config)); + axum_server::bind_rustls(addr, rustls_config) .serve(app.into_make_service()) .await .unwrap(); diff --git a/back/src/routing.rs b/back/src/routing.rs index 701ca90..a90361e 100644 --- a/back/src/routing.rs +++ b/back/src/routing.rs @@ -1,4 +1,4 @@ -use crate::{streaming, AppState, CoinStatus, ServerStatus}; +use crate::{AppState, CoinStatus, ServerStatus}; use axum::{ body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router, @@ -42,7 +42,6 @@ async fn flip_coin() -> impl IntoResponse { #[axum::debug_handler] async fn stream() -> impl IntoResponse { println!("Stream"); - streaming::start().await; let file = File::open("audios/audio.mp3").await.unwrap(); let stream = ReaderStream::new(file); Body::from_stream(stream) diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 4baa747..cf91ccd 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -9,26 +9,29 @@ use futures_util::{SinkExt, StreamExt}; use rustls_pemfile::{certs, pkcs8_private_keys}; use tokio::{ - net::{TcpListener, TcpStream}, + net::TcpListener, sync::broadcast::{channel, Receiver, Sender}, time::Instant, }; use tokio_rustls::{ rustls::pki_types::{CertificateDer, PrivateKeyDer}, - server::TlsStream, TlsAcceptor, }; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_tungstenite::tungstenite::{Error, Message}; -use crate::{Listener, Streamer}; +use crate::{Config, Listener, Streamer}; const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn start() { +pub async fn start(relay_configs: Config) { //need to move them for multi streamer - let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); + let listener_socket = TcpListener::bind(relay_configs.listener_address) + .await + .unwrap(); let (record_producer, record_consumer) = channel(BUFFER_LENGTH); - let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); + let streamer_socket = TcpListener::bind(relay_configs.streamer_address) + .await + .unwrap(); let timer = Instant::now(); let fullchain: io::Result>> = certs(&mut BufReader::new( @@ -44,35 +47,50 @@ pub async fn start() { .map(Into::into); let privkey = privkey.unwrap(); - let config = tokio_rustls::rustls::ServerConfig::builder() + let server_tls_config = tokio_rustls::rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(fullchain, privkey) .unwrap(); - let acceptor = TlsAcceptor::from(Arc::new(config)); + let acceptor = TlsAcceptor::from(Arc::new(server_tls_config)); loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { - let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); - match tokio_tungstenite::accept_async(streamer_tcp_tls).await { - Ok(ws_stream) => { - println!( - "New Streamer: {:#?} | {:#?}", - streamer_info, - timer.elapsed() - ); - let new_streamer = Streamer { - ip: streamer_info.ip(), - port: streamer_info.port(), - }; - tokio::spawn(streamer_stream( - new_streamer, - record_producer, - ws_stream, - timer, - )); - break; + let new_streamer = Streamer { + ip: streamer_info.ip(), + port: streamer_info.port(), + }; + println!( + "New Streamer: {:#?} | {:#?}", + streamer_info, + timer.elapsed() + ); + if relay_configs.tls { + let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap(); + match tokio_tungstenite::accept_async(streamer_tcp_tls).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer, + record_producer, + ws_stream, + timer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), + } + } else { + match tokio_tungstenite::accept_async(streamer_tcp).await { + Ok(ws_stream) => { + tokio::spawn(streamer_stream( + new_streamer, + record_producer, + ws_stream, + timer, + )); + break; + } + Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } - Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } } Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), @@ -80,22 +98,41 @@ pub async fn start() { } let (message_producer, message_consumer) = channel(BUFFER_LENGTH); let (buffered_producer, _) = channel(BUFFER_LENGTH); - tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); - tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); + tokio::spawn(message_organizer( + message_producer.clone(), + record_consumer, + relay_configs.latency, + )); + tokio::spawn(buffer_layer( + message_consumer, + buffered_producer.clone(), + relay_configs.latency, + )); tokio::spawn(status_checker(buffered_producer.clone(), timer)); - while let Ok((tcp_stream, listener_info)) = socket.accept().await { - let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); - let ws_stream = tokio_tungstenite::accept_async(streamer_tcp_tls).await.unwrap(); - println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); + while let Ok((tcp_stream, listener_info)) = listener_socket.accept().await { let new_listener = Listener { ip: listener_info.ip(), port: listener_info.port(), }; - tokio::spawn(stream( - new_listener, - ws_stream, - buffered_producer.subscribe(), - )); + if relay_configs.tls { + let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); + let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls) + .await + .unwrap(); + tokio::spawn(stream( + new_listener, + wss_stream, + buffered_producer.subscribe(), + )); + } else { + let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); + tokio::spawn(stream( + new_listener, + ws_stream, + buffered_producer.subscribe(), + )); + } + println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); } } async fn status_checker(buffered_producer: Sender, timer: Instant) { @@ -124,9 +161,13 @@ async fn status_checker(buffered_producer: Sender, timer: Instant) { } } } -async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { +async fn buffer_layer( + mut message_consumer: Receiver, + buffered_producer: Sender, + delay: u16, +) { loop { - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(delay.into())).await; while message_consumer.len() > 0 { match message_consumer.recv().await { Ok(message) => match buffered_producer.send(message) { @@ -138,10 +179,12 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer } } } -async fn streamer_stream( +async fn streamer_stream< + T: futures_util::Stream> + std::marker::Unpin, +>( streamer: Streamer, record_producer: Sender, - mut ws_stream: WebSocketStream>, + mut ws_stream: T, timer: Instant, ) { loop { @@ -173,6 +216,7 @@ async fn streamer_stream( async fn message_organizer( message_producer: Sender, mut record_consumer: Receiver, + delay: u16, ) { loop { match record_consumer.recv().await { @@ -182,12 +226,12 @@ async fn message_organizer( }, Err(_) => {} } - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(delay.into())).await; } } -async fn stream( +async fn stream + std::marker::Unpin>( listener: Listener, - mut ws_stream: WebSocketStream>, + mut ws_stream: T, mut buffered_consumer: Receiver, ) { while let Ok(message) = buffered_consumer.recv().await { diff --git a/back/src/utils.rs b/back/src/utils.rs index 8b13789..92ab30b 100644 --- a/back/src/utils.rs +++ b/back/src/utils.rs @@ -1 +1,27 @@ +use tokio::{fs::File, io::AsyncReadExt}; +use crate::Config; + +pub async fn get_config() -> Config { + let mut config_file = File::open("configs/relay_configs.txt").await.unwrap(); + let mut configs_unparsed = String::new(); + config_file + .read_to_string(&mut configs_unparsed) + .await + .unwrap(); + + let configs_parsed: Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let mut configs_cleaned: Vec<&str> = vec![]; + + for config in configs_parsed { + let dirty_configs: Vec<&str> = config.split(": ").collect(); + configs_cleaned.push(dirty_configs[1]); + } + Config { + axum_address: configs_cleaned[0].to_string(), + listener_address: configs_cleaned[1].to_string(), + streamer_address: configs_cleaned[2].to_string(), + latency: configs_cleaned[3].parse().unwrap(), + tls: configs_cleaned[4].parse().unwrap(), + } +} diff --git a/streamer/configs/streamer_configs.txt b/streamer/configs/streamer_configs.txt index f62f6e0..010b3bc 100644 --- a/streamer/configs/streamer_configs.txt +++ b/streamer/configs/streamer_configs.txt @@ -1,3 +1,4 @@ -address: tahinli.com.tr:2525 +address: 192.168.1.2:2525 quality: 6 -latency: 100 \ No newline at end of file +latency: 100 +tls: false \ No newline at end of file diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index 0399b4b..e424cc9 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,5 +1,6 @@ pub mod recording; pub mod streaming; +pub mod utils; pub const BUFFER_LENGTH: usize = 1000000; diff --git a/streamer/src/main.rs b/streamer/src/main.rs index ce307d8..019e531 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,7 +1,7 @@ use std::time::Duration; -use streamer::{recording::recording, streaming::start, Config, BUFFER_LENGTH}; -use tokio::{fs::File, io::AsyncReadExt, sync::broadcast::channel}; +use streamer::{recording::recording, streaming::start, utils::get_config, BUFFER_LENGTH}; +use tokio::sync::broadcast::channel; #[tokio::main] async fn main() { @@ -14,23 +14,3 @@ async fn main() { tokio::time::sleep(Duration::from_secs(1000000000)).await; } } - -async fn get_config() -> Config { - let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); - let mut configs_unparsed = String::new(); - config_file.read_to_string(&mut configs_unparsed).await.unwrap(); - - let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect(); - let mut configs_cleaned: Vec<&str> = vec![]; - - for config in configs_parsed { - let dirty_configs: Vec<&str> = config.split(": ").collect(); - configs_cleaned.push(dirty_configs[1]); - } - Config { - address: configs_cleaned[0].to_string(), - quality: configs_cleaned[1].parse().unwrap(), - latency: configs_cleaned[2].parse().unwrap(), - tls: configs_cleaned[3].parse().unwrap(), - } -} diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 69158e1..077fabb 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -4,38 +4,55 @@ use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use tokio_tungstenite::tungstenite::Message; -use crate::BUFFER_LENGTH; +use crate::{Config, BUFFER_LENGTH}; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn start(sound_stream_consumer: Receiver) { - let connect_addr = "wss://tahinli.com.tr:2525"; - - let tls_client_config = rustls_platform_verifier::tls_config(); - let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); +pub async fn start(sound_stream_consumer: Receiver, streamer_config:Config) { + let connect_addr = + match streamer_config.tls { + true => format!("wss://{}", streamer_config.address), + false => format!("ws://{}", streamer_config.address), + }; let ws_stream; + + match streamer_config.tls { + true => { + let tls_client_config = rustls_platform_verifier::tls_config(); + let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); + match tokio_tungstenite::connect_async_tls_with_config( - connect_addr, + connect_addr.clone(), None, false, Some(tls_connector), ) .await { - Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, + Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0, Err(_) => { return; } + } + }, + false => { + match tokio_tungstenite::connect_async(connect_addr.clone()).await { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, + Err(_) => { + return; + }, + } + }, } let (message_producer, message_consumer) = channel(BUFFER_LENGTH); println!("Connected to: {}", connect_addr); - tokio::spawn(message_organizer(message_producer, sound_stream_consumer)); + tokio::spawn(message_organizer(message_producer, sound_stream_consumer, streamer_config.quality, streamer_config.latency)); tokio::spawn(stream(ws_stream, message_consumer)); } -async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { +async fn message_organizer(message_producer: Sender, mut consumer: Receiver, quality: u8, latency:u16) { loop { let mut messages: Vec = Vec::new(); let mut iteration = consumer.len(); @@ -53,7 +70,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece let _zero = charred.remove(1); let _point = charred.remove(1); } - charred.truncate(4); + charred.truncate(quality.into()); let mut single_data_packet: Vec = vec![]; for char in charred { let char_packet = char.to_string().as_bytes().to_vec(); @@ -89,12 +106,12 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece // message_producer.receiver_count() // ); } - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(latency.into())).await; } } -async fn stream( - mut ws_stream: WebSocketStream>, +async fn stream + std::marker::Unpin>( + mut ws_stream: T, mut message_consumer: Receiver, ) { while let Ok(message) = message_consumer.recv().await { diff --git a/streamer/src/utils.rs b/streamer/src/utils.rs new file mode 100644 index 0000000..85c0dc3 --- /dev/null +++ b/streamer/src/utils.rs @@ -0,0 +1,23 @@ +use tokio::{fs::File, io::AsyncReadExt}; + +use crate::Config; + +pub async fn get_config() -> Config { + let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); + let mut configs_unparsed = String::new(); + config_file.read_to_string(&mut configs_unparsed).await.unwrap(); + + let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let mut configs_cleaned: Vec<&str> = vec![]; + + for config in configs_parsed { + let dirty_configs: Vec<&str> = config.split(": ").collect(); + configs_cleaned.push(dirty_configs[1]); + } + Config { + address: configs_cleaned[0].to_string(), + quality: configs_cleaned[1].parse().unwrap(), + latency: configs_cleaned[2].parse().unwrap(), + tls: configs_cleaned[3].parse().unwrap(), + } +} \ No newline at end of file