diff --git a/back/src/lib.rs b/back/src/lib.rs index 9a8a2b7..0661cfb 100644 --- a/back/src/lib.rs +++ b/back/src/lib.rs @@ -5,17 +5,15 @@ pub mod streaming; pub mod utils; #[derive(Debug, Clone)] -pub struct AppState{ - -} +pub struct AppState {} #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -enum ServerStatus{ +enum ServerStatus { Alive, Unstable, Dead, } -#[derive(Debug, Clone, PartialEq, Serialize,Deserialize)] -enum CoinStatus{ +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +enum CoinStatus { Tail, Head, -} \ No newline at end of file +} diff --git a/back/src/main.rs b/back/src/main.rs index 6841203..d7f9535 100644 --- a/back/src/main.rs +++ b/back/src/main.rs @@ -1,10 +1,10 @@ -use back::{AppState, routing, streaming}; -use std::{env, net::SocketAddr}; use axum_server::tls_rustls::RustlsConfig; +use back::{routing, streaming, AppState}; +use std::{env, net::SocketAddr}; -fn take_args() -> String{ - let mut bind_address:String = String::new(); - for element in env::args(){ +fn take_args() -> String { + let mut bind_address: String = String::new(); + for element in env::args() { bind_address = element } println!("\n\n\tOn Air -> http://{}\n\n", bind_address); @@ -14,13 +14,11 @@ fn take_args() -> String{ #[tokio::main] async fn main() { println!("Hello, world!"); - let config = RustlsConfig::from_pem_file( - "certificates/fullchain.pem", - "certificates/privkey.pem" - ).await.unwrap(); - let state = AppState{ - - }; + let config = + RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem") + .await + .unwrap(); + let state = AppState {}; let app = routing::routing(axum::extract::State(state)).await; let addr = SocketAddr::from(take_args().parse::().unwrap()); tokio::spawn(streaming::start()); diff --git a/back/src/routing.rs b/back/src/routing.rs index ab1acb4..701ca90 100644 --- a/back/src/routing.rs +++ b/back/src/routing.rs @@ -1,17 +1,20 @@ -use crate::{AppState, ServerStatus, CoinStatus, streaming}; -use axum::{body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; +use crate::{streaming, AppState, CoinStatus, ServerStatus}; +use axum::{ + body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, + Router, +}; +use rand::prelude::*; use tokio::fs::File; use tokio_util::io::ReaderStream; use tower_http::cors::CorsLayer; -use rand::prelude::*; pub async fn routing(State(state): State) -> Router { Router::new() - .route("/", get(alive)) - .route("/coin", get(flip_coin)) - .route("/stream", get(stream)) - .layer(CorsLayer::permissive()) - .with_state(state.clone()) + .route("/", get(alive)) + .route("/coin", get(flip_coin)) + .route("/stream", get(stream)) + .layer(CorsLayer::permissive()) + .with_state(state.clone()) } async fn alive() -> impl IntoResponse { @@ -24,7 +27,7 @@ async fn alive() -> impl IntoResponse { async fn flip_coin() -> impl IntoResponse { let mut rng = rand::thread_rng(); - let random:f64 = rng.gen(); + let random: f64 = rng.gen(); let mut flip_status = CoinStatus::Tail; if random > 0.5 { flip_status = CoinStatus::Head; @@ -43,4 +46,4 @@ async fn stream() -> impl IntoResponse { let file = File::open("audios/audio.mp3").await.unwrap(); let stream = ReaderStream::new(file); Body::from_stream(stream) -} \ No newline at end of file +} diff --git a/back/src/streaming.rs b/back/src/streaming.rs index ad2ecb1..b74de32 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,12 +1,14 @@ use std::{mem::MaybeUninit, sync::Arc, time::Duration}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; -use tokio::{net::{TcpListener, TcpStream}, time::Instant}; use futures_util::SinkExt; +use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; +use tokio::{ + net::{TcpListener, TcpStream}, + time::Instant, +}; use tokio_tungstenite::WebSocketStream; - pub async fn start() { let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); while let Ok((tcp_stream, _)) = socket.accept().await { @@ -17,21 +19,24 @@ pub async fn start() { let timer = Instant::now(); - tokio::spawn(record(producer)); tokio::spawn(stream(timer, ws_stream, consumer)); } } -pub async fn stream(timer:Instant, mut ws_stream:WebSocketStream, mut consumer: Consumer>>>>) { +pub async fn stream( + timer: Instant, + mut ws_stream: WebSocketStream, + mut consumer: Consumer>>>>, +) { println!("Waiting"); loop { tokio::time::sleep(Duration::from_secs(2)).await; - let mut data:Vec = Vec::new(); + let mut data: Vec = Vec::new(); let now = timer.elapsed().as_secs(); - while !consumer.is_empty() && (timer.elapsed().as_secs()+2) > now{ + while !consumer.is_empty() && (timer.elapsed().as_secs() + 2) > now { match consumer.pop() { - Some(single_data) => { + Some(single_data) => { let ring = HeapRb::::new(1000000); let (mut producer, mut consumer) = ring.split(); let single_data_packet = single_data.to_string().as_bytes().to_vec(); @@ -46,9 +51,7 @@ pub async fn stream(timer:Instant, mut ws_stream:WebSocketStream, mut data.push(consumer.pop().unwrap()); } } - None => { - - } + None => {} } } ws_stream.send(data.into()).await.unwrap(); @@ -63,21 +66,21 @@ pub async fn record(mut producer: Producer {}, - Err(_) => {}, + Ok(_) => {} + Err(_) => {} } println!("{}", sample); } }; - - let input_stream = input_device.build_input_stream(&config, input_data_fn, err_fn, None).unwrap(); - + let input_stream = input_device + .build_input_stream(&config, input_data_fn, err_fn, None) + .unwrap(); println!("STREAMIN"); input_stream.play().unwrap(); diff --git a/back/src/utils.rs b/back/src/utils.rs index e69de29..8b13789 100644 --- a/back/src/utils.rs +++ b/back/src/utils.rs @@ -0,0 +1 @@ + diff --git a/front/src/components.rs b/front/src/components.rs index ba15400..00ac20d 100644 --- a/front/src/components.rs +++ b/front/src/components.rs @@ -1,9 +1,53 @@ -use crate::status::{ - coin_status_check, server_status_check, Coin, CoinStatus, Server, ServerStatus, +use crate::{ + status::{coin_status_check, server_status_check, Coin, CoinStatus, Server, ServerStatus}, + streaming::start_listening, }; use dioxus::prelude::*; use std::time::Duration; +#[component] +pub fn listen_renderer() -> Element { + let mut is_listening = use_signal(|| false); + let is_maintaining = use_signal(|| (false, false)); + let call_start_listening = move |_| { + if !is_listening() { + if !is_maintaining().0 && !is_maintaining().1 { + spawn({ + to_owned![is_listening]; + to_owned![is_maintaining]; + is_listening.set(true); + async move { + start_listening(is_maintaining, is_listening).await; + } + }); + } + } + else { + is_listening.set(false); + } + }; + rsx! { + div { + button { + disabled: !is_listening()&&(is_maintaining().0 || is_maintaining().1), + onclick: call_start_listening, + "style":"width: 100px; height: 100px;", + if is_listening() { + "Disconnect & Stop Listening" + } + else { + if is_maintaining().0 || is_maintaining().1 { + "Maintaining, Be Right Back Soon" + } + else { + "Connect & Listen" + } + } + } + } + } +} + #[component] pub fn server_status_renderer(server_address: String) -> Element { let server_check_time = 1_u64; diff --git a/front/src/main.rs b/front/src/main.rs index cb147c7..f7cb69f 100644 --- a/front/src/main.rs +++ b/front/src/main.rs @@ -1,8 +1,5 @@ use dioxus::prelude::*; -use front::{ - components::{coin_status_renderer, server_status_renderer}, - streaming::start_listening, -}; +use front::components::{coin_status_renderer, listen_renderer, server_status_renderer}; fn main() { println!("Hello, world!"); @@ -14,13 +11,7 @@ fn app() -> Element { let server_address = "https://tahinli.com.tr:2323".to_string(); rsx! { page_base {} - div { - button { - onclick: move |_| start_listening(), - "style":"width: 80px; height: 50px;", - "Listen" - } - } + listen_renderer {} coin_status_renderer {server_address:server_address.clone()} server_status_renderer {server_address:server_address.clone()} } diff --git a/front/src/streaming.rs b/front/src/streaming.rs index ba9cd8a..5117ec3 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -1,61 +1,97 @@ use std::{mem::MaybeUninit, sync::Arc, time::Duration}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use dioxus::hooks::{use_coroutine, Coroutine}; +use dioxus::{ + prelude::spawn, + signals::{Signal, Writable}, +}; use futures_util::StreamExt; + use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use tokio_tungstenite_wasm::WebSocketStream; -use tokio_with_wasm::tokio::time::sleep; -pub async fn start_listening() { - log::info!("Trying Sir"); - let connect_addr = "ws://192.168.1.2:2424"; - let stream = tokio_tungstenite_wasm::connect(connect_addr).await.unwrap(); - let ring = HeapRb::::new(1000000); - let (producer, consumer) = ring.split(); - let _sound_stream: Coroutine<()> = use_coroutine(|_| async move { - sound_stream(stream, producer).await; - }); - tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await; - let _listen: Coroutine<()> = use_coroutine(|_| async move { - listen(consumer).await; - }); +static BUFFER_LIMIT: usize = 800000; +static BUFFER_LENGTH: usize = 1000000; + +pub async fn start_listening( + mut is_maintaining: Signal<(bool, bool)>, + mut is_listening: Signal, +) { + if is_listening() { + log::info!("Trying Sir"); + let connect_addr = "ws://192.168.1.2:2424"; + let stream: WebSocketStream; + match tokio_tungstenite_wasm::connect(connect_addr).await { + Ok(ws_stream) => stream = ws_stream, + Err(_) => { + is_listening.set(false); + return; + } + } + is_maintaining.set((true, true)); + let ring = HeapRb::::new(BUFFER_LENGTH); + let (producer, consumer) = ring.split(); + spawn({ + async move { + sound_stream(is_listening, stream, producer).await; + is_listening.set(false); + is_maintaining.set((false, is_maintaining().1)); + } + }); + spawn({ + async move { + listen_podcast(is_listening, consumer).await; + is_listening.set(false); + //buffer time waiting actually + tokio_with_wasm::tokio::time::sleep(Duration::from_secs(2)).await; + log::info!("{:#?}", is_maintaining()); + is_maintaining.set((is_maintaining().0, false)); + log::info!("pod{:#?}", is_maintaining()); + } + }); + } } -async fn sound_stream( +pub async fn sound_stream( + is_listening: Signal, mut stream: WebSocketStream, mut producer: Producer>>>>, ) { + log::info!("Attention! We need cables"); + while let Some(msg) = stream.next().await { - let data = String::from_utf8(msg.unwrap().into()).unwrap(); - let data_parsed:Vec<&str> = data.split("#").collect(); - //let mut sound_data:Vec = vec![]; - for element in data_parsed { - let single_data:f32 = match element.parse() { - Ok(single) => single, - Err(_) => 0.0, - }; - producer.push(single_data).unwrap(); + if is_listening() { + let data = String::from_utf8(msg.unwrap().into()).unwrap(); + let data_parsed: Vec<&str> = data.split("#").collect(); + for element in data_parsed { + let single_data: f32 = match element.parse() { + Ok(single) => single, + Err(_) => 0.0, + }; + if let Err(_) = producer.push(single_data) {} + } + } + else { + break; } } - // while let Some(msg) = stream.next().await { - // match msg.unwrap().to_string().parse::() { - // Ok(sound_data) => match producer.push(sound_data) { - // Ok(_) => {} - // Err(_) => {} - // }, - // Err(_) => {} - // }; - // } + log::info!("Connection Lost Sir"); } -async fn listen(mut consumer: Consumer>>>>) { - log::info!("Hi"); +async fn listen_podcast( + is_listening: Signal, + mut consumer: Consumer>>>>, +) { + log::info!("Attention! Show must start!"); let host = cpal::default_host(); let output_device = host.default_output_device().unwrap(); let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into(); let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + if consumer.len() > BUFFER_LIMIT { + consumer.clear(); + log::error!("Slow Consumer: DROPPED ALL Packets"); + } for sample in data { *sample = match consumer.pop() { Some(s) => s, @@ -69,52 +105,13 @@ async fn listen(mut consumer: Consumer {log::info!("i8")}, - // cpal::SampleFormat::I16 => {log::info!("i16")}, - // //cpal::SampleFormat::I24 => {log::info!("i24")}, - // cpal::SampleFormat::I32 => {log::info!("i32")}, - // //cpal::SampleFormat::I48 => {log::info!("i48")}, - // cpal::SampleFormat::I64 => {log::info!("i64")}, - // cpal::SampleFormat::U8 => {log::info!("u8")}, - // cpal::SampleFormat::U16 => {log::info!("u16")}, - // //cpal::SampleFormat::U24 => {log::info!("u24")}, - // cpal::SampleFormat::U32 => {log::info!("u32")}, - // //cpal::SampleFormat::U48 => {log::info!("u48")}, - // cpal::SampleFormat::U64 => {log::info!("u64")}, - // cpal::SampleFormat::F32 => {log::info!("f32"); - // run::(consumer, &device, &config.clone().into()).await.unwrap();}, - // cpal::SampleFormat::F64 => {log::info!("f64")}, - // sample_format => panic!("Unsupported sample format '{sample_format}'"), - // } - // let config:StreamConfig = config.into(); - // let stream = device.build_output_stream( - // &config, - // move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - - // log::info!("{:?}", data); - // //I need to do something here, I think - // }, - // move |_err| { - - // }, - // None).unwrap(); - - // stream.play().unwrap(); - // tokio::time::sleep(Duration::from_secs(10)).await; - // stream.pause().unwrap(); + log::info!("Attention! Time to turn home!"); } fn err_fn(err: cpal::StreamError) { eprintln!("Something Happened: {}", err);