diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 86ee9c5..4baa747 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -84,7 +84,8 @@ pub async fn start() { tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); tokio::spawn(status_checker(buffered_producer.clone(), timer)); while let Ok((tcp_stream, listener_info)) = socket.accept().await { - let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); + let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap(); + let ws_stream = tokio_tungstenite::accept_async(streamer_tcp_tls).await.unwrap(); println!("New Listener: {} | {:#?}", listener_info, timer.elapsed()); let new_listener = Listener { ip: listener_info.ip(), @@ -186,7 +187,7 @@ async fn message_organizer( } async fn stream( listener: Listener, - mut ws_stream: WebSocketStream, + mut ws_stream: WebSocketStream>, mut buffered_consumer: Receiver, ) { while let Ok(message) = buffered_consumer.recv().await { diff --git a/front/Cargo.toml b/front/Cargo.toml index fefac4b..35745c9 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -12,11 +12,12 @@ cpal = { version = "0.15.3", features = ["wasm-bindgen"] } dioxus = { version = "0.5.1", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } +#getrandom = { version = "0.2.14", features = ["js"] } log = "0.4.21" reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" +#rustls-platform-verifier = "0.2.0" serde = { version = "1.0.197", features = ["derive"] } -tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } -tokio-tungstenite-wasm = "0.3.1" +tokio-tungstenite-wasm = { version = "0.3.1", features = ["rustls-tls-webpki-roots"] } tokio_with_wasm = "0.4.3" wasm-logger = "0.2.0" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 0eb91de..f1dfbf0 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,15 +1,12 @@ -use std::{io::Write, mem::MaybeUninit, sync::Arc}; - use brotli::DecompressorWriter; - use dioxus::{ prelude::spawn, signals::{Signal, Writable}, }; -use futures_util::{stream::SplitStream, SinkExt, StreamExt}; - +use futures_util::StreamExt; use ringbuf::{HeapRb, Producer, SharedRb}; -use tokio_tungstenite_wasm::{Message, WebSocketStream}; +use std::{io::Write, mem::MaybeUninit, sync::Arc}; + use crate::{listening::listen_podcast, BUFFER_LENGTH}; @@ -19,11 +16,15 @@ pub async fn start_listening( ) { if is_listening() { log::info!("Trying Sir"); - let connect_addr = "ws://192.168.1.2:2424"; - - let (mut stream_producer, stream_consumer); - match tokio_tungstenite_wasm::connect(connect_addr).await { - Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(), + let connect_addr = "wss://tahinli.com.tr:2424"; + + let ws_stream: tokio_tungstenite_wasm::WebSocketStream; + match tokio_tungstenite_wasm::connect( + connect_addr, + ) + .await + { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected, Err(_) => { is_listening.set(false); return; @@ -34,7 +35,7 @@ pub async fn start_listening( let (producer, consumer) = ring.split(); let _sound_stream_task = spawn({ async move { - sound_stream(is_listening, stream_consumer, producer).await; + sound_stream(is_listening, ws_stream, producer).await; is_listening.set(false); is_maintaining.set((false, is_maintaining().1)); } @@ -44,7 +45,6 @@ pub async fn start_listening( listen_podcast(is_listening, consumer).await; is_listening.set(false); //stream_producer.send("Disconnect ME".into()).await.unwrap(); - stream_producer.close().await.unwrap(); is_maintaining.set((is_maintaining().0, false)); } }); @@ -53,19 +53,15 @@ pub async fn start_listening( pub async fn sound_stream( is_listening: Signal, - mut stream_consumer: SplitStream, + mut ws_stream: tokio_tungstenite_wasm::WebSocketStream, mut producer: Producer>>>>, ) { log::info!("Attention! We need cables"); - while let Some(message_with_question) = stream_consumer.next().await { + while let Some(message_with_question) = ws_stream.next().await { if is_listening() { //log::info!("{}", message_with_question.unwrap().len()); - let mut data: Vec = vec![]; - if let Message::Binary(message) = message_with_question.unwrap() { - data = message; - } - + let data: Vec = message_with_question.unwrap().into(); let mut decompression_writer = DecompressorWriter::new(vec![], BUFFER_LENGTH); if let Err(err_val) = decompression_writer.write_all(&data) { log::error!("Error: Decompression | {}", err_val); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 1501eec..dd8fc90 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -14,4 +14,4 @@ rustls-pemfile = "2.1.2" rustls-platform-verifier = "0.2.0" tokio = { version = "1.36.0", features = ["full"] } tokio-rustls = "0.25.0" -tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] } +tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 3bb9ed1..69158e1 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -4,20 +4,26 @@ use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream}; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use crate::BUFFER_LENGTH; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start(sound_stream_consumer: Receiver) { let connect_addr = "wss://tahinli.com.tr:2525"; - let config = rustls_platform_verifier::tls_config(); - - - let connector = Connector::Rustls(Arc::new(config)); + + let tls_client_config = rustls_platform_verifier::tls_config(); + let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); + let ws_stream; - - match tokio_tungstenite::connect_async_tls_with_config(connect_addr, None, false, Some(connector)).await { + match tokio_tungstenite::connect_async_tls_with_config( + connect_addr, + None, + false, + Some(tls_connector), + ) + .await + { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return;