feat: ✨ handle multiple clients with one sound stream
This commit is contained in:
parent
4b158d5195
commit
7707c66407
4 changed files with 85 additions and 26 deletions
|
@ -1,9 +1,16 @@
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
pub mod routing;
|
pub mod routing;
|
||||||
pub mod streaming;
|
pub mod streaming;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct Listener {
|
||||||
|
ip: IpAddr,
|
||||||
|
port: u16,
|
||||||
|
}
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct AppState {}
|
pub struct AppState {}
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
|
|
@ -5,42 +5,54 @@ use futures_util::SinkExt;
|
||||||
use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
|
use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
|
sync::broadcast::{channel, Receiver, Sender},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
use tokio_tungstenite::WebSocketStream;
|
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
||||||
|
|
||||||
|
use crate::Listener;
|
||||||
|
|
||||||
|
const BUFFER_LENGTH: usize = 1000000;
|
||||||
|
const MAX_TOLERATED_MESSAGE_COUNT:usize = 10;
|
||||||
pub async fn start() {
|
pub async fn start() {
|
||||||
let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap();
|
let socket = TcpListener::bind("192.168.1.2:2424").await.unwrap();
|
||||||
while let Ok((tcp_stream, _)) = socket.accept().await {
|
println!("Dude Someone Triggered");
|
||||||
println!("Dude Someone Triggered");
|
let ring = HeapRb::<f32>::new(BUFFER_LENGTH);
|
||||||
let ring = HeapRb::<f32>::new(1000000);
|
let (producer, consumer) = ring.split();
|
||||||
let (producer, consumer) = ring.split();
|
let timer = Instant::now();
|
||||||
|
let (message_producer, _) = channel(BUFFER_LENGTH);
|
||||||
|
tokio::spawn(record(producer));
|
||||||
|
tokio::spawn(parent_stream(timer, message_producer.clone(), consumer));
|
||||||
|
while let Ok((tcp_stream, info)) = socket.accept().await {
|
||||||
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap();
|
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap();
|
||||||
|
println!("New Connection: {}", info);
|
||||||
let timer = Instant::now();
|
let new_listener = Listener {
|
||||||
|
ip: info.ip(),
|
||||||
tokio::spawn(record(producer));
|
port: info.port(),
|
||||||
tokio::spawn(stream(timer, ws_stream, consumer));
|
};
|
||||||
|
tokio::spawn(stream(
|
||||||
|
new_listener,
|
||||||
|
ws_stream,
|
||||||
|
message_producer.subscribe(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub async fn parent_stream(
|
||||||
pub async fn stream(
|
|
||||||
timer: Instant,
|
timer: Instant,
|
||||||
mut ws_stream: WebSocketStream<TcpStream>,
|
message_producer: Sender<Message>,
|
||||||
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
|
mut consumer: Consumer<f32, Arc<SharedRb<f32, Vec<MaybeUninit<f32>>>>>,
|
||||||
) {
|
) {
|
||||||
println!("Waiting");
|
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
let mut single_message: Vec<u8> = Vec::new();
|
||||||
let mut data: Vec<u8> = Vec::new();
|
|
||||||
let now = timer.elapsed().as_secs();
|
let now = timer.elapsed().as_secs();
|
||||||
while !consumer.is_empty() && (timer.elapsed().as_secs() + 2) > now {
|
while !consumer.is_empty() && (timer.elapsed().as_secs() + 5) > now {
|
||||||
match consumer.pop() {
|
match consumer.pop() {
|
||||||
Some(single_data) => {
|
Some(single_data) => {
|
||||||
let ring = HeapRb::<u8>::new(1000000);
|
let ring = HeapRb::<u8>::new(1000000);
|
||||||
let (mut producer, mut consumer) = ring.split();
|
let (mut producer, mut consumer) = ring.split();
|
||||||
let single_data_packet = single_data.to_string().as_bytes().to_vec();
|
let single_data_packet = single_data.to_string().as_bytes().to_vec();
|
||||||
let terminator = "#".as_bytes().to_vec();
|
let terminator = "#".as_bytes().to_vec();
|
||||||
|
|
||||||
for element in single_data_packet {
|
for element in single_data_packet {
|
||||||
producer.push(element).unwrap();
|
producer.push(element).unwrap();
|
||||||
}
|
}
|
||||||
|
@ -48,14 +60,55 @@ pub async fn stream(
|
||||||
producer.push(element).unwrap();
|
producer.push(element).unwrap();
|
||||||
}
|
}
|
||||||
while !consumer.is_empty() {
|
while !consumer.is_empty() {
|
||||||
data.push(consumer.pop().unwrap());
|
single_message.push(consumer.pop().unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {}
|
None => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ws_stream.send(data.into()).await.unwrap();
|
match message_producer.send(single_message.into()) {
|
||||||
ws_stream.flush().await.unwrap();
|
Ok(_) => {}
|
||||||
|
Err(_) => {}
|
||||||
|
}
|
||||||
|
println!(
|
||||||
|
"Message Len = {} | Receiver Count = {}",
|
||||||
|
message_producer.len(),
|
||||||
|
message_producer.receiver_count()
|
||||||
|
);
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub async fn stream(
|
||||||
|
listener: Listener,
|
||||||
|
mut ws_stream: WebSocketStream<TcpStream>,
|
||||||
|
mut message_consumer: Receiver<Message>,
|
||||||
|
) {
|
||||||
|
while let Ok(message) = message_consumer.recv().await {
|
||||||
|
if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT {
|
||||||
|
println!(
|
||||||
|
"{} Forced to Disconnect | Reason -> Slow Consumer",
|
||||||
|
format!("{}:{}", listener.ip, listener.port)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
match ws_stream.send(message).await {
|
||||||
|
Ok(_) => {
|
||||||
|
if let Err(_) = ws_stream.flush().await {
|
||||||
|
println!(
|
||||||
|
"{} is Disconnected",
|
||||||
|
format!("{}:{}", listener.ip, listener.port)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
println!(
|
||||||
|
"{} is Disconnected",
|
||||||
|
format!("{}:{}", listener.ip, listener.port)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +127,7 @@ pub async fn record(mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUnini
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(_) => {}
|
Err(_) => {}
|
||||||
}
|
}
|
||||||
println!("{}", sample);
|
//println!("{}", sample);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -84,7 +137,8 @@ pub async fn record(mut producer: Producer<f32, Arc<SharedRb<f32, Vec<MaybeUnini
|
||||||
|
|
||||||
println!("STREAMIN");
|
println!("STREAMIN");
|
||||||
input_stream.play().unwrap();
|
input_stream.play().unwrap();
|
||||||
std::thread::sleep(std::time::Duration::from_secs(100));
|
//oneshot ile durdurabiliriz sanırım
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(10000000));
|
||||||
println!("DONE I HOPE");
|
println!("DONE I HOPE");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,7 @@ pub fn listen_renderer() -> Element {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
is_listening.set(false);
|
is_listening.set(false);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -70,8 +70,7 @@ pub async fn sound_stream(
|
||||||
};
|
};
|
||||||
if let Err(_) = producer.push(single_data) {}
|
if let Err(_) = producer.push(single_data) {}
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue