Merge pull request #37 from Tahinli/dev

Dev
This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-07 09:37:28 +00:00 committed by GitHub
commit 651da6068b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1484 additions and 153 deletions

View file

@ -26,30 +26,9 @@ const BUFFER_LENGTH: usize = 1000000;
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn start(relay_configs: Config) {
let timer = Instant::now();
let fullchain: io::Result<Vec<CertificateDer<'static>>> = certs(&mut BufReader::new(
File::open("certificates/fullchain.pem").unwrap(),
))
.collect();
let fullchain = fullchain.unwrap();
let privkey: io::Result<PrivateKeyDer<'static>> = pkcs8_private_keys(&mut BufReader::new(
File::open("certificates/privkey.pem").unwrap(),
))
.next()
.unwrap()
.map(Into::into);
let privkey = privkey.unwrap();
let server_tls_config = tokio_rustls::rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(fullchain, privkey)
.unwrap();
let acceptor = TlsAcceptor::from(Arc::new(server_tls_config));
let acceptor = tls_configurator().await;
loop {
//need to move them for multi streamer
let listener_socket = TcpListener::bind(relay_configs.listener_address.clone())
.await
.unwrap();
let (record_producer, record_consumer) = channel(BUFFER_LENGTH);
let streamer_socket = TcpListener::bind(relay_configs.streamer_address.clone())
.await
@ -65,62 +44,61 @@ pub async fn start(relay_configs: Config) {
port: 0000,
};
let mut is_streaming = false;
loop {
match streamer_socket.accept().await {
Ok((streamer_tcp, streamer_info)) => {
new_streamer.ip = streamer_info.ip();
new_streamer.port = streamer_info.port();
println!(
"New Streamer: {:#?} | {:#?}",
streamer_info,
timer.elapsed()
);
if relay_configs.tls {
match acceptor.accept(streamer_tcp).await {
Ok(streamer_tcp_tls) => {
match tokio_tungstenite::accept_async(streamer_tcp_tls).await {
Ok(ws_stream) => {
tokio::spawn(streamer_stream(
new_streamer.clone(),
record_producer,
ws_stream,
timer,
streamer_alive_producer,
));
is_streaming = true;
break;
}
Err(err_val) => {
eprintln!("Error: TCP to WS Transform | {}", err_val)
}
match streamer_socket.accept().await {
Ok((streamer_tcp, streamer_info)) => {
new_streamer.ip = streamer_info.ip();
new_streamer.port = streamer_info.port();
println!(
"New Streamer: {:#?} | {:#?}",
streamer_info,
timer.elapsed()
);
if relay_configs.tls {
match acceptor.accept(streamer_tcp).await {
Ok(streamer_tcp_tls) => {
match tokio_tungstenite::accept_async(streamer_tcp_tls).await {
Ok(ws_stream) => {
tokio::spawn(streamer_stream(
new_streamer.clone(),
record_producer,
ws_stream,
timer,
streamer_alive_producer,
));
is_streaming = true;
}
Err(err_val) => {
eprintln!("Error: TCP to WS Transform | {}", err_val)
}
}
Err(err_val) => {
eprintln!("Error: TCP TLS Streamer| {}", err_val);
break;
}
}
} else {
match tokio_tungstenite::accept_async(streamer_tcp).await {
Ok(ws_stream) => {
tokio::spawn(streamer_stream(
new_streamer.clone(),
record_producer,
ws_stream,
timer,
streamer_alive_producer,
));
is_streaming = true;
break;
}
Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val),
Err(err_val) => {
eprintln!("Error: TCP TLS Streamer| {}", err_val);
}
}
} else {
match tokio_tungstenite::accept_async(streamer_tcp).await {
Ok(ws_stream) => {
tokio::spawn(streamer_stream(
new_streamer.clone(),
record_producer,
ws_stream,
timer,
streamer_alive_producer,
));
is_streaming = true;
}
Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val),
}
}
Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val),
}
Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val),
}
if is_streaming {
let listener_socket = TcpListener::bind(relay_configs.listener_address.clone())
.await
.unwrap();
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
let (buffered_producer, _) = channel(BUFFER_LENGTH);
message_organizer_task = tokio::spawn(message_organizer(
@ -163,6 +141,27 @@ pub async fn start(relay_configs: Config) {
}
}
}
async fn tls_configurator() -> TlsAcceptor {
let fullchain: io::Result<Vec<CertificateDer<'static>>> = certs(&mut BufReader::new(
File::open("certificates/fullchain.pem").unwrap(),
))
.collect();
let fullchain = fullchain.unwrap();
let privkey: io::Result<PrivateKeyDer<'static>> = pkcs8_private_keys(&mut BufReader::new(
File::open("certificates/privkey.pem").unwrap(),
))
.next()
.unwrap()
.map(Into::into);
let privkey = privkey.unwrap();
let server_tls_config = tokio_rustls::rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(fullchain, privkey)
.unwrap();
let acceptor = TlsAcceptor::from(Arc::new(server_tls_config));
acceptor
}
async fn listener_handler(
listener_socket: TcpListener,
acceptor: TlsAcceptor,
@ -196,12 +195,14 @@ async fn listener_handler(
}
Err(err_val) => {
eprintln!("Error: TCP WSS Listener | {}", err_val);
drop(listener_socket);
return;
}
}
}
Err(err_val) => {
eprintln!("Error: TCP TLS Listener | {}", err_val);
drop(listener_socket);
return;
}
}
@ -219,6 +220,7 @@ async fn listener_handler(
}
Err(err_val) => {
eprintln!("Error: TCP WS Listener | {}", err_val);
drop(listener_socket);
return;
}
}
@ -258,7 +260,9 @@ async fn status_checker(
let cleaning_timer = Instant::now();
message_organizer_task.as_ref().unwrap().abort();
buffer_layer_task.as_ref().unwrap().abort();
listener_socket_killer_producer.send(true).unwrap();
if let Err(_) = listener_socket_killer_producer.send(true) {
eprintln!("Error: Cleaning | Socket Kill Failed, Receiver Dropped");
}
let mut listener_task_counter = 0;
while listener_stream_tasks_receiver.len() > 0 {
match listener_stream_tasks_receiver.recv().await {

View file

@ -73,7 +73,7 @@ pub async fn sound_stream(
unhealty_packet
}
};
log::info!("{}", uncompressed_data.len());
let data = String::from_utf8(uncompressed_data).unwrap();
let mut datum_parsed: Vec<char> = vec![];
let mut data_parsed: Vec<String> = vec![];

View file

@ -9,9 +9,12 @@ edition = "2021"
brotli = "5.0.0"
cpal = "0.15.3"
futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] }
iced = { git = "https://github.com/iced-rs/iced", features = ["tokio"] }
ringbuf = "0.3.3"
rubato = "0.15.0"
rustls-pemfile = "2.1.2"
rustls-platform-verifier = "0.2.0"
symphonia = { version = "0.5.4", features = ["all"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-rustls = "0.25.0"
tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] }

