From 6eb3e9b419f270699f8e01b97a7689644a55fa1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 27 Apr 2024 19:48:13 +0300 Subject: [PATCH] feat: :sparkles: streamer: Disconnect from server --- streamer/src/gui.rs | 37 ++++++++++---- streamer/src/lib.rs | 2 +- streamer/src/main.rs | 6 +-- streamer/src/recording.rs | 14 +++-- streamer/src/streaming.rs | 104 +++++++++++++++++++++++++------------- streamer/src/utils.rs | 9 ++-- 6 files changed, 117 insertions(+), 55 deletions(-) diff --git a/streamer/src/gui.rs b/streamer/src/gui.rs index 0fdad88..e86946e 100644 --- a/streamer/src/gui.rs +++ b/streamer/src/gui.rs @@ -1,4 +1,7 @@ -use iced::{widget::{button, column, Column}, Command}; +use iced::{ + widget::{button, column, Column}, + Command, +}; use tokio::sync::broadcast::{channel, Sender}; use crate::{recording, streaming, utils::get_config, Config, BUFFER_LENGTH}; @@ -6,13 +9,16 @@ use crate::{recording, streaming, utils::get_config, Config, BUFFER_LENGTH}; #[derive(Debug, Clone)] pub enum Message { StartStreaming, + StopStreaming, ConfigLoad(Config), } #[derive(Debug)] pub struct Streamer { config: Option, - sound_stream_producer:Sender, + sound_stream_producer: Sender, + stop_connection_producer: Sender, + stop_recording_producer: Sender, } impl Default for Streamer { fn default() -> Self { @@ -25,13 +31,28 @@ impl Streamer { Self { config: None, sound_stream_producer: channel(BUFFER_LENGTH).0, + stop_connection_producer: channel(BUFFER_LENGTH).0, + stop_recording_producer: channel(BUFFER_LENGTH).0, } } - pub fn update(&mut self, message:Message) { + pub fn update(&mut self, message: Message) { match message { Message::StartStreaming => { - tokio::spawn(streaming::connect(self.sound_stream_producer.subscribe(), self.config.clone().unwrap())); - tokio::spawn(recording::record(self.sound_stream_producer.clone())); + println!("Start Stream"); + tokio::spawn(streaming::connect( + self.sound_stream_producer.subscribe(), + self.config.clone().unwrap(), + self.stop_connection_producer.subscribe(), + )); + tokio::spawn(recording::record( + self.sound_stream_producer.clone(), + self.stop_recording_producer.subscribe(), + )); + } + Message::StopStreaming => { + println!("Stop Stream"); + self.stop_connection_producer.send(true).unwrap(); + self.stop_recording_producer.send(true).unwrap(); } Message::ConfigLoad(config) => { self.config = Some(config); @@ -40,13 +61,11 @@ impl Streamer { } pub fn view(&self) -> Column { column![ - button("Start Streaming").on_press(Message::StartStreaming) + button("Start Streaming").on_press(Message::StartStreaming), + button("Stop Streaming").on_press(Message::StopStreaming), ] } pub fn load_config() -> Command { Command::perform(get_config(), Message::ConfigLoad) } } - - - diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index 033b2d3..8f73b77 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,7 +1,7 @@ +pub mod gui; pub mod recording; pub mod streaming; pub mod utils; -pub mod gui; pub const BUFFER_LENGTH: usize = 1000000; diff --git a/streamer/src/main.rs b/streamer/src/main.rs index b79fb5c..7a37c48 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,9 +1,9 @@ use streamer::gui::Streamer; #[tokio::main] -async fn main() -> iced::Result{ +async fn main() -> iced::Result { println!("Hello, world!"); iced::program("Streamer GUI", Streamer::update, Streamer::view) - .load(Streamer::load_config) - .run() + .load(Streamer::load_config) + .run() } diff --git a/streamer/src/recording.rs b/streamer/src/recording.rs index 828a8a6..57cde46 100644 --- a/streamer/src/recording.rs +++ b/streamer/src/recording.rs @@ -1,7 +1,10 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use tokio::sync::broadcast::Sender; +use tokio::sync::broadcast::{Receiver, Sender}; -pub async fn record(sound_stream_producer: Sender) { +pub async fn record( + sound_stream_producer: Sender, + mut stop_recording_consumer: Receiver, +) { let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); @@ -22,8 +25,11 @@ pub async fn record(sound_stream_producer: Sender) { input_stream.play().unwrap(); println!("Recording Started"); - std::thread::sleep(std::time::Duration::from_secs(1000000000)); - println!("DONE I HOPE"); + while let Err(_) = stop_recording_consumer.try_recv() { + std::thread::sleep(std::time::Duration::from_secs(1)); + } + input_stream.pause().unwrap(); + println!("Recording Stopped"); } fn err_fn(err: cpal::StreamError) { eprintln!("Something Happened: {}", err); diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 2c26726..b7e591e 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -3,56 +3,77 @@ use std::{io::Write, sync::Arc, time::Duration}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; -use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tokio::{ + sync::broadcast::{channel, Receiver, Sender}, + task::JoinHandle, +}; use tokio_tungstenite::tungstenite::Message; use crate::{Config, BUFFER_LENGTH}; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn connect(sound_stream_consumer: Receiver, streamer_config:Config) { - let connect_addr = - match streamer_config.tls { +pub async fn connect( + sound_stream_consumer: Receiver, + streamer_config: Config, + mut stop_connection_consumer: Receiver, +) { + let connect_addr = match streamer_config.tls { true => format!("wss://{}", streamer_config.address), false => format!("ws://{}", streamer_config.address), }; - let ws_stream; + if let Err(_) = stop_connection_consumer.try_recv() { + let ws_stream; + match streamer_config.tls { + true => { + let tls_client_config = rustls_platform_verifier::tls_config(); + let tls_connector = + tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); - match streamer_config.tls { - true => { - let tls_client_config = rustls_platform_verifier::tls_config(); - let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config)); - - match tokio_tungstenite::connect_async_tls_with_config( - connect_addr.clone(), - None, - false, - Some(tls_connector), - ) - .await - { - Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0, - Err(_) => { - return; - } - } - }, - false => { - match tokio_tungstenite::connect_async(connect_addr.clone()).await { + match tokio_tungstenite::connect_async_tls_with_config( + connect_addr.clone(), + None, + false, + Some(tls_connector), + ) + .await + { + Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0, + Err(_) => { + return; + } + } + } + false => match tokio_tungstenite::connect_async(connect_addr.clone()).await { Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, Err(_) => { return; - }, - } - }, + } + }, + } + let (message_producer, message_consumer) = channel(BUFFER_LENGTH); + println!("Connected to: {}", connect_addr); + let message_organizer_task = tokio::spawn(message_organizer( + message_producer, + sound_stream_consumer, + streamer_config.quality, + streamer_config.latency, + )); + let stream_task = tokio::spawn(stream(ws_stream, message_consumer)); + tokio::spawn(status_checker( + message_organizer_task, + stream_task, + stop_connection_consumer, + )); } - let (message_producer, message_consumer) = channel(BUFFER_LENGTH); - println!("Connected to: {}", connect_addr); - tokio::spawn(message_organizer(message_producer, sound_stream_consumer, streamer_config.quality, streamer_config.latency)); - tokio::spawn(stream(ws_stream, message_consumer)); } -async fn message_organizer(message_producer: Sender, mut consumer: Receiver, quality: u8, latency:u16) { +async fn message_organizer( + message_producer: Sender, + mut consumer: Receiver, + quality: u8, + latency: u16, +) { loop { let mut messages: Vec = Vec::new(); let mut iteration = consumer.len(); @@ -110,7 +131,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece } } -async fn stream + std::marker::Unpin>( +async fn stream + std::marker::Unpin>( mut ws_stream: T, mut message_consumer: Receiver, ) { @@ -142,3 +163,16 @@ async fn stream + std::marker::Unpin>( } } } + +async fn status_checker( + message_organizer_task: JoinHandle<()>, + stream_task: JoinHandle<()>, + mut stop_connection_consumer: Receiver, +) { + while let Err(_) = stop_connection_consumer.try_recv() { + tokio::time::sleep(Duration::from_secs(3)).await; + } + stream_task.abort(); + message_organizer_task.abort(); + println!("Cleaning Done: Streamer Disconnected"); +} diff --git a/streamer/src/utils.rs b/streamer/src/utils.rs index 85c0dc3..6323894 100644 --- a/streamer/src/utils.rs +++ b/streamer/src/utils.rs @@ -5,9 +5,12 @@ use crate::Config; pub async fn get_config() -> Config { let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); let mut configs_unparsed = String::new(); - config_file.read_to_string(&mut configs_unparsed).await.unwrap(); + config_file + .read_to_string(&mut configs_unparsed) + .await + .unwrap(); - let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect(); + let configs_parsed: Vec<&str> = configs_unparsed.split_terminator("\n").collect(); let mut configs_cleaned: Vec<&str> = vec![]; for config in configs_parsed { @@ -20,4 +23,4 @@ pub async fn get_config() -> Config { latency: configs_cleaned[2].parse().unwrap(), tls: configs_cleaned[3].parse().unwrap(), } -} \ No newline at end of file +}