refactor: ♻️ whole structure of the streamer gui

feat:  Connect, Record, Play Audio Seperated
This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-28 18:58:01 +03:00
parent c5df598338
commit 99a03dbbc7
6 changed files with 313 additions and 101 deletions

View file

@ -1,27 +1,70 @@
use iced::{ use iced::{
widget::{button, column, container, Container}, widget::{container, row, Container},
Command, Command,
}; };
use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::broadcast::{channel, Sender};
use crate::{recording, streaming, utils::get_config, Config, BUFFER_LENGTH}; use crate::{
gui_utils::button_with_centered_text, recording, streaming, utils::get_config, Config, BUFFER_LENGTH
};
#[derive(Debug, Clone)]
pub enum Event {
Connect,
Disconnect,
Record,
StopRecord,
PlayAudio,
StopAudio,
LoadConfig(Config),
}
#[derive(Debug, Clone)]
pub enum State {
Connected,
Disconnected,
Recording,
StopRecording,
PlayingAudio,
StopAudio,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Message { pub enum Message {
StartStreaming, Event(Event),
StopStreaming, State(State),
ConfigLoad(Config), }
#[derive(Debug)]
struct DataChannel {
sound_stream_sender: Sender<f32>,
}
#[derive(Debug)]
struct CommunicationChannel {
base_to_streaming: Sender<bool>,
streaming_to_base: Sender<bool>,
base_to_recording: Sender<bool>,
recording_to_base: Sender<bool>,
base_to_playing: Sender<bool>,
playing_to_base: Sender<bool>,
}
#[derive(Debug)]
enum Condition {
Active,
Loading,
Passive,
} }
#[derive(Debug)]
struct GUIStatus {
are_we_connect: Condition,
are_we_record: Condition,
are_we_play_audio: Condition,
}
#[derive(Debug)] #[derive(Debug)]
pub struct Streamer { pub struct Streamer {
config: Option<Config>, config: Option<Config>,
sound_stream_producer: Sender<f32>, data_channel: DataChannel,
stop_connection_producer: Sender<bool>, communication_channel: CommunicationChannel,
stop_recording_producer: Sender<bool>, gui_status: GUIStatus,
connection_cleaning_status_producer: Sender<bool>,
are_we_streaming: bool,
are_we_recovering: bool,
} }
impl Default for Streamer { impl Default for Streamer {
fn default() -> Self { fn default() -> Self {
@ -33,81 +76,226 @@ impl Streamer {
fn new() -> Self { fn new() -> Self {
Self { Self {
config: None, config: None,
sound_stream_producer: channel(BUFFER_LENGTH).0, data_channel: DataChannel {
stop_connection_producer: channel(BUFFER_LENGTH).0, sound_stream_sender: channel(BUFFER_LENGTH).0,
stop_recording_producer: channel(BUFFER_LENGTH).0, },
connection_cleaning_status_producer: channel(BUFFER_LENGTH).0, communication_channel: CommunicationChannel {
are_we_streaming: false, base_to_streaming: channel(1).0,
are_we_recovering: false, streaming_to_base: channel(1).0,
base_to_recording: channel(1).0,
recording_to_base: channel(1).0,
base_to_playing: channel(1).0,
playing_to_base: channel(1).0,
},
gui_status: GUIStatus {
are_we_connect: Condition::Passive,
are_we_record: Condition::Passive,
are_we_play_audio: Condition::Passive,
},
} }
} }
pub fn update(&mut self, message: Message) { pub fn update(&mut self, message: Message) -> Command<Message> {
match message { match message {
Message::StartStreaming => { Message::Event(event) => match event {
if !self.are_we_recovering && !self.are_we_streaming { Event::Connect => {
println!("Start Stream"); println!("Connect");
self.are_we_recovering = true; self.gui_status.are_we_connect = Condition::Loading;
self.are_we_streaming = true; let mut streaming_to_base_receiver =
self.communication_channel.streaming_to_base.subscribe();
tokio::spawn(streaming::connect( tokio::spawn(streaming::connect(
self.sound_stream_producer.subscribe(), self.data_channel.sound_stream_sender.subscribe(),
self.config.clone().unwrap(), self.config.clone().unwrap(),
self.stop_connection_producer.subscribe(), self.communication_channel.base_to_streaming.subscribe(),
self.connection_cleaning_status_producer.clone(), self.communication_channel.streaming_to_base.clone(),
)); ));
Command::perform(
async move {
match streaming_to_base_receiver.recv().await {
Ok(_) => State::Connected,
Err(err_val) => {
eprintln!("Error: Communication | {}", err_val);
State::Disconnected
}
}
},
Message::State,
)
}
Event::Disconnect => {
println!("Disconnect");
self.gui_status.are_we_connect = Condition::Loading;
let mut streaming_to_base_receiver =
self.communication_channel.streaming_to_base.subscribe();
let _ = self.communication_channel.base_to_streaming.send(false);
Command::perform(
async move {
match streaming_to_base_receiver.recv().await {
Ok(_) => State::Disconnected,
Err(err_val) => {
eprintln!("Error: Communication | {}", err_val);
State::Connected
}
}
},
Message::State,
)
}
Event::Record => {
println!("Record");
self.gui_status.are_we_record = Condition::Loading;
let mut recording_to_base_receiver =
self.communication_channel.recording_to_base.subscribe();
tokio::spawn(recording::record( tokio::spawn(recording::record(
self.sound_stream_producer.clone(), self.data_channel.sound_stream_sender.clone(),
self.stop_recording_producer.subscribe(), self.communication_channel.base_to_recording.subscribe(),
self.communication_channel.recording_to_base.clone(),
)); ));
self.are_we_recovering = false; Command::perform(
async move {
match recording_to_base_receiver.recv().await {
Ok(_) => State::Recording,
Err(err_val) => {
eprintln!("Error: Communication | Streaming | {}", err_val);
State::StopRecording
}
}
},
Message::State,
)
} }
} Event::StopRecord => {
Message::StopStreaming => { println!("Stop Record");
if !self.are_we_recovering && self.are_we_streaming { self.gui_status.are_we_record = Condition::Loading;
println!("Stop Stream"); let mut recording_to_base_receiver =
self.are_we_recovering = true; self.communication_channel.recording_to_base.subscribe();
self.are_we_streaming = false; let _ = self.communication_channel.base_to_recording.send(false);
let _ = self.connection_cleaning_status_producer.send(true); Command::perform(
let _ = self.stop_connection_producer.send(true); async move {
let _ = self.stop_recording_producer.send(true); match recording_to_base_receiver.recv().await {
while !self.connection_cleaning_status_producer.is_empty() {} Ok(_) => State::StopRecording,
self.are_we_recovering = false; Err(err_val) => {
eprintln!("Error: Communication | Recording | {}", err_val);
State::Recording
}
}
},
Message::State,
)
} }
} Event::PlayAudio => {
Message::ConfigLoad(config) => { println!("Play Audio");
self.config = Some(config); self.gui_status.are_we_play_audio = Condition::Loading;
} let mut playing_to_base_receiver =
self.communication_channel.playing_to_base.subscribe();
//tokio::spawn(future);
Command::perform(
async move {
match playing_to_base_receiver.recv().await {
Ok(_) => State::PlayingAudio,
Err(err_val) => {
eprint!("Error: Communication | Playing | {}", err_val);
State::StopAudio
}
}
},
Message::State,
)
}
Event::StopAudio => {
println!("Stop Audio");
self.gui_status.are_we_play_audio = Condition::Loading;
let mut playing_to_base_receiver =
self.communication_channel.playing_to_base.subscribe();
let _ = self.communication_channel.base_to_playing.send(false);
Command::perform(
async move {
match playing_to_base_receiver.recv().await {
Ok(_) => State::StopAudio,
Err(err_val) => {
eprint!("Error: Communication | Playing | {}", err_val);
State::PlayingAudio
}
}
},
Message::State,
)
}
Event::LoadConfig(config) => {
self.config = Some(config);
Command::none()
}
},
Message::State(state) => match state {
State::Connected => {
self.gui_status.are_we_connect = Condition::Active;
Command::none()
}
State::Disconnected => {
self.gui_status.are_we_connect = Condition::Passive;
Command::none()
}
State::Recording => {
self.gui_status.are_we_record = Condition::Active;
Command::none()
}
State::StopRecording => {
self.gui_status.are_we_record = Condition::Passive;
Command::none()
}
State::PlayingAudio => {
self.gui_status.are_we_play_audio = Condition::Active;
Command::none()
}
State::StopAudio => {
self.gui_status.are_we_play_audio = Condition::Passive;
Command::none()
}
},
} }
} }
pub fn view(&self) -> Container<Message> { pub fn view(&self) -> Container<Message> {
let column = match self.are_we_streaming { let connect_button = match self.gui_status.are_we_connect {
true => match self.are_we_recovering { Condition::Active => {
true => { button_with_centered_text("Disconnect").on_press(Message::Event(Event::Disconnect))
column![button("Stop Streaming").width(100),] }
} Condition::Loading => button_with_centered_text("Processing"),
false => { Condition::Passive => {
column![button("Stop Streaming") button_with_centered_text("Connect").on_press(Message::Event(Event::Connect))
.width(100) }
.on_press(Message::StopStreaming),]
}
},
false => match self.are_we_recovering {
true => {
column![button("Start Streaming").width(100),]
}
false => {
column![button("Start Streaming")
.width(100)
.on_press(Message::StartStreaming),]
}
},
}; };
container(column)
.width(200) let record_button = match self.gui_status.are_we_record {
.height(200) Condition::Active => {
.center_x() button_with_centered_text("Stop Record").on_press(Message::Event(Event::StopRecord))
.center_y() }
Condition::Loading => button_with_centered_text("Processing"),
Condition::Passive => {
button_with_centered_text("Record").on_press(Message::Event(Event::Record))
}
};
let play_audio_button =
match self.gui_status.are_we_play_audio {
Condition::Active => button_with_centered_text("Stop Audio")
.on_press(Message::Event(Event::StopAudio)),
Condition::Loading => button_with_centered_text("Processing"),
Condition::Passive => button_with_centered_text("Play Audio")
.on_press(Message::Event(Event::PlayAudio)),
};
let content = row![connect_button, record_button, play_audio_button]
.spacing(20)
.width(400)
.height(35);
container(content).height(300).center_x().center_y()
} }
pub fn load_config() -> Command<Message> { pub fn load_config() -> Command<Message> {
Command::perform(get_config(), Message::ConfigLoad) Command::perform(
async move {
let config = get_config().await;
Event::LoadConfig(config)
},
Message::Event,
)
} }
} }

17
streamer/src/gui_utils.rs Normal file
View file

@ -0,0 +1,17 @@
use iced::{
alignment,
widget::{button, text, Button},
Length,
};
use crate::gui::Message;
pub fn button_with_centered_text(txt: &'static str) -> Button<'static, Message> {
button(
text(txt)
.width(Length::Fill)
.horizontal_alignment(alignment::Horizontal::Center),
)
.height(Length::Fill)
.width(Length::Fill)
}

View file

@ -1,4 +1,5 @@
pub mod gui; pub mod gui;
pub mod gui_utils;
pub mod recording; pub mod recording;
pub mod streaming; pub mod streaming;
pub mod utils; pub mod utils;

View file

@ -5,7 +5,7 @@ async fn main() -> iced::Result {
println!("Hello, world!"); println!("Hello, world!");
iced::program("Streamer GUI", Streamer::update, Streamer::view) iced::program("Streamer GUI", Streamer::update, Streamer::view)
.centered() .centered()
.window_size((250.0, 250.0)) .window_size((350.0, 400.0))
.load(Streamer::load_config) .load(Streamer::load_config)
.run() .run()
} }

View file

@ -2,9 +2,12 @@ use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::{Receiver, Sender};
pub async fn record( pub async fn record(
sound_stream_producer: Sender<f32>, sound_stream_sender: Sender<f32>,
mut stop_recording_consumer: Receiver<bool>, mut base_to_recording: Receiver<bool>,
recording_to_base: Sender<bool>,
) { ) {
let _ = recording_to_base.send(true);
let host = cpal::default_host(); let host = cpal::default_host();
let input_device = host.default_input_device().unwrap(); let input_device = host.default_input_device().unwrap();
@ -12,25 +15,31 @@ pub async fn record(
let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| { let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| {
for &sample in data { for &sample in data {
match sound_stream_producer.send(sample) { if sound_stream_sender.receiver_count() > 0 {
Ok(_) => {} match sound_stream_sender.send(sample) {
Err(_) => {} Ok(_) => {}
Err(_) => {}
}
} }
} }
}; };
let input_stream = input_device let input_stream = input_device
.build_input_stream(&config, input_data_fn, err_fn, None) .build_input_stream(&config, input_data_fn, err_fn, None)
.unwrap(); .unwrap();
input_stream.play().unwrap(); input_stream.play().unwrap();
println!("Recording Started"); println!("Recording Started");
while let Err(_) = stop_recording_consumer.try_recv() { tokio::spawn(let_the_base_know(recording_to_base.clone()));
while let Err(_) = base_to_recording.try_recv() {
std::thread::sleep(std::time::Duration::from_secs(1)); std::thread::sleep(std::time::Duration::from_secs(1));
} }
input_stream.pause().unwrap(); input_stream.pause().unwrap();
tokio::spawn(let_the_base_know(recording_to_base.clone()));
println!("Recording Stopped"); println!("Recording Stopped");
} }
fn err_fn(err: cpal::StreamError) { fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err); eprintln!("Something Happened: {}", err);
} }
async fn let_the_base_know(recording_to_base: Sender<bool>) {
let _ = recording_to_base.send(true);
}

View file

@ -13,17 +13,17 @@ use crate::{Config, BUFFER_LENGTH};
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn connect( pub async fn connect(
sound_stream_consumer: Receiver<f32>, sound_stream_receiver: Receiver<f32>,
streamer_config: Config, streamer_config: Config,
mut stop_connection_consumer: Receiver<bool>, mut base_to_streaming: Receiver<bool>,
connection_cleaning_status_producer: Sender<bool>, streaming_to_base: Sender<bool>,
) { ) {
let connect_addr = match streamer_config.tls { let connect_addr = match streamer_config.tls {
true => format!("wss://{}", streamer_config.address), true => format!("wss://{}", streamer_config.address),
false => format!("ws://{}", streamer_config.address), false => format!("ws://{}", streamer_config.address),
}; };
if let Err(_) = stop_connection_consumer.try_recv() { if let Err(_) = base_to_streaming.try_recv() {
let ws_stream; let ws_stream;
match streamer_config.tls { match streamer_config.tls {
true => { true => {
@ -56,35 +56,36 @@ pub async fn connect(
println!("Connected to: {}", connect_addr); println!("Connected to: {}", connect_addr);
let message_organizer_task = tokio::spawn(message_organizer( let message_organizer_task = tokio::spawn(message_organizer(
message_producer, message_producer,
sound_stream_consumer, sound_stream_receiver,
streamer_config.quality, streamer_config.quality,
streamer_config.latency, streamer_config.latency,
)); ));
let stream_task = tokio::spawn(stream(ws_stream, message_consumer)); let stream_task = tokio::spawn(stream(ws_stream, message_consumer));
let _ = streaming_to_base.send(true);
tokio::spawn(status_checker( tokio::spawn(status_checker(
message_organizer_task, message_organizer_task,
stream_task, stream_task,
stop_connection_consumer, base_to_streaming,
connection_cleaning_status_producer, streaming_to_base,
)); ));
} }
} }
async fn message_organizer( async fn message_organizer(
message_producer: Sender<Message>, message_producer: Sender<Message>,
mut consumer: Receiver<f32>, mut receiver: Receiver<f32>,
quality: u8, quality: u8,
latency: u16, latency: u16,
) { ) {
loop { loop {
let mut messages: Vec<u8> = Vec::new(); let mut messages: Vec<u8> = Vec::new();
let mut iteration = consumer.len(); let mut iteration = receiver.len();
while iteration > 0 { while iteration > 0 {
iteration -= 1; iteration -= 1;
match consumer.recv().await { match receiver.recv().await {
Ok(single_data) => { Ok(single_data) => {
let ring = HeapRb::<u8>::new(BUFFER_LENGTH); let ring = HeapRb::<u8>::new(BUFFER_LENGTH);
let (mut producer, mut consumer) = ring.split(); let (mut producer, mut receiver) = ring.split();
let mut charred: Vec<char> = single_data.to_string().chars().collect(); let mut charred: Vec<char> = single_data.to_string().chars().collect();
if charred[0] == '0' { if charred[0] == '0' {
charred.insert(0, '+'); charred.insert(0, '+');
@ -104,8 +105,8 @@ async fn message_organizer(
for element in single_data_packet { for element in single_data_packet {
producer.push(element).unwrap(); producer.push(element).unwrap();
} }
while !consumer.is_empty() { while !receiver.is_empty() {
messages.push(consumer.pop().unwrap()); messages.push(receiver.pop().unwrap());
} }
} }
Err(_) => {} Err(_) => {}
@ -169,18 +170,14 @@ async fn stream<T: futures_util::Sink<Message> + std::marker::Unpin>(
async fn status_checker( async fn status_checker(
message_organizer_task: JoinHandle<()>, message_organizer_task: JoinHandle<()>,
stream_task: JoinHandle<()>, stream_task: JoinHandle<()>,
mut stop_connection_consumer: Receiver<bool>, mut base_to_streaming: Receiver<bool>,
connection_cleaning_status_producer: Sender<bool>, streaming_to_base: Sender<bool>,
) { ) {
let mut connection_cleaning_status_consumer = connection_cleaning_status_producer.subscribe(); while let Err(_) = base_to_streaming.try_recv() {
connection_cleaning_status_producer.send(true).unwrap();
while let Err(_) = stop_connection_consumer.try_recv() {
tokio::time::sleep(Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(3)).await;
} }
stream_task.abort(); stream_task.abort();
message_organizer_task.abort(); message_organizer_task.abort();
while let Ok(_) = connection_cleaning_status_consumer.try_recv() {} let _ = streaming_to_base.send(true);
drop(connection_cleaning_status_consumer);
drop(connection_cleaning_status_producer);
println!("Cleaning Done: Streamer Disconnected"); println!("Cleaning Done: Streamer Disconnected");
} }