670
streamer/src/gui.rs Normal file
View file

@ -0,0 +1,670 @@
use std::fs::File;
use iced::{
alignment,
widget::{column, container, row, text::LineHeight, Container, Rule},
window, Color, Command, Subscription,
};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use crate::{
gui_components::{button_with_centered_text, text_centered},
gui_utils,
utils::get_config,
Config, BUFFER_LENGTH,
};
#[derive(Debug, Clone)]
pub enum Player {
Play,
Pause,
Stop,
}
#[derive(Debug, Clone)]
struct Features {
stream: bool,
record: bool,
play_audio: bool,
}
#[derive(Debug)]
struct AudioFile {
file: Option<File>,
decoded_to_playing_sender: Option<Sender<f32>>,
}
#[derive(Debug, Clone)]
pub enum Event {
None,
Connect,
Disconnect,
Record,
StopRecord,
PlayAudio,
StopAudio,
PauseAudio,
ContinueAudio,
LoadConfig(Config),
IcedEvent(iced::Event),
CloseWindow(window::Id),
}
#[derive(Debug, Clone)]
pub enum State {
None,
Connected,
Disconnected,
Recording,
StopRecording,
PlayingAudio,
StopAudio,
PausedAudio,
ContinuedAudio,
}
#[derive(Debug, Clone)]
pub enum Message {
Event(Event),
State(State),
}
#[derive(Debug)]
struct DataChannel {
microphone_stream_sender: Sender<f32>,
audio_stream_sender: Sender<f32>,
}
#[derive(Debug)]
struct CommunicationChannel {
base_to_streaming_sender: Sender<bool>,
streaming_to_base_sender: Sender<bool>,
base_to_recording_sender: Sender<bool>,
recording_to_base_sender: Sender<bool>,
base_to_playing_sender: Sender<Player>,
playing_to_base_sender: Sender<Player>,
}
#[derive(Debug, PartialEq)]
enum Condition {
Active,
Loading,
Passive,
}
#[derive(Debug)]
struct GUIStatus {
are_we_connect: Condition,
are_we_record: Condition,
are_we_play_audio: Condition,
are_we_paused_audio: Condition,
}
#[derive(Debug)]
pub struct Streamer {
config: Option<Config>,
data_channel: DataChannel,
communication_channel: CommunicationChannel,
audio_file: AudioFile,
gui_status: GUIStatus,
}
impl Default for Streamer {
fn default() -> Self {
Self::new()
}
}
impl Streamer {
fn new() -> Self {
Self {
config: None,
data_channel: DataChannel {
microphone_stream_sender: channel(BUFFER_LENGTH).0,
audio_stream_sender: channel(BUFFER_LENGTH).0,
},
communication_channel: CommunicationChannel {
base_to_streaming_sender: channel(1).0,
streaming_to_base_sender: channel(1).0,
base_to_recording_sender: channel(1).0,
recording_to_base_sender: channel(1).0,
base_to_playing_sender: channel(1).0,
playing_to_base_sender: channel(1).0,
},
audio_file: AudioFile {
file: None,
decoded_to_playing_sender: None,
},
gui_status: GUIStatus {
are_we_connect: Condition::Passive,
are_we_record: Condition::Passive,
are_we_play_audio: Condition::Passive,
are_we_paused_audio: Condition::Passive,
},
}
}
pub fn update(&mut self, message: Message) -> Command<Message> {
match message {
Message::Event(event) => match event {
Event::None => Command::none(),
Event::Connect => {
println!("Connect");
self.gui_status.are_we_connect = Condition::Loading;
let microphone_stream_receiver =
self.data_channel.microphone_stream_sender.subscribe();
let audio_stream_receiver = self.data_channel.audio_stream_sender.subscribe();
let streamer_config = self.config.clone().unwrap();
let streaming_to_base_sender =
self.communication_channel.streaming_to_base_sender.clone();
let base_to_streaming_receiver = self
.communication_channel
.base_to_streaming_sender
.subscribe();
Command::perform(
async move {
gui_utils::connect(
microphone_stream_receiver,
audio_stream_receiver,
streamer_config,
streaming_to_base_sender,
base_to_streaming_receiver,
)
.await
},
Message::State,
)
}
Event::Disconnect => {
println!("Disconnect");
self.gui_status.are_we_connect = Condition::Loading;
let streaming_to_base_receiver = self
.communication_channel
.streaming_to_base_sender
.subscribe();
let base_to_streaming_sender =
self.communication_channel.base_to_streaming_sender.clone();
Command::perform(
async move {
gui_utils::disconnect(
streaming_to_base_receiver,
base_to_streaming_sender,
)
.await
},
Message::State,
)
}
Event::Record => {
println!("Record");
self.gui_status.are_we_record = Condition::Loading;
let microphone_stream_sender =
self.data_channel.microphone_stream_sender.clone();
let recording_to_base_sender =
self.communication_channel.recording_to_base_sender.clone();
let base_to_recording_receiver = self
.communication_channel
.base_to_recording_sender
.subscribe();
Command::perform(
async move {
gui_utils::start_recording(
microphone_stream_sender,
recording_to_base_sender,
base_to_recording_receiver,
)
.await
},
Message::State,
)
}
Event::StopRecord => {
println!("Stop Record");
self.gui_status.are_we_record = Condition::Loading;
let recording_to_base_receiver = self
.communication_channel
.recording_to_base_sender
.subscribe();
let base_to_recording_sender =
self.communication_channel.base_to_recording_sender.clone();
Command::perform(
async move {
gui_utils::stop_recording(
recording_to_base_receiver,
base_to_recording_sender,
)
.await
},
Message::State,
)
}
Event::PlayAudio => {
println!("Play Audio");
self.gui_status.are_we_play_audio = Condition::Loading;
let file = File::open("music.mp3").unwrap();
self.audio_file.file = Some(file);
self.audio_file.decoded_to_playing_sender = Some(
channel(
self.audio_file
.file
.as_ref()
.unwrap()
.metadata()
.unwrap()
.len() as usize
* 4,
)
.0,
);
let audio_stream_sender = self.data_channel.audio_stream_sender.clone();
let playing_to_base_sender =
self.communication_channel.playing_to_base_sender.clone();
let base_to_playing_receiver = self
.communication_channel
.base_to_playing_sender
.subscribe();
let playing_to_base_receiver_is_audio_finished = self
.communication_channel
.playing_to_base_sender
.subscribe();
let playing_to_base_receiver_is_audio_stopped = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender =
self.communication_channel.base_to_playing_sender.clone();
let file = self.audio_file.file.as_ref().unwrap().try_clone().unwrap();
let decoded_to_playing_sender_for_playing =
self.audio_file.decoded_to_playing_sender.clone().unwrap();
let decoded_to_playing_sender_for_is_finished =
self.audio_file.decoded_to_playing_sender.clone().unwrap();
let playing_command = Command::perform(
async move {
gui_utils::start_playing(
audio_stream_sender,
decoded_to_playing_sender_for_playing,
file,
playing_to_base_sender,
base_to_playing_receiver,
)
.await
},
Message::State,
);
let is_finished_command = Command::perform(
async move {
gui_utils::is_playing_finished(
playing_to_base_receiver_is_audio_finished,
playing_to_base_receiver_is_audio_stopped,
base_to_playing_sender,
decoded_to_playing_sender_for_is_finished,
)
.await
},
Message::State,
);
let commands = vec![playing_command, is_finished_command];
Command::batch(commands)
}
Event::StopAudio => {
println!("Stop Audio");
self.gui_status.are_we_play_audio = Condition::Loading;
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender =
self.communication_channel.base_to_playing_sender.clone();
Command::perform(
async move {
gui_utils::stop_playing(
playing_to_base_receiver,
base_to_playing_sender,
)
.await
},
Message::State,
)
}
Event::PauseAudio => {
println!("Pause Audio");
self.gui_status.are_we_paused_audio = Condition::Loading;
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender =
self.communication_channel.base_to_playing_sender.clone();
Command::perform(
async move {
gui_utils::pause_playing(
playing_to_base_receiver,
base_to_playing_sender,
)
.await
},
Message::State,
)
}
Event::ContinueAudio => {
println!("Continue Audio");
self.gui_status.are_we_paused_audio = Condition::Loading;
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender =
self.communication_channel.base_to_playing_sender.clone();
Command::perform(
async move {
gui_utils::continue_playing(
playing_to_base_receiver,
base_to_playing_sender,
)
.await
},
Message::State,
)
}
Event::LoadConfig(config) => {
self.config = Some(config);
Command::none()
}
Event::IcedEvent(iced_event) => match iced_event {
iced::Event::Keyboard(_) => Command::none(),
iced::Event::Mouse(_) => Command::none(),
iced::Event::Window(id, window_event) => {
if let window::Event::CloseRequested = window_event {
self.exit(id)
} else {
Command::none()
}
}
iced::Event::Touch(_) => Command::none(),
iced::Event::PlatformSpecific(_) => Command::none(),
},
Event::CloseWindow(id) => window::close(id),
},
Message::State(state) => match state {
State::None => Command::none(),
State::Connected => {
self.gui_status.are_we_connect = Condition::Active;
Command::none()
}
State::Disconnected => {
self.gui_status.are_we_connect = Condition::Passive;
Command::none()
}
State::Recording => {
self.gui_status.are_we_record = Condition::Active;
Command::none()
}
State::StopRecording => {
self.gui_status.are_we_record = Condition::Passive;
Command::none()
}
State::PlayingAudio => {
self.gui_status.are_we_play_audio = Condition::Active;
self.gui_status.are_we_paused_audio = Condition::Passive;
Command::none()
}
State::StopAudio => {
self.gui_status.are_we_play_audio = Condition::Passive;
Command::none()
}
State::PausedAudio => {
self.gui_status.are_we_paused_audio = Condition::Active;
Command::none()
}
State::ContinuedAudio => {
self.gui_status.are_we_paused_audio = Condition::Passive;
Command::none()
}
},
}
}
pub fn view(&self) -> Container<Message> {
//let color_red = Color::from_rgb8(255, 0, 0);
let color_green = Color::from_rgb8(0, 255, 0);
let color_blue = Color::from_rgb8(0, 0, 255);
let color_yellow = Color::from_rgb8(255, 255, 0);
//let color_white = Color::from_rgb8(255, 255, 255);
let color_grey = Color::from_rgb8(128, 128, 128);
//let color_black = Color::from_rgb8(0, 0, 0);
let color_pink = Color::from_rgb8(255, 150, 150);
let header = text_centered("Radioxide")
.size(35)
.line_height(LineHeight::Relative(1.0));
let connection_text = text_centered("Connection");
let recording_text = text_centered("Microphone");
let play_audio_text = text_centered("Play Audio");
let pause_audio_text = text_centered("Pause Audio");
let connection_status_text;
let recording_status_text;
let play_audio_status_text;
let paused_audio_status_text;
let connect_button = match self.gui_status.are_we_connect {
Condition::Active => {
connection_status_text = text_centered("Active").color(color_green);
button_with_centered_text("Disconnect").on_press(Message::Event(Event::Disconnect))
}
Condition::Loading => {
connection_status_text = text_centered("Loading").color(color_yellow);
button_with_centered_text("Processing")
}
Condition::Passive => {
connection_status_text = text_centered("Passive").color(color_pink);
button_with_centered_text("Connect").on_press(Message::Event(Event::Connect))
}
};
let record_button = match self.gui_status.are_we_record {
Condition::Active => {
recording_status_text = text_centered("Active").color(color_green);
button_with_centered_text("Stop Mic").on_press(Message::Event(Event::StopRecord))
}
Condition::Loading => {
recording_status_text = text_centered("Loading").color(color_yellow);
button_with_centered_text("Processing")
}
Condition::Passive => {
recording_status_text = text_centered("Passive").color(color_pink);
button_with_centered_text("Start Mic").on_press(Message::Event(Event::Record))
}
};
let play_audio_button = match self.gui_status.are_we_play_audio {
Condition::Active => {
play_audio_status_text = text_centered("Active").color(color_green);
button_with_centered_text("Stop Audio").on_press(Message::Event(Event::StopAudio))
}
Condition::Loading => {
play_audio_status_text = text_centered("Loading").color(color_yellow);
button_with_centered_text("Processing")
}
Condition::Passive => {
play_audio_status_text = text_centered("Passive").color(color_pink);
button_with_centered_text("Play Audio").on_press(Message::Event(Event::PlayAudio))
}
};
let pause_audio_button = if let Condition::Active = self.gui_status.are_we_play_audio {
match self.gui_status.are_we_paused_audio {
Condition::Active => {
paused_audio_status_text = text_centered("Paused").color(color_blue);
button_with_centered_text("Continue Audio")
.on_press(Message::Event(Event::ContinueAudio))
}
Condition::Loading => {
paused_audio_status_text = text_centered("Loading").color(color_yellow);
button_with_centered_text("Processing")
}
Condition::Passive => {
paused_audio_status_text = text_centered("Playing").color(color_yellow);
button_with_centered_text("Pause Audio")
.on_press(Message::Event(Event::PauseAudio))
}
}
} else {
paused_audio_status_text = text_centered("Waiting").color(color_grey);
button_with_centered_text("No Purpose")
};
let header_content = row![header].width(350).height(50);
let text_content = row![
connection_text,
Rule::vertical(1),
recording_text,
Rule::vertical(1),
play_audio_text,
Rule::vertical(1),
pause_audio_text,
]
.spacing(5)
.width(350)
.height(35);
let status_content = row![
connection_status_text,
Rule::vertical(1),
recording_status_text,
Rule::vertical(1),
play_audio_status_text,
Rule::vertical(1),
paused_audio_status_text,
]
.spacing(5)
.width(350)
.height(35);
let button_content = row![
connect_button,
record_button,
play_audio_button,
pause_audio_button
]
.spacing(5)
.width(350)
.height(35);
let content = column![
header_content,
Rule::horizontal(1),
text_content,
button_content,
status_content,
Rule::horizontal(1),
]
.spacing(20)
.width(350)
.height(300);
container(content)
.height(300)
.center_x()
.align_y(alignment::Vertical::Top)
}
pub fn subscription(&self) -> Subscription<Message> {
iced::event::listen()
.map(Event::IcedEvent)
.map(Message::Event)
}
pub fn load_config() -> Command<Message> {
Command::perform(
async move {
let config = get_config().await;
Event::LoadConfig(config)
},
Message::Event,
)
}
fn call_closer(
streaming_to_base_receiver: Receiver<bool>,
base_to_streaming_sender: Sender<bool>,
recording_to_base_receiver: Receiver<bool>,
base_to_recording_sender: Sender<bool>,
playing_to_base_receiver: Receiver<Player>,
base_to_playing_sender: Sender<Player>,
features_in_need: Features,
window_id: window::Id,
) -> Command<Message> {
Command::perform(
async move {
if features_in_need.stream {
gui_utils::disconnect(streaming_to_base_receiver, base_to_streaming_sender)
.await;
}
if features_in_need.record {
gui_utils::stop_recording(recording_to_base_receiver, base_to_recording_sender)
.await;
}
if features_in_need.play_audio {
gui_utils::stop_playing(playing_to_base_receiver, base_to_playing_sender).await;
}
Event::CloseWindow(window_id)
},
Message::Event,
)
}
fn exit(&self, window_id: window::Id) -> Command<Message> {
let mut features_in_need = Features {
stream: false,
record: false,
play_audio: false,
};
if self.gui_status.are_we_connect == Condition::Active {
features_in_need.stream = true;
}
if self.gui_status.are_we_record == Condition::Active {
features_in_need.record = true;
}
if self.gui_status.are_we_play_audio == Condition::Active {
features_in_need.play_audio = true;
}
let streaming_to_base_receiver = self
.communication_channel
.streaming_to_base_sender
.subscribe();
let base_to_streaming_sender = self.communication_channel.base_to_streaming_sender.clone();
let recording_to_base_receiver = self
.communication_channel
.recording_to_base_sender
.subscribe();
let base_to_recording_sender = self.communication_channel.base_to_recording_sender.clone();
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender = self.communication_channel.base_to_playing_sender.clone();
Self::call_closer(
streaming_to_base_receiver,
base_to_streaming_sender,
recording_to_base_receiver,
base_to_recording_sender,
playing_to_base_receiver,
base_to_playing_sender,
features_in_need,
window_id,
)
}
}

View file

@ -0,0 +1,24 @@
use iced::{
alignment,
widget::{button, text, Button, Text},
Length,
};
use crate::gui::Message;
pub fn button_with_centered_text(txt: &'static str) -> Button<'static, Message> {
button(
text(txt)
.width(Length::Fill)
.horizontal_alignment(alignment::Horizontal::Center),
)
.height(Length::Fill)
.width(Length::Fill)
}
pub fn text_centered(txt: &'static str) -> Text {
text(txt)
.width(Length::Fill)
.height(Length::Fill)
.horizontal_alignment(alignment::Horizontal::Center)
}

293
streamer/src/gui_utils.rs Normal file
View file

@ -0,0 +1,293 @@
use std::{fs::File, time::Duration};
use tokio::sync::broadcast::{Receiver, Sender};
use crate::{
gui::{Player, State},
playing, recording, streaming, Config,
};
pub async fn connect(
microphone_stream_receiver: Receiver<f32>,
audio_stream_receiver: Receiver<f32>,
streamer_config: Config,
streaming_to_base_sender: Sender<bool>,
base_to_streaming_receiver: Receiver<bool>,
) -> State {
let mut streaming_to_base_receiver = streaming_to_base_sender.subscribe();
tokio::spawn(streaming::connect(
microphone_stream_receiver,
audio_stream_receiver,
streamer_config,
base_to_streaming_receiver,
streaming_to_base_sender.clone(),
));
let answer = streaming_to_base_receiver.recv().await;
drop(streaming_to_base_receiver);
match answer {
Ok(_) => State::Connected,
Err(err_val) => {
eprintln!(
"Error: Communication | Streaming to Base | Recv | Connect | {}",
err_val
);
State::Disconnected
}
}
}
pub async fn disconnect(
mut streaming_to_base_receiver: Receiver<bool>,
base_to_streaming_sender: Sender<bool>,
) -> State {
match base_to_streaming_sender.send(false) {
Ok(_) => {}
Err(err_val) => {
eprint!(
"Error: Communication | Base to Streaming | Send | Disconnect | {}",
err_val
);
}
}
drop(base_to_streaming_sender);
let answer = streaming_to_base_receiver.recv().await;
drop(streaming_to_base_receiver);
match answer {
Ok(_) => State::Disconnected,
Err(err_val) => {
eprintln!(
"Error: Communication | Streaming to Base | Recv | Disconnect | {}",
err_val
);
State::Connected
}
}
}
pub async fn start_recording(
microphone_stream_sender: Sender<f32>,
recording_to_base_sender: Sender<bool>,
base_to_recording_receiver: Receiver<bool>,
) -> State {
let mut recording_to_base_receiver = recording_to_base_sender.subscribe();
tokio::spawn(recording::record(
microphone_stream_sender.clone(),
recording_to_base_sender.clone(),
base_to_recording_receiver,
));
let answer = recording_to_base_receiver.recv().await;
drop(recording_to_base_receiver);
match answer {
Ok(_) => State::Recording,
Err(err_val) => {
eprintln!(
"Error: Communication | Recording to Base | Recv | Start Rec | {}",
err_val
);
State::StopRecording
}
}
}
pub async fn stop_recording(
mut recording_to_base_receiver: Receiver<bool>,
base_to_recording_sender: Sender<bool>,
) -> State {
match base_to_recording_sender.send(false) {
Ok(_) => {}
Err(err_val) => {
eprint!(
"Error: Communication | Base to Recording | Send | Stop Rec | {}",
err_val
);
}
}
drop(base_to_recording_sender);
let answer = recording_to_base_receiver.recv().await;
drop(recording_to_base_receiver);
match answer {
Ok(_) => State::StopRecording,
Err(err_val) => {
eprintln!(
"Error: Communication | Recording to Base | Stop Rec | {}",
err_val
);
State::Recording
}
}
}
pub async fn start_playing(
audio_stream_sender: Sender<f32>,
decoded_to_playing_sender: Sender<f32>,
file: File,
playing_to_base_sender: Sender<Player>,
base_to_playing_receiver: Receiver<Player>,
) -> State {
let mut playing_to_base_receiver = playing_to_base_sender.subscribe();
tokio::spawn(playing::play(
audio_stream_sender,
file,
decoded_to_playing_sender,
playing_to_base_sender,
base_to_playing_receiver,
));
let answer = playing_to_base_receiver.recv().await;
drop(playing_to_base_receiver);
match answer {
Ok(state) => match state {
Player::Play => State::PlayingAudio,
Player::Pause => State::PausedAudio,
Player::Stop => State::StopAudio,
},
Err(err_val) => {
eprint!(
"Error: Communication | Playing to Base | Recv | Start Play | {}",
err_val
);
State::StopAudio
}
}
}
pub async fn stop_playing(
mut playing_to_base_receiver: Receiver<Player>,
base_to_playing_sender: Sender<Player>,
) -> State {
match base_to_playing_sender.send(Player::Stop) {
Ok(_) => {}
Err(err_val) => {
eprintln!(
"Error: Communication | Base to Playing | Send | Stop Play | {}",
err_val
);
}
}
drop(base_to_playing_sender);
let answer = playing_to_base_receiver.recv().await;
drop(playing_to_base_receiver);
match answer {
Ok(state) => match state {
Player::Play => State::PlayingAudio,
Player::Pause => State::PausedAudio,
Player::Stop => State::StopAudio,
},
Err(err_val) => {
eprintln!(
"Error: Communication | Playing to Base | Recv | Stop Play | {}",
err_val
);
State::PlayingAudio
}
}
}
pub async fn is_playing_finished(
mut playing_to_base_receiver_is_audio_finished: Receiver<Player>,
mut playing_to_base_receiver_is_audio_stopped: Receiver<Player>,
base_to_playing_sender: Sender<Player>,
decoded_to_playing_sender: Sender<f32>,
) -> State {
tokio::select! {
is_audio_finished = async move {
match playing_to_base_receiver_is_audio_finished.recv().await {
Ok(state) => match state {
Player::Play => {
while !decoded_to_playing_sender.is_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
stop_playing(playing_to_base_receiver_is_audio_finished, base_to_playing_sender).await
}
Player::Pause => State::PlayingAudio,
Player::Stop => State::StopAudio,
},
Err(err_val) => {
eprintln!("Error: Communication | Playing to Base | Recv | Is Finish | {}", err_val);
State::PlayingAudio
}
}
} => is_audio_finished,
is_audio_stopped = async move {
loop {
match playing_to_base_receiver_is_audio_stopped.recv().await {
Ok(state) => if let Player::Stop = state {
return State::StopAudio;
},
Err(err_val) => {
eprintln!(
"Error: Communication | Playing to Base | Recv | Is Stop | {}",
err_val
);
return State::PlayingAudio;
}
}
}
}
=>is_audio_stopped,
}
}
pub async fn pause_playing(
mut playing_to_base_receiver: Receiver<Player>,
base_to_playing_sender: Sender<Player>,
) -> State {
match base_to_playing_sender.send(Player::Pause) {
Ok(_) => {}
Err(err_val) => {
eprintln!(
"Error: Communication | Base to Playing | Pause Play | Send | {}",
err_val
);
}
}
drop(base_to_playing_sender);
let answer = playing_to_base_receiver.recv().await;
drop(playing_to_base_receiver);
match answer {
Ok(state) => match state {
Player::Play => State::PlayingAudio,
Player::Pause => State::PausedAudio,
Player::Stop => State::StopAudio,
},
Err(err_val) => {
eprintln!(
"Error: Communication | Playing to Base | Recv | Pause Play | {}",
err_val
);
State::PlayingAudio
}
}
}
pub async fn continue_playing(
mut playing_to_base_receiver: Receiver<Player>,
base_to_playing_sender: Sender<Player>,
) -> State {
match base_to_playing_sender.send(Player::Play) {
Ok(_) => {}
Err(err_val) => {
eprintln!(
"Error: Communication | Base to Playing | Send | Continue Play | {}",
err_val
);
}
}
drop(base_to_playing_sender);
let answer = playing_to_base_receiver.recv().await;
drop(playing_to_base_receiver);
match answer {
Ok(state) => match state {
Player::Play => State::PlayingAudio,
Player::Pause => State::PausedAudio,
Player::Stop => State::StopAudio,
},
Err(err_val) => {
eprintln!(
"Error: Communication | Playing to Base | Continue Play | {}",
err_val
);
State::PausedAudio
}
}
}

View file

@ -1,9 +1,14 @@
pub mod gui;
pub mod gui_components;
pub mod gui_utils;
pub mod playing;
pub mod recording;
pub mod streaming;
pub mod utils;
pub const BUFFER_LENGTH: usize = 1000000;
#[derive(Debug, Clone)]
pub struct Config {
pub address: String,
pub quality: u8,

View file

@ -1,16 +1,17 @@
use std::time::Duration;
use streamer::{recording::recording, streaming::start, utils::get_config, BUFFER_LENGTH};
use tokio::sync::broadcast::channel;
use streamer::gui::Streamer;
#[tokio::main]
async fn main() {
println!("Hello, world!");
let streamer_config = get_config().await;
let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH);
tokio::spawn(recording(sound_stream_producer));
tokio::spawn(start(sound_stream_consumer, streamer_config));
loop {
tokio::time::sleep(Duration::from_secs(1000000000)).await;
}
tokio::task::block_in_place(|| {
iced::program("Streamer GUI", Streamer::update, Streamer::view)
.centered()
.window_size((350.0, 450.0))
.load(Streamer::load_config)
.antialiasing(true)
.subscription(Streamer::subscription)
.exit_on_close_request(false)
.run()
.unwrap()
});
}

216
streamer/src/playing.rs Normal file
View file

@ -0,0 +1,216 @@
use std::fs::File;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use rubato::{
Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
};
use symphonia::core::{
audio::{AudioBufferRef, Signal},
codecs::{DecoderOptions, CODEC_TYPE_NULL},
formats::FormatOptions,
io::MediaSourceStream,
meta::MetadataOptions,
probe::Hint,
};
use tokio::{
sync::broadcast::{Receiver, Sender},
task,
};
use crate::gui::Player;
pub async fn play(
audio_stream_sender: Sender<f32>,
file: File,
decoded_to_playing_sender: Sender<f32>,
playing_to_base_sender: Sender<Player>,
mut base_to_playing_receiver: Receiver<Player>,
) {
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let output_device_config: cpal::StreamConfig =
output_device.default_output_config().unwrap().into();
let output_device_sample_rate = output_device_config.sample_rate.0;
let (mut audio_resampled_left, mut audio_resampled_right) =
decode_audio(output_device_sample_rate, file);
let mut decoded_to_playing_receiver = decoded_to_playing_sender.subscribe();
for _ in 0..audio_resampled_left.clone().len() {
decoded_to_playing_sender
.send(audio_resampled_left.pop().unwrap() as f32)
.unwrap();
decoded_to_playing_sender
.send(audio_resampled_right.pop().unwrap() as f32)
.unwrap();
}
let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {
if decoded_to_playing_receiver.len() > 0 {
let single = match decoded_to_playing_receiver.blocking_recv() {
Ok(single) => single,
Err(_) => 0.0,
};
if audio_stream_sender.receiver_count() > 0 {
let _ = audio_stream_sender.send(single);
}
*sample = single;
}
}
};
let output_stream = output_device
.build_output_stream(&output_device_config, output_data_fn, err_fn, None)
.unwrap();
output_stream.play().unwrap();
tokio::spawn(let_the_base_know(
playing_to_base_sender.clone(),
Player::Play,
));
task::block_in_place(|| loop {
match base_to_playing_receiver.blocking_recv() {
Ok(state) => match state {
Player::Play => {
output_stream.play().unwrap();
tokio::spawn(let_the_base_know(
playing_to_base_sender.clone(),
Player::Play,
));
}
Player::Pause => match output_stream.pause() {
Ok(_) => {
tokio::spawn(let_the_base_know(
playing_to_base_sender.clone(),
Player::Pause,
));
}
//todo when pause error, do software level stop
Err(_) => todo!(),
},
Player::Stop => break,
},
Err(_) => break,
}
});
drop(output_stream);
tokio::spawn(let_the_base_know(playing_to_base_sender, Player::Stop));
}
fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err);
}
async fn let_the_base_know(playing_to_base_sender: Sender<Player>, action: Player) {
let _ = playing_to_base_sender.send(action);
}
fn decode_audio(output_device_sample_rate: u32, file: File) -> (Vec<f64>, Vec<f64>) {
let mut audio_decoded_left = vec![];
let mut audio_decoded_right = vec![];
let media_source_stream = MediaSourceStream::new(Box::new(file), Default::default());
let hint = Hint::new();
let metadata_options = MetadataOptions::default();
let format_options = FormatOptions::default();
let probed = symphonia::default::get_probe()
.format(
&hint,
media_source_stream,
&format_options,
&metadata_options,
)
.unwrap();
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.unwrap();
let audio_sample_rate = track.codec_params.sample_rate.unwrap();
DecoderOptions::default();
let decoder_options = DecoderOptions::default();
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &decoder_options)
.unwrap();
let track_id = track.id;
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
Err(_) => {
break;
}
};
while !format.metadata().is_latest() {
format.metadata().pop();
}
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => match decoded {
AudioBufferRef::F32(buf) => {
for (left, right) in buf.chan(0).iter().zip(buf.chan(1).iter()) {
audio_decoded_left.push(*left as f64);
audio_decoded_right.push(*right as f64);
}
}
_ => {}
},
Err(_) => {
//eprintln!("Error: Sample Decode | {}", err_val);
println!("End ?");
}
}
}
let params = SincInterpolationParameters {
sinc_len: 256,
f_cutoff: 0.95,
interpolation: SincInterpolationType::Linear,
oversampling_factor: 256,
window: WindowFunction::BlackmanHarris2,
};
let mut resampler = SincFixedIn::<f64>::new(
output_device_sample_rate as f64 / audio_sample_rate as f64,
2.0,
params,
audio_decoded_left.len(),
2,
)
.unwrap();
let audio_decoded_channes_combined =
vec![audio_decoded_left.clone(), audio_decoded_right.clone()];
let audio_resampled = resampler
.process(&audio_decoded_channes_combined, None)
.unwrap();
let mut audio_resampled_left = vec![];
let mut audio_resampled_right = vec![];
for sample in &audio_resampled[0] {
audio_resampled_left.push(*sample);
}
for sample in &audio_resampled[1] {
audio_resampled_right.push(*sample);
}
audio_resampled_left.reverse();
audio_resampled_right.reverse();
(audio_resampled_left, audio_resampled_right)
}

