fix: 🐛 panic when message_organizer_task | buffer_layer_task returns err

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-18 01:45:28 +03:00
parent 3432ba68de
commit e428e253c9

View file

@ -179,51 +179,47 @@ async fn listener_handler(
port: listener_info.port(),
};
match acceptor {
Some(ref acceptor) => {
match acceptor.accept(tcp_stream).await {
Ok(listener_tcp_tls) => {
match tokio_tungstenite::accept_async(listener_tcp_tls).await {
Ok(wss_stream) => {
let listener_stream_task = tokio::spawn(stream(
new_listener,
wss_stream,
buffered_producer.subscribe(),
));
let _ = listener_stream_tasks_producer
.send(listener_stream_task)
.await;
}
Err(err_val) => {
eprintln!("Error: TCP WSS Listener | {}", err_val);
drop(listener_socket);
return;
}
Some(ref acceptor) => match acceptor.accept(tcp_stream).await {
Ok(listener_tcp_tls) => {
match tokio_tungstenite::accept_async(listener_tcp_tls).await {
Ok(wss_stream) => {
let listener_stream_task = tokio::spawn(stream(
new_listener,
wss_stream,
buffered_producer.subscribe(),
));
let _ = listener_stream_tasks_producer
.send(listener_stream_task)
.await;
}
Err(err_val) => {
eprintln!("Error: TCP WSS Listener | {}", err_val);
drop(listener_socket);
return;
}
}
Err(err_val) => {
eprintln!("Error: TCP TLS Listener | {}", err_val);
drop(listener_socket);
return;
}
}
Err(err_val) => {
eprintln!("Error: TCP TLS Listener | {}", err_val);
drop(listener_socket);
return;
}
},
None => {
match tokio_tungstenite::accept_async(tcp_stream).await {
Ok(ws_stream) => {
let listener_stream_task = tokio::spawn(stream(
new_listener,
ws_stream,
buffered_producer.subscribe(),
));
let _ = listener_stream_tasks_producer
.send(listener_stream_task)
.await;
}
Err(err_val) => {
eprintln!("Error: TCP WS Listener | {}", err_val);
drop(listener_socket);
return;
}
None => match tokio_tungstenite::accept_async(tcp_stream).await {
Ok(ws_stream) => {
let listener_stream_task = tokio::spawn(stream(
new_listener,
ws_stream,
buffered_producer.subscribe(),
));
let _ = listener_stream_tasks_producer
.send(listener_stream_task)
.await;
}
Err(err_val) => {
eprintln!("Error: TCP WS Listener | {}", err_val);
drop(listener_socket);
return;
}
},
}
@ -250,7 +246,6 @@ async fn status_checker(
) {
let mut listener_counter = buffered_producer.receiver_count();
let mut bottleneck_flag = false;
//let mut buffer_len = buffered_producer.len();
loop {
tokio::time::sleep(Duration::from_secs(3)).await;
match streamer_alive_receiver.try_recv() {
@ -260,8 +255,18 @@ async fn status_checker(
format!("{}:{}", streamer.ip, streamer.port)
);
let cleaning_timer = Instant::now();
message_organizer_task.as_ref().unwrap().abort();
buffer_layer_task.as_ref().unwrap().abort();
match message_organizer_task.as_ref() {
Some(message_organizer_task) => message_organizer_task.abort(),
None => {
eprintln!("Error: Message Organizer Task -> None");
}
}
match buffer_layer_task.as_ref() {
Some(buffer_layer_task) => buffer_layer_task.abort(),
None => {
eprintln!("Error: Buffer Layer Task -> None");
}
}
if let Err(_) = listener_socket_killer_producer.send(true) {
eprintln!("Error: Cleaning | Socket Kill Failed, Receiver Dropped");
}