refactor: ♻️ refactor and update dependencies

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-14 23:48:14 +03:00
parent 73f624b14f
commit 0f31fd54ea
10 changed files with 107 additions and 83 deletions

View file

@ -3,7 +3,8 @@ use std::time::Duration;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use tokio::{ use tokio::{
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
sync::broadcast::{channel, Receiver, Sender}, time::Instant, sync::broadcast::{channel, Receiver, Sender},
time::Instant,
}; };
use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
@ -23,17 +24,26 @@ pub async fn start() {
Ok((streamer_tcp, streamer_info)) => { Ok((streamer_tcp, streamer_info)) => {
match tokio_tungstenite::accept_async(streamer_tcp).await { match tokio_tungstenite::accept_async(streamer_tcp).await {
Ok(ws_stream) => { Ok(ws_stream) => {
println!("New Streamer: {:#?} | {:#?}", streamer_info, timer.elapsed()); println!(
"New Streamer: {:#?} | {:#?}",
streamer_info,
timer.elapsed()
);
let new_streamer = Streamer { let new_streamer = Streamer {
ip: streamer_info.ip(), ip: streamer_info.ip(),
port: streamer_info.port(), port: streamer_info.port(),
}; };
tokio::spawn(streamer_stream(new_streamer, record_producer, ws_stream, timer)); tokio::spawn(streamer_stream(
new_streamer,
record_producer,
ws_stream,
timer,
));
break; break;
}, }
Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val),
} }
}, }
Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val),
} }
} }
@ -65,7 +75,11 @@ async fn status_checker(buffered_producer: Sender<Message>, timer: Instant) {
if buffered_producer.receiver_count() != 0 { if buffered_producer.receiver_count() != 0 {
if buffered_producer.len() > 2 { if buffered_producer.len() > 2 {
bottleneck_flag = true; bottleneck_flag = true;
println!("Bottleneck: {} | {:#?}", buffered_producer.len(), timer.elapsed()); println!(
"Bottleneck: {} | {:#?}",
buffered_producer.len(),
timer.elapsed()
);
} }
if bottleneck_flag && buffered_producer.len() < 2 { if bottleneck_flag && buffered_producer.len() < 2 {
bottleneck_flag = false; bottleneck_flag = false;
@ -76,7 +90,6 @@ async fn status_checker(buffered_producer: Sender<Message>, timer: Instant) {
println!("Listener(s): {}", listener_counter); println!("Listener(s): {}", listener_counter);
} }
} }
} }
} }
async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) { async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) {
@ -85,9 +98,7 @@ async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer
while message_consumer.len() > 0 { while message_consumer.len() > 0 {
match message_consumer.recv().await { match message_consumer.recv().await {
Ok(message) => match buffered_producer.send(message) { Ok(message) => match buffered_producer.send(message) {
Ok(_) => { Ok(_) => {}
}
Err(_) => {} Err(_) => {}
}, },
Err(_) => {} Err(_) => {}
@ -116,7 +127,11 @@ async fn streamer_stream(
} }
} }
None => { None => {
println!("Streamer Disconnected: {} | {:#?}", format!("{}:{}", streamer.ip, streamer.port), timer.elapsed()); println!(
"Streamer Disconnected: {} | {:#?}",
format!("{}:{}", streamer.ip, streamer.port),
timer.elapsed()
);
return; return;
} }
} }

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.81" anyhow = "1.0.81"
brotli = "4.0.0" brotli = "5.0.0"
cpal = { version = "0.15.3", features = ["wasm-bindgen"] } cpal = { version = "0.15.3", features = ["wasm-bindgen"] }
dioxus = { version = "0.5.0", features = ["web"] } dioxus = { version = "0.5.0", features = ["web"] }
futures-core = "0.3.30" futures-core = "0.3.30"

View file

@ -1,3 +1,7 @@
pub mod components; pub mod components;
pub mod listening;
pub mod status; pub mod status;
pub mod streaming; pub mod streaming;
static BUFFER_LENGTH: usize = 1000000;
static BUFFER_LIMIT: usize = BUFFER_LENGTH / 100 * 90;

46
front/src/listening.rs Normal file
View file

@ -0,0 +1,46 @@
use std::{mem::MaybeUninit, sync::Arc, time::Duration};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use dioxus::signals::Signal;
use ringbuf::{Consumer, SharedRb};
use crate::BUFFER_LIMIT;
pub async fn listen_podcast(
is_listening: Signal<bool>,
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) {
log::info!("Attention! Show must start!");
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into();
let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
if consumer.len() > BUFFER_LIMIT {
consumer.clear();
log::error!("Slow Consumer: DROPPED ALL Packets");
}
for sample in data {
*sample = match consumer.pop() {
Some(s) => s,
None => 0.0,
};
}
};
let output_stream = output_device
.build_output_stream(&config, output_data_fn, err_fn, None)
.unwrap();
output_stream.play().unwrap();
while is_listening() {
tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await;
}
output_stream.pause().unwrap();
log::info!("Attention! Time to turn home!");
}
fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err);
}

View file

@ -77,20 +77,16 @@ pub async fn server_status_check(
} }
pub async fn coin_status_check(server_address: &String) -> Option<CoinStatus> { pub async fn coin_status_check(server_address: &String) -> Option<CoinStatus> {
match reqwest::get(format!("{}{}", server_address, "/coin")).await { match reqwest::get(format!("{}{}", server_address, "/coin")).await {
Ok(response) => { Ok(response) => match response.json::<CoinStatus>().await {
match response.json::<CoinStatus>().await { Ok(coin_status) => Some(coin_status),
Ok(coin_status)=> {
Some(coin_status)
}
Err(err_val) => { Err(err_val) => {
log::error!("Error: Can't Deserialise -> {}", err_val); log::error!("Error: Can't Deserialise -> {}", err_val);
None None
} }
}
}, },
Err(err_val) => { Err(err_val) => {
log::error!("Error: Response from Server -> {}", err_val); log::error!("Error: Response from Server -> {}", err_val);
None None
}, }
} }
} }

View file

@ -1,24 +1,22 @@
use std::{io::Write, mem::MaybeUninit, sync::Arc, time::Duration}; use std::{io::Write, mem::MaybeUninit, sync::Arc};
use brotli::DecompressorWriter; use brotli::DecompressorWriter;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use dioxus::{ use dioxus::{
prelude::spawn, prelude::spawn,
signals::{Signal, Writable}, signals::{Signal, Writable},
}; };
use futures_util::{stream::SplitStream, SinkExt, StreamExt}; use futures_util::{stream::SplitStream, SinkExt, StreamExt};
use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use ringbuf::{HeapRb, Producer, SharedRb};
use tokio_tungstenite_wasm::{Message, WebSocketStream}; use tokio_tungstenite_wasm::{Message, WebSocketStream};
static BUFFER_LENGTH: usize = 1000000; use crate::{listening::listen_podcast, BUFFER_LENGTH};
static BUFFER_LIMIT: usize = BUFFER_LENGTH/100*90;
pub async fn start_listening( pub async fn start_listening(
mut is_maintaining: Signal<(bool, bool)>, mut is_maintaining: Signal<(bool, bool)>,
mut is_listening: Signal<bool>, mut is_listening: Signal<bool>,
) { ) {
//seperate record and stream, refactor
if is_listening() { if is_listening() {
log::info!("Trying Sir"); log::info!("Trying Sir");
let connect_addr = "ws://192.168.1.2:2424"; let connect_addr = "ws://192.168.1.2:2424";
@ -61,7 +59,6 @@ pub async fn sound_stream(
while let Some(message_with_question) = stream_consumer.next().await { while let Some(message_with_question) = stream_consumer.next().await {
if is_listening() { if is_listening() {
//log::info!("{}", message_with_question.unwrap().len()); //log::info!("{}", message_with_question.unwrap().len());
let mut data: Vec<u8> = vec![]; let mut data: Vec<u8> = vec![];
if let Message::Binary(message) = message_with_question.unwrap() { if let Message::Binary(message) = message_with_question.unwrap() {
@ -72,10 +69,12 @@ pub async fn sound_stream(
if let Err(err_val) = decompression_writer.write_all(&data) { if let Err(err_val) = decompression_writer.write_all(&data) {
log::error!("Error: Decompression | {}", err_val); log::error!("Error: Decompression | {}", err_val);
} }
let uncompressed_data = let uncompressed_data = match decompression_writer.into_inner() {
match decompression_writer.into_inner() {
Ok(healty_packet) => healty_packet, Ok(healty_packet) => healty_packet,
Err(unhealty_packet) => {log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len());unhealty_packet}, Err(unhealty_packet) => {
log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len());
unhealty_packet
}
}; };
let data = String::from_utf8(uncompressed_data).unwrap(); let data = String::from_utf8(uncompressed_data).unwrap();
@ -109,41 +108,3 @@ pub async fn sound_stream(
log::info!("Connection Lost Sir"); log::info!("Connection Lost Sir");
} }
async fn listen_podcast(
is_listening: Signal<bool>,
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) {
log::info!("Attention! Show must start!");
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into();
let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
if consumer.len() > BUFFER_LIMIT {
consumer.clear();
log::error!("Slow Consumer: DROPPED ALL Packets");
}
for sample in data {
*sample = match consumer.pop() {
Some(s) => s,
None => 0.0,
};
}
};
let output_stream = output_device
.build_output_stream(&config, output_data_fn, err_fn, None)
.unwrap();
output_stream.play().unwrap();
while is_listening() {
tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await;
}
output_stream.pause().unwrap();
log::info!("Attention! Time to turn home!");
}
fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err);
}

View file

@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
brotli = "4.0.0" brotli = "5.0.0"
cpal = "0.15.3" cpal = "0.15.3"
futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] }
ringbuf = "0.3.3" ringbuf = "0.3.3"

View file

@ -10,5 +10,7 @@ async fn main() {
let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH);
tokio::spawn(recording(sound_stream_producer)); tokio::spawn(recording(sound_stream_producer));
tokio::spawn(start(sound_stream_consumer)); tokio::spawn(start(sound_stream_consumer));
loop {
tokio::time::sleep(Duration::from_secs(1000000000)).await; tokio::time::sleep(Duration::from_secs(1000000000)).await;
} }
}

View file

@ -78,7 +78,7 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
// message_producer.receiver_count() // message_producer.receiver_count()
// ); // );
} }
tokio::time::sleep(Duration::from_millis(50)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
} }