feat: ⚗️ buffer layer

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-04-11 13:31:59 +03:00
parent eb62dbd023
commit e04666b620
2 changed files with 40 additions and 18 deletions

View file

@ -22,9 +22,11 @@ pub async fn start() {
println!("New Streamer: {:#?}", streamer_info);
tokio::spawn(streamer_stream(record_producer, ws_stream));
}
let (message_producer, _) = channel(BUFFER_LENGTH);
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
let (buffered_producer, _) = channel(BUFFER_LENGTH);
tokio::spawn(message_organizer(message_producer.clone(), record_consumer));
tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone()));
while let Ok((tcp_stream, listener_info)) = socket.accept().await {
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap();
println!("New Listener: {}", listener_info);
@ -35,15 +37,36 @@ pub async fn start() {
tokio::spawn(stream(
new_listener,
ws_stream,
message_producer.subscribe(),
buffered_producer.subscribe(),
));
}
}
async fn buffer_layer(mut message_consumer: Receiver<Message>, buffered_producer: Sender<Message>) {
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
let mut messages_buffered: Vec<Message> = vec![];
while message_consumer.len() > 0 {
match message_consumer.recv().await {
Ok(msg) => {
messages_buffered.push(msg);
}Err(_) => {
}
}
}
for message_buffered in messages_buffered {
match buffered_producer.send(message_buffered) {
Ok(_) => {},
Err(_) => {},
}
}
}
}
async fn streamer_stream(record_producer:Sender<Message>, mut ws_stream: WebSocketStream<TcpStream>) {
while let Some(message_with_question) = ws_stream.next().await {
match message_with_question {
Ok(message) => {
println!("{}", message.len());
match record_producer.send(message) {
Ok(_) => {
@ -60,26 +83,24 @@ async fn streamer_stream(record_producer:Sender<Message>, mut ws_stream: WebSock
}
}
async fn message_organizer(message_producer: Sender<Message>, mut consumer: Receiver<Message>) {
async fn message_organizer(message_producer: Sender<Message>, mut record_consumer: Receiver<Message>) {
loop {
let mut messages:Vec<u8> = vec![];
let mut iteration = consumer.len();
let mut messages = String::new();
let mut iteration = record_consumer.len();
while iteration > 0 {
iteration -= 1;
match consumer.recv().await {
match record_consumer.recv().await {
Ok(single_message) => {
let single_message_packet = single_message.to_string().as_bytes().to_vec();
for element in single_message_packet {
messages.push(element);
}
let single_message_packet = single_message.to_string();
messages = format!("{}{}", messages, single_message_packet);
}
Err(_) => {
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
if !messages.is_empty() {
if messages.len() > 0 {
match message_producer.send(messages.into()) {
Ok(_) => {}
Err(_) => {}
@ -90,15 +111,16 @@ async fn message_organizer(message_producer: Sender<Message>, mut consumer: Rece
message_producer.receiver_count()
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn stream(
listener: Listener,
mut ws_stream: WebSocketStream<TcpStream>,
mut message_consumer: Receiver<Message>,
mut buffered_consumer: Receiver<Message>,
) {
while let Ok(message) = message_consumer.recv().await {
if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT {
while let Ok(message) = buffered_consumer.recv().await {
if buffered_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT {
println!(
"{} Forced to Disconnect | Reason -> Slow Consumer",
format!("{}:{}", listener.ip, listener.port)