feat: play && stop checks

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-07 06:10:09 +03:00
parent eabe8e0521
commit 4b158d5195
8 changed files with 176 additions and 141 deletions

View file

@ -5,17 +5,15 @@ pub mod streaming;
pub mod utils; pub mod utils;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AppState{ pub struct AppState {}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
enum ServerStatus{ enum ServerStatus {
Alive, Alive,
Unstable, Unstable,
Dead, Dead,
} }
#[derive(Debug, Clone, PartialEq, Serialize,Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
enum CoinStatus{ enum CoinStatus {
Tail, Tail,
Head, Head,
} }

View file

@ -1,10 +1,10 @@
use back::{AppState, routing, streaming};
use std::{env, net::SocketAddr};
use axum_server::tls_rustls::RustlsConfig; use axum_server::tls_rustls::RustlsConfig;
use back::{routing, streaming, AppState};
use std::{env, net::SocketAddr};
fn take_args() -> String{ fn take_args() -> String {
let mut bind_address:String = String::new(); let mut bind_address: String = String::new();
for element in env::args(){ for element in env::args() {
bind_address = element bind_address = element
} }
println!("\n\n\tOn Air -> http://{}\n\n", bind_address); println!("\n\n\tOn Air -> http://{}\n\n", bind_address);
@ -14,13 +14,11 @@ fn take_args() -> String{
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
println!("Hello, world!"); println!("Hello, world!");
let config = RustlsConfig::from_pem_file( let config =
"certificates/fullchain.pem", RustlsConfig::from_pem_file("certificates/fullchain.pem", "certificates/privkey.pem")
"certificates/privkey.pem" .await
).await.unwrap(); .unwrap();
let state = AppState{ let state = AppState {};
};
let app = routing::routing(axum::extract::State(state)).await; let app = routing::routing(axum::extract::State(state)).await;
let addr = SocketAddr::from(take_args().parse::<SocketAddr>().unwrap()); let addr = SocketAddr::from(take_args().parse::<SocketAddr>().unwrap());
tokio::spawn(streaming::start()); tokio::spawn(streaming::start());

View file

@ -1,17 +1,20 @@
use crate::{AppState, ServerStatus, CoinStatus, streaming}; use crate::{streaming, AppState, CoinStatus, ServerStatus};
use axum::{body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; use axum::{
body::Body, extract::State, http::StatusCode, response::IntoResponse, routing::get, Json,
Router,
};
use rand::prelude::*;
use tokio::fs::File; use tokio::fs::File;
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use tower_http::cors::CorsLayer; use tower_http::cors::CorsLayer;
use rand::prelude::*;
pub async fn routing(State(state): State<AppState>) -> Router { pub async fn routing(State(state): State<AppState>) -> Router {
Router::new() Router::new()
.route("/", get(alive)) .route("/", get(alive))
.route("/coin", get(flip_coin)) .route("/coin", get(flip_coin))
.route("/stream", get(stream)) .route("/stream", get(stream))
.layer(CorsLayer::permissive()) .layer(CorsLayer::permissive())
.with_state(state.clone()) .with_state(state.clone())
} }
async fn alive() -> impl IntoResponse { async fn alive() -> impl IntoResponse {
@ -24,7 +27,7 @@ async fn alive() -> impl IntoResponse {
async fn flip_coin() -> impl IntoResponse { async fn flip_coin() -> impl IntoResponse {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let random:f64 = rng.gen(); let random: f64 = rng.gen();
let mut flip_status = CoinStatus::Tail; let mut flip_status = CoinStatus::Tail;
if random > 0.5 { if random > 0.5 {
flip_status = CoinStatus::Head; flip_status = CoinStatus::Head;

View file

@ -1,12 +1,14 @@
use std::{mem::MaybeUninit, sync::Arc, time::Duration}; use std::{mem::MaybeUninit, sync::Arc, time::Duration};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
use tokio::{net::{TcpListener, TcpStream}, time::Instant};
use futures_util::SinkExt; use futures_util::SinkExt;
use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
use tokio::{
net::{TcpListener, TcpStream},
time::Instant,
};
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
pub async fn start() { pub async fn start() {
let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap();
while let Ok((tcp_stream, _)) = socket.accept().await { while let Ok((tcp_stream, _)) = socket.accept().await {
@ -17,19 +19,22 @@ pub async fn start() {
let timer = Instant::now(); let timer = Instant::now();
tokio::spawn(record(producer)); tokio::spawn(record(producer));
tokio::spawn(stream(timer, ws_stream, consumer)); tokio::spawn(stream(timer, ws_stream, consumer));
} }
} }
pub async fn stream(timer:Instant, mut ws_stream:WebSocketStream<TcpStream>, mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>) { pub async fn stream(
timer: Instant,
mut ws_stream: WebSocketStream<TcpStream>,
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) {
println!("Waiting"); println!("Waiting");
loop { loop {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
let mut data:Vec<u8> = Vec::new(); let mut data: Vec<u8> = Vec::new();
let now = timer.elapsed().as_secs(); let now = timer.elapsed().as_secs();
while !consumer.is_empty() && (timer.elapsed().as_secs()+2) > now{ while !consumer.is_empty() && (timer.elapsed().as_secs() + 2) > now {
match consumer.pop() { match consumer.pop() {
Some(single_data) => { Some(single_data) => {
let ring = HeapRb::<u8>::new(1000000); let ring = HeapRb::<u8>::new(1000000);
@ -46,9 +51,7 @@ pub async fn stream(timer:Instant, mut ws_stream:WebSocketStream<TcpStream>, mut
data.push(consumer.pop().unwrap()); data.push(consumer.pop().unwrap());
} }
} }
None => { None => {}
}
} }
} }
ws_stream.send(data.into()).await.unwrap(); ws_stream.send(data.into()).await.unwrap();
@ -63,21 +66,21 @@ pub async fn record(mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUnini
println!("Input Device: {}", input_device.name().unwrap()); println!("Input Device: {}", input_device.name().unwrap());
let config:cpal::StreamConfig = input_device.default_input_config().unwrap().into(); let config: cpal::StreamConfig = input_device.default_input_config().unwrap().into();
let input_data_fn = move |data: &[f32], _:&cpal::InputCallbackInfo| { let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| {
for &sample in data { for &sample in data {
match producer.push(sample) { match producer.push(sample) {
Ok(_) => {}, Ok(_) => {}
Err(_) => {}, Err(_) => {}
} }
println!("{}", sample); println!("{}", sample);
} }
}; };
let input_stream = input_device
let input_stream = input_device.build_input_stream(&config, input_data_fn, err_fn, None).unwrap(); .build_input_stream(&config, input_data_fn, err_fn, None)
.unwrap();
println!("STREAMIN"); println!("STREAMIN");
input_stream.play().unwrap(); input_stream.play().unwrap();

View file

@ -0,0 +1 @@

View file

@ -1,9 +1,53 @@
use crate::status::{ use crate::{
coin_status_check, server_status_check, Coin, CoinStatus, Server, ServerStatus, status::{coin_status_check, server_status_check, Coin, CoinStatus, Server, ServerStatus},
streaming::start_listening,
}; };
use dioxus::prelude::*; use dioxus::prelude::*;
use std::time::Duration; use std::time::Duration;
#[component]
pub fn listen_renderer() -> Element {
let mut is_listening = use_signal(|| false);
let is_maintaining = use_signal(|| (false, false));
let call_start_listening = move |_| {
if !is_listening() {
if !is_maintaining().0 && !is_maintaining().1 {
spawn({
to_owned![is_listening];
to_owned![is_maintaining];
is_listening.set(true);
async move {
start_listening(is_maintaining, is_listening).await;
}
});
}
}
else {
is_listening.set(false);
}
};
rsx! {
div {
button {
disabled: !is_listening()&&(is_maintaining().0 || is_maintaining().1),
onclick: call_start_listening,
"style":"width: 100px; height: 100px;",
if is_listening() {
"Disconnect & Stop Listening"
}
else {
if is_maintaining().0 || is_maintaining().1 {
"Maintaining, Be Right Back Soon"
}
else {
"Connect & Listen"
}
}
}
}
}
}
#[component] #[component]
pub fn server_status_renderer(server_address: String) -> Element { pub fn server_status_renderer(server_address: String) -> Element {
let server_check_time = 1_u64; let server_check_time = 1_u64;

View file

@ -1,8 +1,5 @@
use dioxus::prelude::*; use dioxus::prelude::*;
use front::{ use front::components::{coin_status_renderer, listen_renderer, server_status_renderer};
components::{coin_status_renderer, server_status_renderer},
streaming::start_listening,
};
fn main() { fn main() {
println!("Hello, world!"); println!("Hello, world!");
@ -14,13 +11,7 @@ fn app() -> Element {
let server_address = "https://tahinli.com.tr:2323".to_string(); let server_address = "https://tahinli.com.tr:2323".to_string();
rsx! { rsx! {
page_base {} page_base {}
div { listen_renderer {}
button {
onclick: move |_| start_listening(),
"style":"width: 80px; height: 50px;",
"Listen"
}
}
coin_status_renderer {server_address:server_address.clone()} coin_status_renderer {server_address:server_address.clone()}
server_status_renderer {server_address:server_address.clone()} server_status_renderer {server_address:server_address.clone()}
} }

View file

@ -1,61 +1,97 @@
use std::{mem::MaybeUninit, sync::Arc, time::Duration}; use std::{mem::MaybeUninit, sync::Arc, time::Duration};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use dioxus::hooks::{use_coroutine, Coroutine}; use dioxus::{
prelude::spawn,
signals::{Signal, Writable},
};
use futures_util::StreamExt; use futures_util::StreamExt;
use ringbuf::{Consumer, HeapRb, Producer, SharedRb}; use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
use tokio_tungstenite_wasm::WebSocketStream; use tokio_tungstenite_wasm::WebSocketStream;
use tokio_with_wasm::tokio::time::sleep;
pub async fn start_listening() { static BUFFER_LIMIT: usize = 800000;
log::info!("Trying Sir"); static BUFFER_LENGTH: usize = 1000000;
let connect_addr = "ws://192.168.1.2:2424";
let stream = tokio_tungstenite_wasm::connect(connect_addr).await.unwrap(); pub async fn start_listening(
let ring = HeapRb::<f32>::new(1000000); mut is_maintaining: Signal<(bool, bool)>,
let (producer, consumer) = ring.split(); mut is_listening: Signal<bool>,
let _sound_stream: Coroutine<()> = use_coroutine(|_| async move { ) {
sound_stream(stream, producer).await; if is_listening() {
}); log::info!("Trying Sir");
tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await; let connect_addr = "ws://192.168.1.2:2424";
let _listen: Coroutine<()> = use_coroutine(|_| async move { let stream: WebSocketStream;
listen(consumer).await; match tokio_tungstenite_wasm::connect(connect_addr).await {
}); Ok(ws_stream) => stream = ws_stream,
Err(_) => {
is_listening.set(false);
return;
}
}
is_maintaining.set((true, true));
let ring = HeapRb::<f32>::new(BUFFER_LENGTH);
let (producer, consumer) = ring.split();
spawn({
async move {
sound_stream(is_listening, stream, producer).await;
is_listening.set(false);
is_maintaining.set((false, is_maintaining().1));
}
});
spawn({
async move {
listen_podcast(is_listening, consumer).await;
is_listening.set(false);
//buffer time waiting actually
tokio_with_wasm::tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("{:#?}", is_maintaining());
is_maintaining.set((is_maintaining().0, false));
log::info!("pod{:#?}", is_maintaining());
}
});
}
} }
async fn sound_stream( pub async fn sound_stream(
is_listening: Signal<bool>,
mut stream: WebSocketStream, mut stream: WebSocketStream,
mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>, mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) { ) {
log::info!("Attention! We need cables");
while let Some(msg) = stream.next().await { while let Some(msg) = stream.next().await {
let data = String::from_utf8(msg.unwrap().into()).unwrap(); if is_listening() {
let data_parsed:Vec<&str> = data.split("#").collect(); let data = String::from_utf8(msg.unwrap().into()).unwrap();
//let mut sound_data:Vec<f32> = vec![]; let data_parsed: Vec<&str> = data.split("#").collect();
for element in data_parsed { for element in data_parsed {
let single_data:f32 = match element.parse() { let single_data: f32 = match element.parse() {
Ok(single) => single, Ok(single) => single,
Err(_) => 0.0, Err(_) => 0.0,
}; };
producer.push(single_data).unwrap(); if let Err(_) = producer.push(single_data) {}
}
}
else {
break;
} }
} }
// while let Some(msg) = stream.next().await {
// match msg.unwrap().to_string().parse::<f32>() {
// Ok(sound_data) => match producer.push(sound_data) {
// Ok(_) => {}
// Err(_) => {}
// },
// Err(_) => {}
// };
// }
log::info!("Connection Lost Sir"); log::info!("Connection Lost Sir");
} }
async fn listen(mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>) { async fn listen_podcast(
log::info!("Hi"); is_listening: Signal<bool>,
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
) {
log::info!("Attention! Show must start!");
let host = cpal::default_host(); let host = cpal::default_host();
let output_device = host.default_output_device().unwrap(); let output_device = host.default_output_device().unwrap();
let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into(); let config: cpal::StreamConfig = output_device.default_output_config().unwrap().into();
let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
if consumer.len() > BUFFER_LIMIT {
consumer.clear();
log::error!("Slow Consumer: DROPPED ALL Packets");
}
for sample in data { for sample in data {
*sample = match consumer.pop() { *sample = match consumer.pop() {
Some(s) => s, Some(s) => s,
@ -69,52 +105,13 @@ async fn listen(mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f3
.unwrap(); .unwrap();
output_stream.play().unwrap(); output_stream.play().unwrap();
sleep(Duration::from_secs(100)).await;
while is_listening() {
tokio_with_wasm::tokio::time::sleep(Duration::from_secs(1)).await;
}
output_stream.pause().unwrap(); output_stream.pause().unwrap();
// let host = cpal::default_host(); log::info!("Attention! Time to turn home!");
// let devices = host.devices().unwrap();
// for (_derive_index, device) in devices.enumerate() {
// log::info!("{:?}", device.name());
// }
// let device = host.default_output_device().unwrap();
// let mut supported_config = device.supported_output_configs().unwrap();
// let config = supported_config.next().unwrap().with_max_sample_rate();
// log::info!("{:?}", config);
// match config.sample_format() {
// cpal::SampleFormat::I8 => {log::info!("i8")},
// cpal::SampleFormat::I16 => {log::info!("i16")},
// //cpal::SampleFormat::I24 => {log::info!("i24")},
// cpal::SampleFormat::I32 => {log::info!("i32")},
// //cpal::SampleFormat::I48 => {log::info!("i48")},
// cpal::SampleFormat::I64 => {log::info!("i64")},
// cpal::SampleFormat::U8 => {log::info!("u8")},
// cpal::SampleFormat::U16 => {log::info!("u16")},
// //cpal::SampleFormat::U24 => {log::info!("u24")},
// cpal::SampleFormat::U32 => {log::info!("u32")},
// //cpal::SampleFormat::U48 => {log::info!("u48")},
// cpal::SampleFormat::U64 => {log::info!("u64")},
// cpal::SampleFormat::F32 => {log::info!("f32");
// run::<f32>(consumer, &device, &config.clone().into()).await.unwrap();},
// cpal::SampleFormat::F64 => {log::info!("f64")},
// sample_format => panic!("Unsupported sample format '{sample_format}'"),
// }
// let config:StreamConfig = config.into();
// let stream = device.build_output_stream(
// &config,
// move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
// log::info!("{:?}", data);
// //I need to do something here, I think
// },
// move |_err| {
// },
// None).unwrap();
// stream.play().unwrap();
// tokio::time::sleep(Duration::from_secs(10)).await;
// stream.pause().unwrap();
} }
fn err_fn(err: cpal::StreamError) { fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err); eprintln!("Something Happened: {}", err);