diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 583af66..029c813 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -13,7 +13,6 @@ const BUFFER_LENGTH: usize = 1000000; const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start() { let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); - println!("Dude Someone Triggered"); let (record_producer, record_consumer) = channel(BUFFER_LENGTH); let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); if let Ok((streamer_tcp, streamer_info)) = streamer_socket.accept().await { @@ -26,6 +25,7 @@ pub async fn start() { let (buffered_producer, _) = channel(BUFFER_LENGTH); tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); + tokio::spawn(status_checker(buffered_producer.clone())); while let Ok((tcp_stream, listener_info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); println!("New Listener: {}", listener_info); @@ -40,25 +40,34 @@ pub async fn start() { )); } } +async fn status_checker(buffered_producer: Sender) { + let mut listener_counter = buffered_producer.receiver_count(); + //let mut buffer_len = buffered_producer.len(); + loop { + tokio::time::sleep(Duration::from_secs(3)).await; + if buffered_producer.receiver_count() != 0 { + if buffered_producer.len() > 2 { + println!("Bottleneck: {}", buffered_producer.len()); + } + if listener_counter != buffered_producer.receiver_count() { + listener_counter = buffered_producer.receiver_count(); + println!("Listener(s): {}", listener_counter); + } + } + + } +} async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { loop { - tokio::time::sleep(Duration::from_millis(200)).await; - let mut messages_buffered: Vec = vec![]; + tokio::time::sleep(Duration::from_millis(50)).await; while message_consumer.len() > 0 { match message_consumer.recv().await { - Ok(msg) => { - messages_buffered.push(msg); - } - Err(_) => {} - } - } - let mut message_buffered_concreted = String::new(); - if messages_buffered.len() > 0 { - for message_buffered in messages_buffered { - message_buffered_concreted = format!("{}{}", message_buffered_concreted, message_buffered); - } - match buffered_producer.send(message_buffered_concreted.into()) { - Ok(_) => {} + Ok(message) => match buffered_producer.send(message) { + Ok(_) => { + + } + Err(_) => {} + }, Err(_) => {} } } @@ -71,7 +80,7 @@ async fn streamer_stream( while let Some(message_with_question) = ws_stream.next().await { match message_with_question { Ok(message) => { - println!("{}", message.len()); + //println!("{}", message.len()); match record_producer.send(message) { Ok(_) => {} Err(_) => {} @@ -87,30 +96,14 @@ async fn message_organizer( mut record_consumer: Receiver, ) { loop { - let mut messages = String::new(); - let mut iteration = record_consumer.len(); - while iteration > 0 { - iteration -= 1; - match record_consumer.recv().await { - Ok(single_message) => { - let single_message_packet = single_message.to_string(); - messages = format!("{}{}", messages, single_message_packet); - } - Err(_) => {} - } - } - if messages.len() > 0 { - match message_producer.send(messages.into()) { + match record_consumer.recv().await { + Ok(single_message) => match message_producer.send(single_message) { Ok(_) => {} Err(_) => {} - } - println!( - "Message Counter = {} | Receiver Count = {}", - message_producer.len(), - message_producer.receiver_count() - ); + }, + Err(_) => {} } - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(50)).await; } } async fn stream( @@ -123,9 +116,10 @@ async fn stream( println!( "{} Forced to Disconnect | Reason -> Slow Consumer", format!("{}:{}", listener.ip, listener.port) - ); + ); break; } + match ws_stream.send(message).await { Ok(_) => { if let Err(_) = ws_stream.flush().await { diff --git a/front/Cargo.toml b/front/Cargo.toml index 51e94f6..1707857 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0.81" +brotli = "4.0.0" cpal = { version = "0.15.3", features = ["wasm-bindgen"] } dioxus = { version = "0.5.0", features = ["web"] } futures-core = "0.3.30" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index f99097b..5f8284d 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,5 +1,6 @@ -use std::{mem::MaybeUninit, sync::Arc, time::Duration}; +use std::{io::Write, mem::MaybeUninit, sync::Arc, time::Duration}; +use brotli::DecompressorWriter; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use dioxus::{ prelude::spawn, @@ -8,7 +9,7 @@ use dioxus::{ use futures_util::{stream::SplitStream, SinkExt, StreamExt}; use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; -use tokio_tungstenite_wasm::WebSocketStream; +use tokio_tungstenite_wasm::{Message, WebSocketStream}; static BUFFER_LIMIT: usize = 800000; static BUFFER_LENGTH: usize = 1000000; @@ -57,16 +58,28 @@ pub async fn sound_stream( ) { log::info!("Attention! We need cables"); - while let Some(msg) = stream_consumer.next().await { + while let Some(message_with_question) = stream_consumer.next().await { if is_listening() { - let data = msg.unwrap().to_string(); - //log::info!("{:#?}", data); - log::info!("{}", data.len()); + + //log::info!("{}", message_with_question.unwrap().len()); + let mut data:Vec = vec![]; + if let Message::Binary(message) = message_with_question.unwrap() { + data = message; + } + let mut decompression_writer = DecompressorWriter::new(vec![], BUFFER_LENGTH); + if let Err(err_val) = decompression_writer.write_all(&data) { + log::error!("Error: Decompression | {}", err_val); + } + let uncompressed_data = + match decompression_writer.into_inner() { + Ok(healty_packet) => healty_packet, + Err(unhealty_packet) => {log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len());unhealty_packet}, + }; + + let data = String::from_utf8(uncompressed_data).unwrap(); let mut datum_parsed:Vec = vec![]; let mut data_parsed:Vec = vec![]; - - for char in data.chars() { if char == '+' || char == '-' { data_parsed.push(datum_parsed.iter().collect()); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 70c65a5..3289cde 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +brotli = "4.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" diff --git a/streamer/src/recording.rs b/streamer/src/recording.rs index 2e96044..36edb2b 100644 --- a/streamer/src/recording.rs +++ b/streamer/src/recording.rs @@ -20,9 +20,8 @@ pub async fn recording(sound_stream_producer: Sender) { .build_input_stream(&config, input_data_fn, err_fn, None) .unwrap(); - println!("STREAMIN"); input_stream.play().unwrap(); - + println!("Recording Started"); std::thread::sleep(std::time::Duration::from_secs(1000000000)); println!("DONE I HOPE"); } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index 12ddd59..8f5a3da 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,5 +1,6 @@ -use std::time::Duration; +use std::{io::Write, time::Duration}; +use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::sync::broadcast::{channel, Receiver, Sender}; @@ -41,7 +42,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece let _zero = charred.remove(1); let _point = charred.remove(1); } - charred.truncate(8); + charred.truncate(4); let mut single_data_packet: Vec = vec![]; for char in charred { let char_packet = char.to_string().as_bytes().to_vec(); @@ -60,17 +61,24 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece } } if !messages.is_empty() { - match message_producer.send(messages.into()) { + let mut compression_writer = CompressorWriter::new(vec![], BUFFER_LENGTH, 4, 24); + if let Err(err_val) = compression_writer.write_all(&messages) { + eprintln!("Error: Compression | {}", err_val); + } + let compressed_messages = compression_writer.into_inner(); + // println!("Compressed Len {}", compressed_messages.len()); + // println!("UNCompressed Len {}", messages.len()); + match message_producer.send(compressed_messages.into()) { Ok(_) => {} Err(_) => {} } - println!( - "Message Counter = {} | Receiver Count = {}", - message_producer.len(), - message_producer.receiver_count() - ); + // println!( + // "Message Counter = {} | Receiver Count = {}", + // message_producer.len(), + // message_producer.receiver_count() + // ); } - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(50)).await; } }