diff --git a/back/src/lib.rs b/back/src/lib.rs index 4b4db3c..9a8a2b7 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; pub mod routing; pub mod streaming; +pub mod utils; #[derive(Debug, Clone)] pub struct AppState{ diff --git a/back/src/streaming.rs b/back/src/streaming.rs index c0dc91d..ad2ecb1 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,39 +1,58 @@ -use std::{mem::MaybeUninit, sync::Arc}; +use std::{mem::MaybeUninit, sync::Arc, time::Duration}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{net::{TcpListener, TcpStream}, time::Instant}; use futures_util::SinkExt; use tokio_tungstenite::WebSocketStream; + pub async fn start() { - let socket = TcpListener::bind("127.0.0.1:2424").await.unwrap(); + let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); while let Ok((tcp_stream, _)) = socket.accept().await { println!("Dude Someone Triggered"); let ring = HeapRb::::new(1000000); let (producer, consumer) = ring.split(); let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - + + let timer = Instant::now(); + + tokio::spawn(record(producer)); - std::thread::sleep(std::time::Duration::from_secs(3)); - tokio::spawn(stream(ws_stream, consumer)); + tokio::spawn(stream(timer, ws_stream, consumer)); } } -pub async fn stream(mut ws_stream:WebSocketStream, mut consumer: Consumer>>>>) { +pub async fn stream(timer:Instant, mut ws_stream:WebSocketStream, mut consumer: Consumer>>>>) { println!("Waiting"); loop { - if !consumer.is_empty() { + tokio::time::sleep(Duration::from_secs(2)).await; + let mut data:Vec = Vec::new(); + let now = timer.elapsed().as_secs(); + while !consumer.is_empty() && (timer.elapsed().as_secs()+2) > now{ match consumer.pop() { - Some(data) => { - ws_stream.send(data.to_string().into()).await.unwrap(); + Some(single_data) => { + let ring = HeapRb::::new(1000000); + let (mut producer, mut consumer) = ring.split(); + let single_data_packet = single_data.to_string().as_bytes().to_vec(); + let terminator = "#".as_bytes().to_vec(); + for element in single_data_packet { + producer.push(element).unwrap(); + } + for element in terminator { + producer.push(element).unwrap(); + } + while !consumer.is_empty() { + data.push(consumer.pop().unwrap()); + } } None => { - //ws_stream.send(0.0.to_string().into()).await.unwrap(); + } } - ws_stream.flush().await.unwrap(); - } + } + ws_stream.send(data.into()).await.unwrap(); + ws_stream.flush().await.unwrap(); } } diff --git a/back/src/utils.rs b/back/src/utils.rs new file mode 100644 index 0000000..e69de29 diff --git a/front/Cargo.toml b/front/Cargo.toml index cd8f583..51e94f6 100644 --- a/front/Cargo.toml +++ b/front/Cargo.toml @@ -8,11 +8,11 @@ edition = "2021" [dependencies] anyhow = "1.0.81" cpal = { version = "0.15.3", features = ["wasm-bindgen"] } -dioxus = { version = "0.5.0-alpha.2", features = ["web"] } +dioxus = { version = "0.5.0", features = ["web"] } futures-core = "0.3.30" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } log = "0.4.21" -reqwest = { version = "0.11.24", features = ["json"] } +reqwest = { version = "0.12.2", features = ["json"] } ringbuf = "0.3.3" serde = { version = "1.0.197", features = ["derive"] } tokio-tungstenite-wasm = "0.3.1" diff --git a/front/src/streaming.rs b/front/src/streaming.rs index b8cd734..ba9cd8a 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -9,7 +9,7 @@ use tokio_with_wasm::tokio::time::sleep; pub async fn start_listening() { log::info!("Trying Sir"); - let connect_addr = "ws://127.0.0.1:2424"; + let connect_addr = "ws://192.168.1.2:2424"; let stream = tokio_tungstenite_wasm::connect(connect_addr).await.unwrap(); let ring = HeapRb::::new(1000000); let (producer, consumer) = ring.split(); @@ -27,14 +27,26 @@ async fn sound_stream( mut producer: Producer>>>>, ) { while let Some(msg) = stream.next().await { - match msg.unwrap().to_string().parse::() { - Ok(sound_data) => match producer.push(sound_data) { - Ok(_) => {} - Err(_) => {} - }, - Err(_) => {} - }; + let data = String::from_utf8(msg.unwrap().into()).unwrap(); + let data_parsed:Vec<&str> = data.split("#").collect(); + //let mut sound_data:Vec = vec![]; + for element in data_parsed { + let single_data:f32 = match element.parse() { + Ok(single) => single, + Err(_) => 0.0, + }; + producer.push(single_data).unwrap(); + } } + // while let Some(msg) = stream.next().await { + // match msg.unwrap().to_string().parse::() { + // Ok(sound_data) => match producer.push(sound_data) { + // Ok(_) => {} + // Err(_) => {} + // }, + // Err(_) => {} + // }; + // } log::info!("Connection Lost Sir"); } async fn listen(mut consumer: Consumer>>>>) {