feat: ⚗️ experiment: listener side tsl

feat:  tls for listener in the back
fix: 🐛 wrong dependency feature corrected
This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-20 15:25:45 +03:00
parent f693e25503
commit 25022e8634
5 changed files with 36 additions and 32 deletions

View file

@ -84,7 +84,8 @@ pub async fn start() {
tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone()));
tokio::spawn(status_checker(buffered_producer.clone(), timer));
while let Ok((tcp_stream, listener_info)) = socket.accept().await {
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap();
let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap();
let ws_stream = tokio_tungstenite::accept_async(streamer_tcp_tls).await.unwrap();
println!("New Listener: {} | {:#?}", listener_info, timer.elapsed());
let new_listener = Listener {
ip: listener_info.ip(),
@ -186,7 +187,7 @@ async fn message_organizer(
}
async fn stream(
listener: Listener,
mut ws_stream: WebSocketStream<TcpStream>,
mut ws_stream: WebSocketStream<TlsStream<TcpStream>>,
mut buffered_consumer: Receiver<Message>,
) {
while let Ok(message) = buffered_consumer.recv().await {

View file

@ -12,11 +12,12 @@ cpal = { version = "0.15.3", features = ["wasm-bindgen"] }
dioxus = { version = "0.5.1", features = ["web"] }
futures-core = "0.3.30"
futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] }
#getrandom = { version = "0.2.14", features = ["js"] }
log = "0.4.21"
reqwest = { version = "0.12.2", features = ["json"] }
ringbuf = "0.3.3"
#rustls-platform-verifier = "0.2.0"
serde = { version = "1.0.197", features = ["derive"] }
tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] }
tokio-tungstenite-wasm = "0.3.1"
tokio-tungstenite-wasm = { version = "0.3.1", features = ["rustls-tls-webpki-roots"] }
tokio_with_wasm = "0.4.3"
wasm-logger = "0.2.0"

View file

@ -1,15 +1,12 @@
use std::{io::Write, mem::MaybeUninit, sync::Arc};
use brotli::DecompressorWriter;
use dioxus::{
prelude::spawn,
signals::{Signal, Writable},
};
use futures_util::{stream::SplitStream, SinkExt, StreamExt};
use futures_util::StreamExt;
use ringbuf::{HeapRb, Producer, SharedRb};
use tokio_tungstenite_wasm::{Message, WebSocketStream};
use std::{io::Write, mem::MaybeUninit, sync::Arc};
use crate::{listening::listen_podcast, BUFFER_LENGTH};
@ -19,11 +16,15 @@ pub async fn start_listening(
) {
if is_listening() {
log::info!("Trying Sir");
let connect_addr = "ws://192.168.1.2:2424";
let (mut stream_producer, stream_consumer);
match tokio_tungstenite_wasm::connect(connect_addr).await {
Ok(ws_stream) => (stream_producer, stream_consumer) = ws_stream.split(),
let connect_addr = "wss://tahinli.com.tr:2424";
let ws_stream: tokio_tungstenite_wasm::WebSocketStream;
match tokio_tungstenite_wasm::connect(
connect_addr,
)
.await
{
Ok(ws_stream_connected) => ws_stream = ws_stream_connected,
Err(_) => {
is_listening.set(false);
return;
@ -34,7 +35,7 @@ pub async fn start_listening(
let (producer, consumer) = ring.split();
let _sound_stream_task = spawn({
async move {
sound_stream(is_listening, stream_consumer, producer).await;
sound_stream(is_listening, ws_stream, producer).await;
is_listening.set(false);
is_maintaining.set((false, is_maintaining().1));
}
@ -44,7 +45,6 @@ pub async fn start_listening(
listen_podcast(is_listening, consumer).await;
is_listening.set(false);
//stream_producer.send("Disconnect ME".into()).await.unwrap();
stream_producer.close().await.unwrap();
is_maintaining.set((is_maintaining().0, false));
}
});
@ -53,19 +53,15 @@ pub async fn start_listening(
pub async fn sound_stream(
is_listening: Signal<bool>,
mut stream_consumer: SplitStream<WebSocketStream>,
mut ws_stream: tokio_tungstenite_wasm::WebSocketStream,
mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) {
log::info!("Attention! We need cables");
while let Some(message_with_question) = stream_consumer.next().await {
while let Some(message_with_question) = ws_stream.next().await {
if is_listening() {
//log::info!("{}", message_with_question.unwrap().len());
let mut data: Vec<u8> = vec![];
if let Message::Binary(message) = message_with_question.unwrap() {
data = message;
}
let data: Vec<u8> = message_with_question.unwrap().into();
let mut decompression_writer = DecompressorWriter::new(vec![], BUFFER_LENGTH);
if let Err(err_val) = decompression_writer.write_all(&data) {
log::error!("Error: Decompression | {}", err_val);

View file

@ -14,4 +14,4 @@ rustls-pemfile = "2.1.2"
rustls-platform-verifier = "0.2.0"
tokio = { version = "1.36.0", features = ["full"] }
tokio-rustls = "0.25.0"
tokio-tungstenite = { version = "0.21.0", features = ["__rustls-tls"] }
tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] }

View file

@ -4,20 +4,26 @@ use brotli::CompressorWriter;
use futures_util::SinkExt;
use ringbuf::HeapRb;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio_tungstenite::{tungstenite::Message, Connector, WebSocketStream};
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
use crate::BUFFER_LENGTH;
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn start(sound_stream_consumer: Receiver<f32>) {
let connect_addr = "wss://tahinli.com.tr:2525";
let config = rustls_platform_verifier::tls_config();
let connector = Connector::Rustls(Arc::new(config));
let tls_client_config = rustls_platform_verifier::tls_config();
let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config));
let ws_stream;
match tokio_tungstenite::connect_async_tls_with_config(connect_addr, None, false, Some(connector)).await {
match tokio_tungstenite::connect_async_tls_with_config(
connect_addr,
None,
false,
Some(tls_connector),
)
.await
{
Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0,
Err(_) => {
return;