From 21d8781188f1500e1567df3ab84ee56c60be6fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 27 Apr 2024 15:36:28 +0300 Subject: [PATCH] feat: :lipstick: first gui for streamer fix: :ambulance: oneshot fail in listener socket --- back/src/streaming.rs | 7 ++++-- streamer/Cargo.toml | 1 + streamer/src/gui.rs | 52 +++++++++++++++++++++++++++++++++++++++ streamer/src/lib.rs | 2 ++ streamer/src/main.rs | 17 ++++--------- streamer/src/recording.rs | 2 +- streamer/src/streaming.rs | 2 +- 7 files changed, 67 insertions(+), 16 deletions(-) create mode 100644 streamer/src/gui.rs diff --git a/back/src/streaming.rs b/back/src/streaming.rs index ced0baa..ae17baa 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -64,8 +64,9 @@ pub async fn start(relay_configs: Config) { ip: "127.0.0.1".to_string().parse().unwrap(), port: 0000, }; - let mut is_streaming = false; + let mut is_streaming; loop { + is_streaming = false; match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { new_streamer.ip = streamer_info.ip(); @@ -258,7 +259,9 @@ async fn status_checker( let cleaning_timer = Instant::now(); message_organizer_task.as_ref().unwrap().abort(); buffer_layer_task.as_ref().unwrap().abort(); - listener_socket_killer_producer.send(true).unwrap(); + if let Err(_) = listener_socket_killer_producer.send(true) { + eprintln!("Error: Cleaning | Socket Kill Failed, Receiver Dropped"); + } let mut listener_task_counter = 0; while listener_stream_tasks_receiver.len() > 0 { match listener_stream_tasks_receiver.recv().await { diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index dd8fc90..9a61ac1 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" brotli = "5.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } +iced = { git = "https://github.com/iced-rs/iced", features = ["tokio"] } ringbuf = "0.3.3" rustls-pemfile = "2.1.2" rustls-platform-verifier = "0.2.0" diff --git a/streamer/src/gui.rs b/streamer/src/gui.rs new file mode 100644 index 0000000..0fdad88 --- /dev/null +++ b/streamer/src/gui.rs @@ -0,0 +1,52 @@ +use iced::{widget::{button, column, Column}, Command}; +use tokio::sync::broadcast::{channel, Sender}; + +use crate::{recording, streaming, utils::get_config, Config, BUFFER_LENGTH}; + +#[derive(Debug, Clone)] +pub enum Message { + StartStreaming, + ConfigLoad(Config), +} + +#[derive(Debug)] +pub struct Streamer { + config: Option, + sound_stream_producer:Sender, +} +impl Default for Streamer { + fn default() -> Self { + Self::new() + } +} + +impl Streamer { + fn new() -> Self { + Self { + config: None, + sound_stream_producer: channel(BUFFER_LENGTH).0, + } + } + pub fn update(&mut self, message:Message) { + match message { + Message::StartStreaming => { + tokio::spawn(streaming::connect(self.sound_stream_producer.subscribe(), self.config.clone().unwrap())); + tokio::spawn(recording::record(self.sound_stream_producer.clone())); + } + Message::ConfigLoad(config) => { + self.config = Some(config); + } + } + } + pub fn view(&self) -> Column { + column![ + button("Start Streaming").on_press(Message::StartStreaming) + ] + } + pub fn load_config() -> Command { + Command::perform(get_config(), Message::ConfigLoad) + } +} + + + diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index e424cc9..033b2d3 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,9 +1,11 @@ pub mod recording; pub mod streaming; pub mod utils; +pub mod gui; pub const BUFFER_LENGTH: usize = 1000000; +#[derive(Debug, Clone)] pub struct Config { pub address: String, pub quality: u8, diff --git a/streamer/src/main.rs b/streamer/src/main.rs index 019e531..b79fb5c 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,16 +1,9 @@ -use std::time::Duration; - -use streamer::{recording::recording, streaming::start, utils::get_config, BUFFER_LENGTH}; -use tokio::sync::broadcast::channel; +use streamer::gui::Streamer; #[tokio::main] -async fn main() { +async fn main() -> iced::Result{ println!("Hello, world!"); - let streamer_config = get_config().await; - let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); - tokio::spawn(recording(sound_stream_producer)); - tokio::spawn(start(sound_stream_consumer, streamer_config)); - loop { - tokio::time::sleep(Duration::from_secs(1000000000)).await; - } + iced::program("Streamer GUI", Streamer::update, Streamer::view) + .load(Streamer::load_config) + .run() } diff --git a/streamer/src/recording.rs b/streamer/src/recording.rs index 36edb2b..828a8a6 100644 --- a/streamer/src/recording.rs +++ b/streamer/src/recording.rs @@ -1,7 +1,7 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::broadcast::Sender; -pub async fn recording(sound_stream_producer: Sender) { +pub async fn record(sound_stream_producer: Sender) { let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 077fabb..2c26726 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -9,7 +9,7 @@ use tokio_tungstenite::tungstenite::Message; use crate::{Config, BUFFER_LENGTH}; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; -pub async fn start(sound_stream_consumer: Receiver, streamer_config:Config) { +pub async fn connect(sound_stream_consumer: Receiver, streamer_config:Config) { let connect_addr = match streamer_config.tls { true => format!("wss://{}", streamer_config.address),