diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 6192d4d..5eb84db 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -1,8 +1,7 @@ + use std::time::Duration; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use futures_util::SinkExt; -use ringbuf::HeapRb; +use futures_util::{SinkExt, StreamExt}; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast::{channel, Receiver, Sender}, @@ -16,18 +15,22 @@ const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; pub async fn start() { let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap(); println!("Dude Someone Triggered"); - let (record_producer, record_consumer) = channel(BUFFER_LENGTH); + let streamer_socket = TcpListener::bind("192.168.1.2:2525").await.unwrap(); + if let Ok((streamer_tcp, streamer_info)) = streamer_socket.accept().await { + let ws_stream = tokio_tungstenite::accept_async(streamer_tcp).await.unwrap(); + println!("New Streamer: {:#?}", streamer_info); + tokio::spawn(streamer_stream(record_producer, ws_stream)); + } let (message_producer, _) = channel(BUFFER_LENGTH); - tokio::spawn(record(record_producer)); tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); - while let Ok((tcp_stream, info)) = socket.accept().await { + while let Ok((tcp_stream, listener_info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); - println!("New Connection: {}", info); + println!("New Listener: {}", listener_info); let new_listener = Listener { - ip: info.ip(), - port: info.port(), + ip: listener_info.ip(), + port: listener_info.port(), }; tokio::spawn(stream( new_listener, @@ -36,35 +39,48 @@ pub async fn start() { )); } } -async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { + +async fn streamer_stream(record_producer:Sender, mut ws_stream: WebSocketStream) { + while let Some(message_with_question) = ws_stream.next().await { + match message_with_question { + Ok(message) => { + match record_producer.send(message) { + Ok(_) => { + + } + Err(_) => { + + } + } + } + Err(_) => { + + } + } + } +} + +async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { loop { - let mut single_message: Vec = Vec::new(); + let mut messages:Vec = vec![]; let mut iteration = consumer.len(); while iteration > 0 { iteration -= 1; match consumer.recv().await { - Ok(single_data) => { - let ring = HeapRb::::new(BUFFER_LENGTH); - let (mut producer, mut consumer) = ring.split(); - let single_data_packet = single_data.to_string().as_bytes().to_vec(); - let terminator = "#".as_bytes().to_vec(); - - for element in single_data_packet { - producer.push(element).unwrap(); - } - for element in terminator { - producer.push(element).unwrap(); - } - while !consumer.is_empty() { - single_message.push(consumer.pop().unwrap()); + Ok(single_message) => { + let single_message_packet = single_message.to_string().as_bytes().to_vec(); + for element in single_message_packet { + messages.push(element); } } - Err(_) => {} + Err(_) => { + + } } } - tokio::time::sleep(Duration::from_secs(2)).await; - if !single_message.is_empty() { - match message_producer.send(single_message.into()) { + tokio::time::sleep(Duration::from_secs(1)).await; + if !messages.is_empty() { + match message_producer.send(messages.into()) { Ok(_) => {} Err(_) => {} } @@ -110,36 +126,3 @@ async fn stream( } } -async fn record(producer: Sender) { - println!("Hello, world!"); - let host = cpal::default_host(); - let input_device = host.default_input_device().unwrap(); - - println!("Input Device: {}", input_device.name().unwrap()); - - let config: cpal::StreamConfig = input_device.default_input_config().unwrap().into(); - - let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| { - for &sample in data { - match producer.send(sample) { - Ok(_) => {} - Err(_) => {} - } - //println!("{}", sample); - } - }; - - let input_stream = input_device - .build_input_stream(&config, input_data_fn, err_fn, None) - .unwrap(); - - println!("STREAMIN"); - input_stream.play().unwrap(); - //oneshot ile durdurabiliriz sanırım - std::thread::sleep(std::time::Duration::from_secs(10000000)); - println!("DONE I HOPE"); -} - -fn err_fn(err: cpal::StreamError) { - eprintln!("Something Happened: {}", err); -} diff --git a/front/src/streaming.rs b/front/src/streaming.rs index 4425880..72fa11e 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -60,7 +60,7 @@ pub async fn sound_stream( mut producer: Producer>>>>, ) { log::info!("Attention! We need cables"); - + while let Some(msg) = stream_consumer.next().await { if is_listening() { let data = String::from_utf8(msg.unwrap().into()).unwrap(); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 10ea2e8..70c65a5 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -7,5 +7,7 @@ edition = "2021" [dependencies] cpal = "0.15.3" +futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } ringbuf = "0.3.3" tokio = { version = "1.36.0", features = ["full"] } +tokio-tungstenite = "0.21.0" diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs new file mode 100644 index 0000000..1dfe855 --- /dev/null +++ b/streamer/src/lib.rs @@ -0,0 +1,4 @@ +pub mod recording; +pub mod streaming; + +pub const BUFFER_LENGTH: usize = 1000000; diff --git a/streamer/src/main.rs b/streamer/src/main.rs index 764f4f9..a651e31 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -1,70 +1,14 @@ -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use ringbuf::HeapRb; - +use std::time::Duration; +use streamer::{recording::recording, streaming::start, BUFFER_LENGTH}; +use tokio::sync::broadcast::channel; #[tokio::main] async fn main() { println!("Hello, world!"); - let host = cpal::default_host(); - let input_device = host.default_input_device().unwrap(); - let output_device = host.default_output_device().unwrap(); - println!("Input Device: {}", input_device.name().unwrap()); - println!("Output Device: {}", output_device.name().unwrap()); - - let config:cpal::StreamConfig = input_device.default_input_config().unwrap().into(); - - let latency_frames = 0.1*config.sample_rate.0 as f32; - let latency_samples = latency_frames as usize * config.channels as usize; - - let ring = HeapRb::::new(latency_samples*2); - let (mut producer, mut consumer) = ring.split(); - - for _ in 0..latency_samples { - producer.push(0.0).unwrap(); - } - - let input_data_fn = move |data: &[f32], _:&cpal::InputCallbackInfo| { - let mut output_fell_behind = false; - for &sample in data { - if producer.push(sample).is_err() { - output_fell_behind = true; - } - } - if output_fell_behind { - eprintln!("Too fast friend"); - } - }; - - let output_data_fn = move |data: &mut [f32], _:&cpal::OutputCallbackInfo| { - let mut input_fell_behind = false; - for sample in data { - *sample = match consumer.pop() { - Some(s) => s, - None => { - input_fell_behind = true; - 0.0 - } - }; - } - if input_fell_behind { - eprintln!("Too fast"); - } - }; - - let input_stream = input_device.build_input_stream(&config, input_data_fn, err_fn, None).unwrap(); - let output_stream = output_device.build_output_stream(&config, output_data_fn, err_fn, None).unwrap(); - - println!("STREAMIN"); - input_stream.play().unwrap(); - output_stream.play().unwrap(); - - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - //std::thread::sleep(std::time::Duration::from_secs(10)); - println!("DONE I HOPE"); -} - -fn err_fn(err: cpal::StreamError) { - eprintln!("Something Happened: {}", err); + let (sound_stream_producer, sound_stream_consumer) = channel(BUFFER_LENGTH); + tokio::spawn(recording(sound_stream_producer)); + tokio::spawn(start(sound_stream_consumer)); + tokio::time::sleep(Duration::from_secs(1000000000)).await; } diff --git a/streamer/src/recording.rs b/streamer/src/recording.rs new file mode 100644 index 0000000..2e96044 --- /dev/null +++ b/streamer/src/recording.rs @@ -0,0 +1,31 @@ +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use tokio::sync::broadcast::Sender; + +pub async fn recording(sound_stream_producer: Sender) { + let host = cpal::default_host(); + let input_device = host.default_input_device().unwrap(); + + let config: cpal::StreamConfig = input_device.default_input_config().unwrap().into(); + + let input_data_fn = move |data: &[f32], _: &cpal::InputCallbackInfo| { + for &sample in data { + match sound_stream_producer.send(sample) { + Ok(_) => {} + Err(_) => {} + } + } + }; + + let input_stream = input_device + .build_input_stream(&config, input_data_fn, err_fn, None) + .unwrap(); + + println!("STREAMIN"); + input_stream.play().unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(1000000000)); + println!("DONE I HOPE"); +} +fn err_fn(err: cpal::StreamError) { + eprintln!("Something Happened: {}", err); +} diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs new file mode 100644 index 0000000..307eb20 --- /dev/null +++ b/streamer/src/streaming.rs @@ -0,0 +1,98 @@ +use std::time::Duration; + +use futures_util::SinkExt; +use ringbuf::HeapRb; +use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + +use crate::BUFFER_LENGTH; +const MAX_TOLERATED_MESSAGE_COUNT: usize = 10; + +pub async fn start(sound_stream_consumer: Receiver) { + let connect_addr = "ws://192.168.1.2:2525"; + let ws_stream; + match tokio_tungstenite::connect_async(connect_addr).await { + Ok(ws_stream_connected) => ws_stream = ws_stream_connected.0, + Err(_) => { + return; + } + } + let (message_producer, message_consumer) = channel(BUFFER_LENGTH); + println!("Connected to: {}", connect_addr); + tokio::spawn(message_organizer(message_producer, sound_stream_consumer)); + tokio::spawn(stream(ws_stream, message_consumer)); +} + +async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { + loop { + let mut messages: Vec = Vec::new(); + let mut iteration = consumer.len(); + while iteration > 0 { + iteration -= 1; + match consumer.recv().await { + Ok(single_data) => { + let ring = HeapRb::::new(BUFFER_LENGTH); + let (mut producer, mut consumer) = ring.split(); + let single_data_packet = single_data.to_string().as_bytes().to_vec(); + let terminator = "#".as_bytes().to_vec(); + + for element in single_data_packet { + producer.push(element).unwrap(); + } + for element in terminator { + producer.push(element).unwrap(); + } + while !consumer.is_empty() { + messages.push(consumer.pop().unwrap()); + } + } + Err(_) => {} + } + } + tokio::time::sleep(Duration::from_secs(1)).await; + if !messages.is_empty() { + match message_producer.send(messages.into()) { + Ok(_) => {} + Err(_) => {} + } + println!( + "Message Counter = {} | Receiver Count = {}", + message_producer.len(), + message_producer.receiver_count() + ); + } + } +} + +async fn stream( + mut ws_stream: WebSocketStream>, + mut message_consumer: Receiver, +) { + while let Ok(message) = message_consumer.recv().await { + if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT { + // println!( + // "{} Forced to Disconnect | Reason -> Slow Consumer", + // format!("{}:{}", listener.ip, listener.port) + // ); + break; + } + match ws_stream.send(message).await { + Ok(_) => { + if let Err(_) = ws_stream.flush().await { + // println!( + // "{} is Disconnected", + // format!("{}:{}", listener.ip, listener.port) + // ); + break; + } + } + Err(_) => { + // println!( + // "{} is Disconnected", + // format!("{}:{}", listener.ip, listener.port) + // ); + break; + } + } + } +}