feat: streamer: Disconnect from server

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-27 19:48:13 +03:00
parent 21d8781188
commit 6eb3e9b419
6 changed files with 117 additions and 55 deletions

View file

@ -3,56 +3,77 @@ use std::{io::Write, sync::Arc, time::Duration};
use brotli::CompressorWriter;
use futures_util::SinkExt;
use ringbuf::HeapRb;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::{
sync::broadcast::{channel, Receiver, Sender},
task::JoinHandle,
};
use tokio_tungstenite::tungstenite::Message;
use crate::{Config, BUFFER_LENGTH};
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn connect(sound_stream_consumer: Receiver<f32>, streamer_config:Config) {
let connect_addr =
match streamer_config.tls {
pub async fn connect(
sound_stream_consumer: Receiver<f32>,
streamer_config: Config,
mut stop_connection_consumer: Receiver<bool>,
) {
let connect_addr = match streamer_config.tls {
true => format!("wss://{}", streamer_config.address),
false => format!("ws://{}", streamer_config.address),
};
let ws_stream;
if let Err(_) = stop_connection_consumer.try_recv() {
let ws_stream;
match streamer_config.tls {
true => {
let tls_client_config = rustls_platform_verifier::tls_config();
let tls_connector =
tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config));
match streamer_config.tls {
true => {
let tls_client_config = rustls_platform_verifier::tls_config();
let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config));
match tokio_tungstenite::connect_async_tls_with_config(
connect_addr.clone(),
None,
false,
Some(tls_connector),
)
.await
{
Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0,
Err(_) => {
return;
}
}
},
false => {
match tokio_tungstenite::connect_async(connect_addr.clone()).await {
match tokio_tungstenite::connect_async_tls_with_config(
connect_addr.clone(),
None,
false,
Some(tls_connector),
)
.await
{
Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0,
Err(_) => {
return;
}
}
}
false => match tokio_tungstenite::connect_async(connect_addr.clone()).await {
Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0,
Err(_) => {
return;
},
}
},
}
},
}
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
println!("Connected to: {}", connect_addr);
let message_organizer_task = tokio::spawn(message_organizer(
message_producer,
sound_stream_consumer,
streamer_config.quality,
streamer_config.latency,
));
let stream_task = tokio::spawn(stream(ws_stream, message_consumer));
tokio::spawn(status_checker(
message_organizer_task,
stream_task,
stop_connection_consumer,
));
}
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
println!("Connected to: {}", connect_addr);
tokio::spawn(message_organizer(message_producer, sound_stream_consumer, streamer_config.quality, streamer_config.latency));
tokio::spawn(stream(ws_stream, message_consumer));
}
async fn message_organizer(message_producer: Sender<Message>, mut consumer: Receiver<f32>, quality: u8, latency:u16) {
async fn message_organizer(
message_producer: Sender<Message>,
mut consumer: Receiver<f32>,
quality: u8,
latency: u16,
) {
loop {
let mut messages: Vec<u8> = Vec::new();
let mut iteration = consumer.len();
@ -110,7 +131,7 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
}
}
async fn stream <T: futures_util::Sink<Message> + std::marker::Unpin>(
async fn stream<T: futures_util::Sink<Message> + std::marker::Unpin>(
mut ws_stream: T,
mut message_consumer: Receiver<Message>,
) {
@ -142,3 +163,16 @@ async fn stream <T: futures_util::Sink<Message> + std::marker::Unpin>(
}
}
}
async fn status_checker(
message_organizer_task: JoinHandle<()>,
stream_task: JoinHandle<()>,
mut stop_connection_consumer: Receiver<bool>,
) {
while let Err(_) = stop_connection_consumer.try_recv() {
tokio::time::sleep(Duration::from_secs(3)).await;
}
stream_task.abort();
message_organizer_task.abort();
println!("Cleaning Done: Streamer Disconnected");
}