View file

@ -1,7 +1,14 @@
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use tokio::sync::broadcast::Sender;
use tokio::{
sync::broadcast::{Receiver, Sender},
task,
};
pub async fn recording(sound_stream_producer: Sender<f32>) {
pub async fn record(
sound_stream_sender: Sender<f32>,
recording_to_base: Sender<bool>,
mut base_to_recording: Receiver<bool>,
) {
let host = cpal::default_host();
let input_device = host.default_input_device().unwrap();
@ -9,22 +16,34 @@ pub async fn recording(sound_stream_producer: Sender<f32>) {
let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| {
for &sample in data {
match sound_stream_producer.send(sample) {
Ok(_) => {}
Err(_) => {}
if sound_stream_sender.receiver_count() > 0 {
match sound_stream_sender.send(sample) {
Ok(_) => {}
Err(_) => {}
}
}
}
};
let input_stream = input_device
.build_input_stream(&config, input_data_fn, err_fn, None)
.unwrap();
input_stream.play().unwrap();
println!("Recording Started");
std::thread::sleep(std::time::Duration::from_secs(1000000000));
println!("DONE I HOPE");
tokio::spawn(let_the_base_know(recording_to_base.clone()));
task::block_in_place(|| {
let _ = base_to_recording.blocking_recv();
});
input_stream.pause().unwrap();
tokio::spawn(let_the_base_know(recording_to_base.clone()));
println!("Recording Stopped");
}
fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err);
}
async fn let_the_base_know(recording_to_base: Sender<bool>) {
let _ = recording_to_base.send(true);
}

View file

@ -3,65 +3,158 @@ use std::{io::Write, sync::Arc, time::Duration};
use brotli::CompressorWriter;
use futures_util::SinkExt;
use ringbuf::HeapRb;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::{
sync::broadcast::{channel, Receiver, Sender},
task::JoinHandle,
};
use tokio_tungstenite::tungstenite::Message;
use crate::{Config, BUFFER_LENGTH};
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn start(sound_stream_consumer: Receiver<f32>, streamer_config:Config) {
let connect_addr =
match streamer_config.tls {
pub async fn connect(
microphone_stream_receiver: Receiver<f32>,
audio_stream_receiver: Receiver<f32>,
streamer_config: Config,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
) {
let connect_addr = match streamer_config.tls {
true => format!("wss://{}", streamer_config.address),
false => format!("ws://{}", streamer_config.address),
};
let ws_stream;
if let Err(_) = base_to_streaming.try_recv() {
let ws_stream;
match streamer_config.tls {
true => {
let tls_client_config = rustls_platform_verifier::tls_config();
let tls_connector =
tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config));
match streamer_config.tls {
true => {
let tls_client_config = rustls_platform_verifier::tls_config();
let tls_connector = tokio_tungstenite::Connector::Rustls(Arc::new(tls_client_config));
match tokio_tungstenite::connect_async_tls_with_config(
connect_addr.clone(),
None,
false,
Some(tls_connector),
)
.await
{
Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0,
Err(_) => {
return;
}
}
},
false => {
match tokio_tungstenite::connect_async(connect_addr.clone()).await {
match tokio_tungstenite::connect_async_tls_with_config(
connect_addr.clone(),
None,
false,
Some(tls_connector),
)
.await
{
Ok(wss_stream_connected) => ws_stream = wss_stream_connected.0,
Err(_) => {
return;
}
}
}
false => match tokio_tungstenite::connect_async(connect_addr.clone()).await {
Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0,
Err(_) => {
return;
},
}
},
}
},
}
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
println!("Connected to: {}", connect_addr);
let (flow_sender, flow_receiver) = channel(BUFFER_LENGTH);
let mixer_task = tokio::spawn(mixer(
microphone_stream_receiver,
audio_stream_receiver,
flow_sender,
streamer_config.latency,
));
let message_organizer_task = tokio::spawn(message_organizer(
message_producer,
flow_receiver,
streamer_config.quality,
streamer_config.latency,
));
let stream_task = tokio::spawn(stream(ws_stream, message_consumer));
let _ = streaming_to_base.send(true);
tokio::spawn(status_checker(
message_organizer_task,
stream_task,
mixer_task,
base_to_streaming,
streaming_to_base,
));
}
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
println!("Connected to: {}", connect_addr);
tokio::spawn(message_organizer(message_producer, sound_stream_consumer, streamer_config.quality, streamer_config.latency));
tokio::spawn(stream(ws_stream, message_consumer));
}
async fn mixer(
mut microphone_stream_receiver: Receiver<f32>,
mut audio_stream_receiver: Receiver<f32>,
flow_sender: Sender<f32>,
latency: u16,
) {
loop {
let mut microphone_stream = vec![];
let mut audio_stream = vec![];
let mut microphone_stream_iteration = microphone_stream_receiver.len();
while microphone_stream_iteration > 0 {
microphone_stream_iteration -= 1;
match microphone_stream_receiver.recv().await {
Ok(microphone_datum) => {
microphone_stream.push(microphone_datum);
}
Err(err_val) => {
eprintln!(
"Error: Communication | Microphone Stream | Recv | {}",
err_val
);
}
}
}
async fn message_organizer(message_producer: Sender<Message>, mut consumer: Receiver<f32>, quality: u8, latency:u16) {
let mut audio_stream_iteration = audio_stream_receiver.len();
while audio_stream_iteration > 0 {
audio_stream_iteration -= 1;
match audio_stream_receiver.recv().await {
Ok(audio_datum) => {
audio_stream.push(audio_datum);
}
Err(err_val) => {
eprintln!("Error: Communication | Audio Stream | Recv | {}", err_val);
}
}
}
let mut flow = vec![];
for element in microphone_stream {
flow.push(element * 0.5);
}
for (i, element) in audio_stream.iter().enumerate() {
if flow.len() > i && flow.len() != 0 {
flow[i] = flow[i] + element * 0.5;
} else {
flow.push(element * 0.5);
}
}
for element in flow {
match flow_sender.send(element) {
Ok(_) => {}
Err(err_val) => {
eprintln!("Error: Communication | Flow | Send | {}", err_val);
}
}
}
tokio::time::sleep(Duration::from_millis(latency.into())).await;
}
}
async fn message_organizer(
message_producer: Sender<Message>,
mut flow_receiver: Receiver<f32>,
quality: u8,
latency: u16,
) {
loop {
let mut messages: Vec<u8> = Vec::new();
let mut iteration = consumer.len();
let mut iteration = flow_receiver.len();
while iteration > 0 {
iteration -= 1;
match consumer.recv().await {
match flow_receiver.recv().await {
Ok(single_data) => {
let ring = HeapRb::<u8>::new(BUFFER_LENGTH);
let (mut producer, mut consumer) = ring.split();
let (mut producer, mut flow_receiver) = ring.split();
let mut charred: Vec<char> = single_data.to_string().chars().collect();
if charred[0] == '0' {
charred.insert(0, '+');
@ -81,8 +174,8 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
for element in single_data_packet {
producer.push(element).unwrap();
}
while !consumer.is_empty() {
messages.push(consumer.pop().unwrap());
while !flow_receiver.is_empty() {
messages.push(flow_receiver.pop().unwrap());
}
}
Err(_) => {}
@ -94,51 +187,51 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
eprintln!("Error: Compression | {}", err_val);
}
let compressed_messages = compression_writer.into_inner();
// println!("Compressed Len {}", compressed_messages.len());
// println!("UNCompressed Len {}", messages.len());
match message_producer.send(compressed_messages.into()) {
Ok(_) => {}
Err(_) => {}
}
// println!(
// "Message Counter = {} | Receiver Count = {}",
// message_producer.len(),
// message_producer.receiver_count()
// );
}
tokio::time::sleep(Duration::from_millis(latency.into())).await;
}
}
async fn stream <T: futures_util::Sink<Message> + std::marker::Unpin>(
async fn stream<T: futures_util::Sink<Message> + std::marker::Unpin>(
mut ws_stream: T,
mut message_consumer: Receiver<Message>,
) {
while let Ok(message) = message_consumer.recv().await {
if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT {
// println!(
// "{} Forced to Disconnect | Reason -> Slow Consumer",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
match ws_stream.send(message).await {
Ok(_) => {
if let Err(_) = ws_stream.flush().await {
// println!(
// "{} is Disconnected",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
}
Err(_) => {
// println!(
// "{} is Disconnected",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
}
}
}
async fn status_checker(
message_organizer_task: JoinHandle<()>,
stream_task: JoinHandle<()>,
mixer_task: JoinHandle<()>,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
) {
while let Err(_) = base_to_streaming.try_recv() {
tokio::time::sleep(Duration::from_secs(3)).await;
}
stream_task.abort();
mixer_task.abort();
message_organizer_task.abort();
match streaming_to_base.send(true) {
Ok(_) => println!("Cleaning Done: Streamer Disconnected"),
Err(err_val) => eprintln!("Error: Cleaning | {}", err_val),
}
}

View file

@ -5,9 +5,12 @@ use crate::Config;
pub async fn get_config() -> Config {
let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap();
let mut configs_unparsed = String::new();
config_file.read_to_string(&mut configs_unparsed).await.unwrap();
config_file
.read_to_string(&mut configs_unparsed)
.await
.unwrap();
let configs_parsed:Vec<&str> = configs_unparsed.split_terminator("\n").collect();
let configs_parsed: Vec<&str> = configs_unparsed.split_terminator("\n").collect();
let mut configs_cleaned: Vec<&str> = vec![];
for config in configs_parsed {
@ -20,4 +23,4 @@ pub async fn get_config() -> Config {
latency: configs_cleaned[2].parse().unwrap(),
tls: configs_cleaned[3].parse().unwrap(),
}
}
}