diff --git a/back/src/streaming.rs b/back/src/streaming.rs index 5eb84db..6e464c4 100644 --- a/back/src/streaming.rs +++ b/back/src/streaming.rs @@ -22,9 +22,11 @@ pub async fn start() { println!("New Streamer: {:#?}", streamer_info); tokio::spawn(streamer_stream(record_producer, ws_stream)); } - - let (message_producer, _) = channel(BUFFER_LENGTH); + + let (message_producer, message_consumer) = channel(BUFFER_LENGTH); + let (buffered_producer, _) = channel(BUFFER_LENGTH); tokio::spawn(message_organizer(message_producer.clone(), record_consumer)); + tokio::spawn(buffer_layer(message_consumer, buffered_producer.clone())); while let Ok((tcp_stream, listener_info)) = socket.accept().await { let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap(); println!("New Listener: {}", listener_info); @@ -35,15 +37,36 @@ pub async fn start() { tokio::spawn(stream( new_listener, ws_stream, - message_producer.subscribe(), + buffered_producer.subscribe(), )); } } - +async fn buffer_layer(mut message_consumer: Receiver, buffered_producer: Sender) { + loop { + tokio::time::sleep(Duration::from_secs(2)).await; + let mut messages_buffered: Vec = vec![]; + while message_consumer.len() > 0 { + match message_consumer.recv().await { + Ok(msg) => { + messages_buffered.push(msg); + }Err(_) => { + + } + } + } + for message_buffered in messages_buffered { + match buffered_producer.send(message_buffered) { + Ok(_) => {}, + Err(_) => {}, + } + } + } +} async fn streamer_stream(record_producer:Sender, mut ws_stream: WebSocketStream) { while let Some(message_with_question) = ws_stream.next().await { match message_with_question { Ok(message) => { + println!("{}", message.len()); match record_producer.send(message) { Ok(_) => { @@ -60,26 +83,24 @@ async fn streamer_stream(record_producer:Sender, mut ws_stream: WebSock } } -async fn message_organizer(message_producer: Sender, mut consumer: Receiver) { +async fn message_organizer(message_producer: Sender, mut record_consumer: Receiver) { loop { - let mut messages:Vec = vec![]; - let mut iteration = consumer.len(); + let mut messages = String::new(); + let mut iteration = record_consumer.len(); while iteration > 0 { iteration -= 1; - match consumer.recv().await { + match record_consumer.recv().await { Ok(single_message) => { - let single_message_packet = single_message.to_string().as_bytes().to_vec(); - for element in single_message_packet { - messages.push(element); - } + let single_message_packet = single_message.to_string(); + messages = format!("{}{}", messages, single_message_packet); + } Err(_) => { } } } - tokio::time::sleep(Duration::from_secs(1)).await; - if !messages.is_empty() { + if messages.len() > 0 { match message_producer.send(messages.into()) { Ok(_) => {} Err(_) => {} @@ -90,15 +111,16 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece message_producer.receiver_count() ); } + tokio::time::sleep(Duration::from_millis(100)).await; } } async fn stream( listener: Listener, mut ws_stream: WebSocketStream, - mut message_consumer: Receiver, + mut buffered_consumer: Receiver, ) { - while let Ok(message) = message_consumer.recv().await { - if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT { + while let Ok(message) = buffered_consumer.recv().await { + if buffered_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT { println!( "{} Forced to Disconnect | Reason -> Slow Consumer", format!("{}:{}", listener.ip, listener.port) diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index aa8c957..de198e3 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -55,7 +55,6 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece Err(_) => {} } } - tokio::time::sleep(Duration::from_secs(1)).await; if !messages.is_empty() { match message_producer.send(messages.into()) { Ok(_) => {} @@ -67,6 +66,7 @@ async fn message_organizer(message_producer: Sender, mut consumer: Rece message_producer.receiver_count() ); } + tokio::time::sleep(Duration::from_millis(500)).await; } }