diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 2d4785b..6e13760 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -3,7 +3,8 @@ use std::time::Duration; use futures_util::{SinkExt, StreamExt}; use tokio::{ net::{TcpListener, TcpStream}, - sync::broadcast::{channel, Receiver, Sender}, time::Instant, + sync::broadcast::{channel, Receiver, Sender}, + time::Instant, }; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; @@ -17,23 +18,32 @@ pub async fn start() { let (record_producer, record_consumer) = channel(BUFFER_LENGTH); let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); let timer = Instant::now(); - + loop { match streamer_socket.accept().await { Ok((streamer_tcp, streamer_info)) => { match tokio_tungstenite::accept_async(streamer_tcp).await { Ok(ws_stream) => { - println!("New Streamer: {:#?} | {:#?}", streamer_info, timer.elapsed()); + println!( + "New Streamer: {:#?} | {:#?}", + streamer_info, + timer.elapsed() + ); let new_streamer = Streamer { ip: streamer_info.ip(), port: streamer_info.port(), }; - tokio::spawn(streamer_stream(new_streamer, record_producer, ws_stream, timer)); + tokio::spawn(streamer_stream( + new_streamer, + record_producer, + ws_stream, + timer, + )); break; - }, + } Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val), } - }, + } Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val), } } @@ -65,7 +75,11 @@ async fn status_checker(buffered_producer: Sender, timer: Instant) { if buffered_producer.receiver_count() != 0 { if buffered_producer.len() > 2 { bottleneck_flag = true; - println!("Bottleneck: {} | {:#?}", buffered_producer.len(), timer.elapsed()); + println!( + "Bottleneck: {} | {:#?}", + buffered_producer.len(), + timer.elapsed() + ); } if bottleneck_flag && buffered_producer.len() < 2 { bottleneck_flag = false; @@ -74,9 +88,8 @@ async fn status_checker(buffered_producer: Sender, timer: Instant) { if listener_counter != buffered_producer.receiver_count() { listener_counter = buffered_producer.receiver_count(); println!("Listener(s): {}", listener_counter); - } + } } - } } async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { @@ -85,9 +98,7 @@ async fn buffer_layer(mut message_consumer: Receiver, buffered_producer while message_consumer.len() > 0 { match message_consumer.recv().await { Ok(message) => match buffered_producer.send(message) { - Ok(_) => { - - } + Ok(_) => {} Err(_) => {} }, Err(_) => {} @@ -113,10 +124,14 @@ async fn streamer_stream( } } Err(_) => {} - } + } } None => { - println!("Streamer Disconnected: {} | {:#?}", format!("{}:{}", streamer.ip, streamer.port), timer.elapsed()); + println!( + "Streamer Disconnected: {} | {:#?}", + format!("{}:{}", streamer.ip, streamer.port), + timer.elapsed() + ); return; } } @@ -148,7 +163,7 @@ async fn stream( println!( "{} Forced to Disconnect | Reason -> Slow Consumer", format!("{}:{}", listener.ip, listener.port) - ); + ); break; } diff --git a/front/Cargo.toml b/front/Cargo.toml index 1707857..321e2a4 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0.81" -brotli = "4.0.0" +brotli = "5.0.0" cpal = { version = "0.15.3", features = ["wasm-bindgen"] } dioxus = { version = "0.5.0", features = ["web"] } futures-core = "0.3.30" diff --git a/front/src/components.rs b/front/src/components.rs index b145aa8..05f10d6 100644 --- a/front/src/components.rs +++ b/front/src/components.rs @@ -113,7 +113,7 @@ pub fn coin_status_renderer(server_address: String) -> Element { } None => { is_loading.set(false); - coin_result.set(CoinStatus {status: Coin::Dead}); + coin_result.set(CoinStatus { status: Coin::Dead }); } } } diff --git a/front/src/lib.rs b/front/src/lib.rs index c59829e..69408e8 100644 --- a/front/src/lib.rs +++ b/front/src/lib.rs @@ -1,3 +1,7 @@ pub mod components; +pub mod listening; pub mod status; pub mod streaming; + +static BUFFER_LENGTH: usize = 1000000; +static BUFFER_LIMIT: usize = BUFFER_LENGTH / 100 * 90; diff --git a/front/src/listening.rs b/front/src/listening.rs new file mode 100644 index 0000000..9a75e11 --- /dev/null +++ b/front/src/listening.rs @@ -0,0 +1,46 @@ +use std::{mem::MaybeUninit, sync::Arc, time::Duration}; + +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use dioxus::signals::Signal; +use ringbuf::{Consumer, SharedRb}; + +use crate::BUFFER_LIMIT; + +pub async fn listen_podcast( + is_listening: Signal, + mut consumer: Consumer>>>>, +) { + log::info!("Attention! Show must start!"); + let host = cpal::default_host(); + let output_device = host.default_output_device().unwrap(); + let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into(); + + let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + if consumer.len() > BUFFER_LIMIT { + consumer.clear(); + log::error!("Slow Consumer: DROPPED ALL Packets"); + } + for sample in data { + *sample = match consumer.pop() { + Some(s) => s, + None => 0.0, + }; + } + }; + + let output_stream = output_device + .build_output_stream(&config, output_data_fn, err_fn, None) + .unwrap(); + + output_stream.play().unwrap(); + + while is_listening() { + tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await; + } + + output_stream.pause().unwrap(); + log::info!("Attention! Time to turn home!"); +} +fn err_fn(err: cpal::StreamError) { + eprintln!("Something Happened: {}", err); +} diff --git a/front/src/status.rs b/front/src/status.rs index 0baf241..d4b7cff 100644 --- a/front/src/status.rs +++ b/front/src/status.rs @@ -77,20 +77,16 @@ pub async fn server_status_check( } pub async fn coin_status_check(server_address: &String) -> Option { match reqwest::get(format!("{}{}", server_address, "/coin")).await { - Ok(response) => { - match response.json::().await { - Ok(coin_status)=> { - Some(coin_status) - } - Err(err_val) => { - log::error!("Error: Can't Deserialise -> {}", err_val); - None - } + Ok(response) => match response.json::().await { + Ok(coin_status) => Some(coin_status), + Err(err_val) => { + log::error!("Error: Can't Deserialise -> {}", err_val); + None } }, Err(err_val) => { log::error!("Error: Response from Server -> {}", err_val); None - }, + } } } diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 4dd0d9a..95c85da 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,24 +1,22 @@ -use std::{io::Write, mem::MaybeUninit, sync::Arc, time::Duration}; +use std::{io::Write, mem::MaybeUninit, sync::Arc}; use brotli::DecompressorWriter; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + use dioxus::{ prelude::spawn, signals::{Signal, Writable}, }; use futures_util::{stream::SplitStream, SinkExt, StreamExt}; -use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; +use ringbuf::{HeapRb, Producer, SharedRb}; use tokio_tungstenite_wasm::{Message, WebSocketStream}; -static BUFFER_LENGTH: usize = 1000000; -static BUFFER_LIMIT: usize = BUFFER_LENGTH/100*90; +use crate::{listening::listen_podcast, BUFFER_LENGTH}; pub async fn start_listening( mut is_maintaining: Signal<(bool, bool)>, mut is_listening: Signal, ) { - //seperate record and stream, refactor if is_listening() { log::info!("Trying Sir"); let connect_addr = "ws://192.168.1.2:2424"; @@ -61,10 +59,9 @@ pub async fn sound_stream( while let Some(message_with_question) = stream_consumer.next().await { if is_listening() { - //log::info!("{}", message_with_question.unwrap().len()); - let mut data:Vec = vec![]; - if let Message::Binary(message) = message_with_question.unwrap() { + let mut data: Vec = vec![]; + if let Message::Binary(message) = message_with_question.unwrap() { data = message; } @@ -72,15 +69,17 @@ pub async fn sound_stream( if let Err(err_val) = decompression_writer.write_all(&data) { log::error!("Error: Decompression | {}", err_val); } - let uncompressed_data = - match decompression_writer.into_inner() { + let uncompressed_data = match decompression_writer.into_inner() { Ok(healty_packet) => healty_packet, - Err(unhealty_packet) => {log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len());unhealty_packet}, + Err(unhealty_packet) => { + log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len()); + unhealty_packet + } }; let data = String::from_utf8(uncompressed_data).unwrap(); - let mut datum_parsed:Vec = vec![]; - let mut data_parsed:Vec = vec![]; + let mut datum_parsed: Vec = vec![]; + let mut data_parsed: Vec = vec![]; for char in data.chars() { if char == '+' || char == '-' { data_parsed.push(datum_parsed.iter().collect()); @@ -100,7 +99,7 @@ pub async fn sound_stream( Ok(sample) => sample, Err(_) => 0.0, }; - if let Err(_) = producer.push(sample){} + if let Err(_) = producer.push(sample) {} } } else { break; @@ -109,41 +108,3 @@ pub async fn sound_stream( log::info!("Connection Lost Sir"); } -async fn listen_podcast( - is_listening: Signal, - mut consumer: Consumer>>>>, -) { - log::info!("Attention! Show must start!"); - let host = cpal::default_host(); - let output_device = host.default_output_device().unwrap(); - let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into(); - - let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - if consumer.len() > BUFFER_LIMIT { - consumer.clear(); - log::error!("Slow Consumer: DROPPED ALL Packets"); - } - for sample in data { - *sample = match consumer.pop() { - Some(s) => s, - None => 0.0, - }; - } - }; - - let output_stream = output_device - .build_output_stream(&config, output_data_fn, err_fn, None) - .unwrap(); - - output_stream.play().unwrap(); - - while is_listening() { - tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await; - } - - output_stream.pause().unwrap(); - log::info!("Attention! Time to turn home!"); -} -fn err_fn(err: cpal::StreamError) { - eprintln!("Something Happened: {}", err); -} diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 3289cde..21d3768 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -brotli = "4.0.0" +brotli = "5.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" diff --git a/streamer/src/main.rs b/streamer/src/main.rs index a651e31..64fee3f 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -10,5 +10,7 @@ async fn main() { let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); tokio::spawn(recording(sound_stream_producer)); tokio::spawn(start(sound_stream_consumer)); - tokio::time::sleep(Duration::from_secs(1000000000)).await; + loop { + tokio::time::sleep(Duration::from_secs(1000000000)).await; + } } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 8f5a3da..c7bb053 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -78,7 +78,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece // message_producer.receiver_count() // ); } - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } }