diff --git a/streamer/src/gui.rs b/streamer/src/gui.rs index 93bd907..196f28d 100644 --- a/streamer/src/gui.rs +++ b/streamer/src/gui.rs @@ -1,27 +1,70 @@ use iced::{ - widget::{button, column, container, Container}, + widget::{container, row, Container}, Command, }; use tokio::sync::broadcast::{channel, Sender}; -use crate::{recording, streaming, utils::get_config, Config, BUFFER_LENGTH}; +use crate::{ + gui_utils::button_with_centered_text, recording, streaming, utils::get_config, Config, BUFFER_LENGTH +}; +#[derive(Debug, Clone)] +pub enum Event { + Connect, + Disconnect, + Record, + StopRecord, + PlayAudio, + StopAudio, + LoadConfig(Config), +} +#[derive(Debug, Clone)] + +pub enum State { + Connected, + Disconnected, + Recording, + StopRecording, + PlayingAudio, + StopAudio, +} #[derive(Debug, Clone)] pub enum Message { - StartStreaming, - StopStreaming, - ConfigLoad(Config), + Event(Event), + State(State), +} +#[derive(Debug)] +struct DataChannel { + sound_stream_sender: Sender, +} +#[derive(Debug)] +struct CommunicationChannel { + base_to_streaming: Sender, + streaming_to_base: Sender, + base_to_recording: Sender, + recording_to_base: Sender, + base_to_playing: Sender, + playing_to_base: Sender, +} +#[derive(Debug)] +enum Condition { + Active, + Loading, + Passive, } +#[derive(Debug)] +struct GUIStatus { + are_we_connect: Condition, + are_we_record: Condition, + are_we_play_audio: Condition, +} #[derive(Debug)] pub struct Streamer { config: Option, - sound_stream_producer: Sender, - stop_connection_producer: Sender, - stop_recording_producer: Sender, - connection_cleaning_status_producer: Sender, - are_we_streaming: bool, - are_we_recovering: bool, + data_channel: DataChannel, + communication_channel: CommunicationChannel, + gui_status: GUIStatus, } impl Default for Streamer { fn default() -> Self { @@ -33,81 +76,226 @@ impl Streamer { fn new() -> Self { Self { config: None, - sound_stream_producer: channel(BUFFER_LENGTH).0, - stop_connection_producer: channel(BUFFER_LENGTH).0, - stop_recording_producer: channel(BUFFER_LENGTH).0, - connection_cleaning_status_producer: channel(BUFFER_LENGTH).0, - are_we_streaming: false, - are_we_recovering: false, + data_channel: DataChannel { + sound_stream_sender: channel(BUFFER_LENGTH).0, + }, + communication_channel: CommunicationChannel { + base_to_streaming: channel(1).0, + streaming_to_base: channel(1).0, + base_to_recording: channel(1).0, + recording_to_base: channel(1).0, + base_to_playing: channel(1).0, + playing_to_base: channel(1).0, + }, + gui_status: GUIStatus { + are_we_connect: Condition::Passive, + are_we_record: Condition::Passive, + are_we_play_audio: Condition::Passive, + }, } } - pub fn update(&mut self, message: Message) { + pub fn update(&mut self, message: Message) -> Command { match message { - Message::StartStreaming => { - if !self.are_we_recovering && !self.are_we_streaming { - println!("Start Stream"); - self.are_we_recovering = true; - self.are_we_streaming = true; + Message::Event(event) => match event { + Event::Connect => { + println!("Connect"); + self.gui_status.are_we_connect = Condition::Loading; + let mut streaming_to_base_receiver = + self.communication_channel.streaming_to_base.subscribe(); tokio::spawn(streaming::connect( - self.sound_stream_producer.subscribe(), + self.data_channel.sound_stream_sender.subscribe(), self.config.clone().unwrap(), - self.stop_connection_producer.subscribe(), - self.connection_cleaning_status_producer.clone(), + self.communication_channel.base_to_streaming.subscribe(), + self.communication_channel.streaming_to_base.clone(), )); + Command::perform( + async move { + match streaming_to_base_receiver.recv().await { + Ok(_) => State::Connected, + Err(err_val) => { + eprintln!("Error: Communication | {}", err_val); + State::Disconnected + } + } + }, + Message::State, + ) + } + Event::Disconnect => { + println!("Disconnect"); + self.gui_status.are_we_connect = Condition::Loading; + let mut streaming_to_base_receiver = + self.communication_channel.streaming_to_base.subscribe(); + let _ = self.communication_channel.base_to_streaming.send(false); + Command::perform( + async move { + match streaming_to_base_receiver.recv().await { + Ok(_) => State::Disconnected, + Err(err_val) => { + eprintln!("Error: Communication | {}", err_val); + State::Connected + } + } + }, + Message::State, + ) + } + Event::Record => { + println!("Record"); + self.gui_status.are_we_record = Condition::Loading; + let mut recording_to_base_receiver = + self.communication_channel.recording_to_base.subscribe(); tokio::spawn(recording::record( - self.sound_stream_producer.clone(), - self.stop_recording_producer.subscribe(), + self.data_channel.sound_stream_sender.clone(), + self.communication_channel.base_to_recording.subscribe(), + self.communication_channel.recording_to_base.clone(), )); - self.are_we_recovering = false; + Command::perform( + async move { + match recording_to_base_receiver.recv().await { + Ok(_) => State::Recording, + Err(err_val) => { + eprintln!("Error: Communication | Streaming | {}", err_val); + State::StopRecording + } + } + }, + Message::State, + ) } - } - Message::StopStreaming => { - if !self.are_we_recovering && self.are_we_streaming { - println!("Stop Stream"); - self.are_we_recovering = true; - self.are_we_streaming = false; - let _ = self.connection_cleaning_status_producer.send(true); - let _ = self.stop_connection_producer.send(true); - let _ = self.stop_recording_producer.send(true); - while !self.connection_cleaning_status_producer.is_empty() {} - self.are_we_recovering = false; + Event::StopRecord => { + println!("Stop Record"); + self.gui_status.are_we_record = Condition::Loading; + let mut recording_to_base_receiver = + self.communication_channel.recording_to_base.subscribe(); + let _ = self.communication_channel.base_to_recording.send(false); + Command::perform( + async move { + match recording_to_base_receiver.recv().await { + Ok(_) => State::StopRecording, + Err(err_val) => { + eprintln!("Error: Communication | Recording | {}", err_val); + State::Recording + } + } + }, + Message::State, + ) } - } - Message::ConfigLoad(config) => { - self.config = Some(config); - } + Event::PlayAudio => { + println!("Play Audio"); + self.gui_status.are_we_play_audio = Condition::Loading; + let mut playing_to_base_receiver = + self.communication_channel.playing_to_base.subscribe(); + //tokio::spawn(future); + Command::perform( + async move { + match playing_to_base_receiver.recv().await { + Ok(_) => State::PlayingAudio, + Err(err_val) => { + eprint!("Error: Communication | Playing | {}", err_val); + State::StopAudio + } + } + }, + Message::State, + ) + } + Event::StopAudio => { + println!("Stop Audio"); + self.gui_status.are_we_play_audio = Condition::Loading; + let mut playing_to_base_receiver = + self.communication_channel.playing_to_base.subscribe(); + let _ = self.communication_channel.base_to_playing.send(false); + Command::perform( + async move { + match playing_to_base_receiver.recv().await { + Ok(_) => State::StopAudio, + Err(err_val) => { + eprint!("Error: Communication | Playing | {}", err_val); + State::PlayingAudio + } + } + }, + Message::State, + ) + } + Event::LoadConfig(config) => { + self.config = Some(config); + Command::none() + } + }, + Message::State(state) => match state { + State::Connected => { + self.gui_status.are_we_connect = Condition::Active; + Command::none() + } + State::Disconnected => { + self.gui_status.are_we_connect = Condition::Passive; + Command::none() + } + State::Recording => { + self.gui_status.are_we_record = Condition::Active; + Command::none() + } + State::StopRecording => { + self.gui_status.are_we_record = Condition::Passive; + Command::none() + } + State::PlayingAudio => { + self.gui_status.are_we_play_audio = Condition::Active; + Command::none() + } + State::StopAudio => { + self.gui_status.are_we_play_audio = Condition::Passive; + Command::none() + } + }, } } pub fn view(&self) -> Container { - let column = match self.are_we_streaming { - true => match self.are_we_recovering { - true => { - column![button("Stop Streaming").width(100),] - } - false => { - column![button("Stop Streaming") - .width(100) - .on_press(Message::StopStreaming),] - } - }, - false => match self.are_we_recovering { - true => { - column![button("Start Streaming").width(100),] - } - false => { - column![button("Start Streaming") - .width(100) - .on_press(Message::StartStreaming),] - } - }, + let connect_button = match self.gui_status.are_we_connect { + Condition::Active => { + button_with_centered_text("Disconnect").on_press(Message::Event(Event::Disconnect)) + } + Condition::Loading => button_with_centered_text("Processing"), + Condition::Passive => { + button_with_centered_text("Connect").on_press(Message::Event(Event::Connect)) + } }; - container(column) - .width(200) - .height(200) - .center_x() - .center_y() + + let record_button = match self.gui_status.are_we_record { + Condition::Active => { + button_with_centered_text("Stop Record").on_press(Message::Event(Event::StopRecord)) + } + Condition::Loading => button_with_centered_text("Processing"), + Condition::Passive => { + button_with_centered_text("Record").on_press(Message::Event(Event::Record)) + } + }; + + let play_audio_button = + match self.gui_status.are_we_play_audio { + Condition::Active => button_with_centered_text("Stop Audio") + .on_press(Message::Event(Event::StopAudio)), + Condition::Loading => button_with_centered_text("Processing"), + Condition::Passive => button_with_centered_text("Play Audio") + .on_press(Message::Event(Event::PlayAudio)), + }; + + let content = row![connect_button, record_button, play_audio_button] + .spacing(20) + .width(400) + .height(35); + container(content).height(300).center_x().center_y() } pub fn load_config() -> Command { - Command::perform(get_config(), Message::ConfigLoad) + Command::perform( + async move { + let config = get_config().await; + Event::LoadConfig(config) + }, + Message::Event, + ) } } diff --git a/streamer/src/gui_utils.rs b/streamer/src/gui_utils.rs new file mode 100644 index 0000000..1274a22 --- /dev/null +++ b/streamer/src/gui_utils.rs @@ -0,0 +1,17 @@ +use iced::{ + alignment, + widget::{button, text, Button}, + Length, +}; + +use crate::gui::Message; + +pub fn button_with_centered_text(txt: &'static str) -> Button<'static, Message> { + button( + text(txt) + .width(Length::Fill) + .horizontal_alignment(alignment::Horizontal::Center), + ) + .height(Length::Fill) + .width(Length::Fill) +} diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index 8f73b77..9993227 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,4 +1,5 @@ pub mod gui; +pub mod gui_utils; pub mod recording; pub mod streaming; pub mod utils; diff --git a/streamer/src/main.rs b/streamer/src/main.rs index 092d60c..5bb76c1 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -5,7 +5,7 @@ async fn main() -> iced::Result { println!("Hello, world!"); iced::program("Streamer GUI", Streamer::update, Streamer::view) .centered() - .window_size((250.0, 250.0)) + .window_size((350.0, 400.0)) .load(Streamer::load_config) .run() } diff --git a/streamer/src/recording.rs b/streamer/src/recording.rs index 57cde46..f0b7adc 100644 --- a/streamer/src/recording.rs +++ b/streamer/src/recording.rs @@ -2,9 +2,12 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use tokio::sync::broadcast::{Receiver, Sender}; pub async fn record( - sound_stream_producer: Sender, - mut stop_recording_consumer: Receiver, + sound_stream_sender: Sender, + mut base_to_recording: Receiver, + recording_to_base: Sender, ) { + let _ = recording_to_base.send(true); + let host = cpal::default_host(); let input_device = host.default_input_device().unwrap(); @@ -12,25 +15,31 @@ pub async fn record( let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| { for &sample in data { - match sound_stream_producer.send(sample) { - Ok(_) => {} - Err(_) => {} + if sound_stream_sender.receiver_count() > 0 { + match sound_stream_sender.send(sample) { + Ok(_) => {} + Err(_) => {} + } } } }; - let input_stream = input_device .build_input_stream(&config, input_data_fn, err_fn, None) .unwrap(); - input_stream.play().unwrap(); println!("Recording Started"); - while let Err(_) = stop_recording_consumer.try_recv() { + tokio::spawn(let_the_base_know(recording_to_base.clone())); + while let Err(_) = base_to_recording.try_recv() { std::thread::sleep(std::time::Duration::from_secs(1)); } input_stream.pause().unwrap(); + tokio::spawn(let_the_base_know(recording_to_base.clone())); println!("Recording Stopped"); } fn err_fn(err: cpal::StreamError) { eprintln!("Something Happened: {}", err); } + +async fn let_the_base_know(recording_to_base: Sender) { + let _ = recording_to_base.send(true); +} diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 3065ea4..adcdf38 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -13,17 +13,17 @@ use crate::{Config, BUFFER_LENGTH}; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn connect( - sound_stream_consumer: Receiver, + sound_stream_receiver: Receiver, streamer_config: Config, - mut stop_connection_consumer: Receiver, - connection_cleaning_status_producer: Sender, + mut base_to_streaming: Receiver, + streaming_to_base: Sender, ) { let connect_addr = match streamer_config.tls { true => format!("wss://{}", streamer_config.address), false => format!("ws://{}", streamer_config.address), }; - if let Err(_) = stop_connection_consumer.try_recv() { + if let Err(_) = base_to_streaming.try_recv() { let ws_stream; match streamer_config.tls { true => { @@ -56,35 +56,36 @@ pub async fn connect( println!("Connected to: {}", connect_addr); let message_organizer_task = tokio::spawn(message_organizer( message_producer, - sound_stream_consumer, + sound_stream_receiver, streamer_config.quality, streamer_config.latency, )); let stream_task = tokio::spawn(stream(ws_stream, message_consumer)); + let _ = streaming_to_base.send(true); tokio::spawn(status_checker( message_organizer_task, stream_task, - stop_connection_consumer, - connection_cleaning_status_producer, + base_to_streaming, + streaming_to_base, )); } } async fn message_organizer( message_producer: Sender, - mut consumer: Receiver, + mut receiver: Receiver, quality: u8, latency: u16, ) { loop { let mut messages: Vec = Vec::new(); - let mut iteration = consumer.len(); + let mut iteration = receiver.len(); while iteration > 0 { iteration -= 1; - match consumer.recv().await { + match receiver.recv().await { Ok(single_data) => { let ring = HeapRb::::new(BUFFER_LENGTH); - let (mut producer, mut consumer) = ring.split(); + let (mut producer, mut receiver) = ring.split(); let mut charred: Vec = single_data.to_string().chars().collect(); if charred[0] == '0' { charred.insert(0, '+'); @@ -104,8 +105,8 @@ async fn message_organizer( for element in single_data_packet { producer.push(element).unwrap(); } - while !consumer.is_empty() { - messages.push(consumer.pop().unwrap()); + while !receiver.is_empty() { + messages.push(receiver.pop().unwrap()); } } Err(_) => {} @@ -169,18 +170,14 @@ async fn stream + std::marker::Unpin>( async fn status_checker( message_organizer_task: JoinHandle<()>, stream_task: JoinHandle<()>, - mut stop_connection_consumer: Receiver, - connection_cleaning_status_producer: Sender, + mut base_to_streaming: Receiver, + streaming_to_base: Sender, ) { - let mut connection_cleaning_status_consumer = connection_cleaning_status_producer.subscribe(); - connection_cleaning_status_producer.send(true).unwrap(); - while let Err(_) = stop_connection_consumer.try_recv() { + while let Err(_) = base_to_streaming.try_recv() { tokio::time::sleep(Duration::from_secs(3)).await; } stream_task.abort(); message_organizer_task.abort(); - while let Ok(_) = connection_cleaning_status_consumer.try_recv() {} - drop(connection_cleaning_status_consumer); - drop(connection_cleaning_status_producer); + let _ = streaming_to_base.send(true); println!("Cleaning Done: Streamer Disconnected"); }