fix: 🥅 error handling: tls conversion, tcp to ws conversion
This commit is contained in:
parent
56d17084e1
commit
c404f5b23f
1 changed files with 104 additions and 71 deletions
|
@ -64,6 +64,7 @@ pub async fn start(relay_configs: Config) {
|
|||
ip: "127.0.0.1".to_string().parse().unwrap(),
|
||||
port: 0000,
|
||||
};
|
||||
let mut is_streaming = false;
|
||||
loop {
|
||||
match streamer_socket.accept().await {
|
||||
Ok((streamer_tcp, streamer_info)) => {
|
||||
|
@ -75,19 +76,29 @@ pub async fn start(relay_configs: Config) {
|
|||
timer.elapsed()
|
||||
);
|
||||
if relay_configs.tls {
|
||||
let streamer_tcp_tls = acceptor.accept(streamer_tcp).await.unwrap();
|
||||
match tokio_tungstenite::accept_async(streamer_tcp_tls).await {
|
||||
Ok(ws_stream) => {
|
||||
tokio::spawn(streamer_stream(
|
||||
new_streamer.clone(),
|
||||
record_producer,
|
||||
ws_stream,
|
||||
timer,
|
||||
streamer_alive_producer,
|
||||
));
|
||||
match acceptor.accept(streamer_tcp).await {
|
||||
Ok(streamer_tcp_tls) => {
|
||||
match tokio_tungstenite::accept_async(streamer_tcp_tls).await {
|
||||
Ok(ws_stream) => {
|
||||
tokio::spawn(streamer_stream(
|
||||
new_streamer.clone(),
|
||||
record_producer,
|
||||
ws_stream,
|
||||
timer,
|
||||
streamer_alive_producer,
|
||||
));
|
||||
is_streaming = true;
|
||||
break;
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: TCP to WS Transform | {}", err_val)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: TCP TLS Streamer| {}", err_val);
|
||||
break;
|
||||
}
|
||||
Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val),
|
||||
}
|
||||
} else {
|
||||
match tokio_tungstenite::accept_async(streamer_tcp).await {
|
||||
|
@ -99,6 +110,7 @@ pub async fn start(relay_configs: Config) {
|
|||
timer,
|
||||
streamer_alive_producer,
|
||||
));
|
||||
is_streaming = true;
|
||||
break;
|
||||
}
|
||||
Err(err_val) => eprintln!("Error: TCP to WS Transform | {}", err_val),
|
||||
|
@ -108,45 +120,47 @@ pub async fn start(relay_configs: Config) {
|
|||
Err(err_val) => eprintln!("Error: TCP Accept Connection | {}", err_val),
|
||||
}
|
||||
}
|
||||
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
|
||||
let (buffered_producer, _) = channel(BUFFER_LENGTH);
|
||||
message_organizer_task = tokio::spawn(message_organizer(
|
||||
message_producer.clone(),
|
||||
record_consumer,
|
||||
relay_configs.latency,
|
||||
))
|
||||
.into();
|
||||
buffer_layer_task = tokio::spawn(buffer_layer(
|
||||
message_consumer,
|
||||
buffered_producer.clone(),
|
||||
relay_configs.latency,
|
||||
))
|
||||
.into();
|
||||
let (listener_socket_killer_producer, listener_socket_killer_receiver) =
|
||||
tokio::sync::oneshot::channel();
|
||||
let listener_handler_task = tokio::spawn(listener_handler(
|
||||
listener_socket,
|
||||
acceptor.clone(),
|
||||
relay_configs.tls,
|
||||
buffered_producer.clone(),
|
||||
listener_stream_tasks_producer,
|
||||
timer,
|
||||
listener_socket_killer_receiver,
|
||||
));
|
||||
status_checker(
|
||||
buffered_producer.clone(),
|
||||
timer,
|
||||
new_streamer,
|
||||
streamer_alive_receiver,
|
||||
message_organizer_task,
|
||||
buffer_layer_task,
|
||||
listener_stream_tasks_receiver,
|
||||
listener_handler_task,
|
||||
listener_socket_killer_producer,
|
||||
relay_configs.listener_address.clone(),
|
||||
)
|
||||
.await;
|
||||
drop(streamer_socket);
|
||||
if is_streaming {
|
||||
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
|
||||
let (buffered_producer, _) = channel(BUFFER_LENGTH);
|
||||
message_organizer_task = tokio::spawn(message_organizer(
|
||||
message_producer.clone(),
|
||||
record_consumer,
|
||||
relay_configs.latency,
|
||||
))
|
||||
.into();
|
||||
buffer_layer_task = tokio::spawn(buffer_layer(
|
||||
message_consumer,
|
||||
buffered_producer.clone(),
|
||||
relay_configs.latency,
|
||||
))
|
||||
.into();
|
||||
let (listener_socket_killer_producer, listener_socket_killer_receiver) =
|
||||
tokio::sync::oneshot::channel();
|
||||
let listener_handler_task = tokio::spawn(listener_handler(
|
||||
listener_socket,
|
||||
acceptor.clone(),
|
||||
relay_configs.tls,
|
||||
buffered_producer.clone(),
|
||||
listener_stream_tasks_producer,
|
||||
timer,
|
||||
listener_socket_killer_receiver,
|
||||
));
|
||||
status_checker(
|
||||
buffered_producer.clone(),
|
||||
timer,
|
||||
new_streamer,
|
||||
streamer_alive_receiver,
|
||||
message_organizer_task,
|
||||
buffer_layer_task,
|
||||
listener_stream_tasks_receiver,
|
||||
listener_handler_task,
|
||||
listener_socket_killer_producer,
|
||||
relay_configs.listener_address.clone(),
|
||||
)
|
||||
.await;
|
||||
drop(streamer_socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn listener_handler(
|
||||
|
@ -167,28 +181,47 @@ async fn listener_handler(
|
|||
port: listener_info.port(),
|
||||
};
|
||||
if is_tls {
|
||||
let streamer_tcp_tls = acceptor.accept(tcp_stream).await.unwrap();
|
||||
let wss_stream = tokio_tungstenite::accept_async(streamer_tcp_tls)
|
||||
.await
|
||||
.unwrap();
|
||||
let listener_stream_task = tokio::spawn(stream(
|
||||
new_listener,
|
||||
wss_stream,
|
||||
buffered_producer.subscribe(),
|
||||
));
|
||||
let _ = listener_stream_tasks_producer
|
||||
.send(listener_stream_task)
|
||||
.await;
|
||||
match acceptor.accept(tcp_stream).await {
|
||||
Ok(listener_tcp_tls) => {
|
||||
match tokio_tungstenite::accept_async(listener_tcp_tls).await {
|
||||
Ok(wss_stream) => {
|
||||
let listener_stream_task = tokio::spawn(stream(
|
||||
new_listener,
|
||||
wss_stream,
|
||||
buffered_producer.subscribe(),
|
||||
));
|
||||
let _ = listener_stream_tasks_producer
|
||||
.send(listener_stream_task)
|
||||
.await;
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: TCP WSS Listener | {}", err_val);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: TCP TLS Listener | {}", err_val);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let ws_stream = tokio_tungstenite::accept_async(tcp_stream).await.unwrap();
|
||||
let listener_stream_task = tokio::spawn(stream(
|
||||
new_listener,
|
||||
ws_stream,
|
||||
buffered_producer.subscribe(),
|
||||
));
|
||||
let _ = listener_stream_tasks_producer
|
||||
.send(listener_stream_task)
|
||||
.await;
|
||||
match tokio_tungstenite::accept_async(tcp_stream).await {
|
||||
Ok(ws_stream) => {
|
||||
let listener_stream_task = tokio::spawn(stream(
|
||||
new_listener,
|
||||
ws_stream,
|
||||
buffered_producer.subscribe(),
|
||||
));
|
||||
let _ = listener_stream_tasks_producer
|
||||
.send(listener_stream_task)
|
||||
.await;
|
||||
}
|
||||
Err(err_val) => {
|
||||
eprintln!("Error: TCP WS Listener | {}", err_val);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
println!("New Listener: {} | {:#?}", listener_info, timer.elapsed());
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue