diff --git a/server/src/signal.rs b/server/src/signal.rs index be2bc02..299c99b 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -104,65 +104,69 @@ async fn websocket_handler(websocket: UpgradeFut) { }; *user.write().await = new_user.user.to_owned(); ONLINE_USERS.write().await.push_back(new_user); - } else { - return; - } - } - } - let user_for_sender = user.clone(); - tokio::spawn(async move { - while ONLINE_USERS.read().await.len() < 2 { - sleep(Duration::from_secs(1)).await; - } - loop { - let mut user_messages = ONLINE_USERS.write().await; - for user_message in user_messages.iter_mut() { - if user_message.user != *user_for_sender.read().await - && user_message.message_receiver.len() > 0 - { - while let Ok(message) = user_message.message_receiver.recv().await { - if let Err(err_val) = websocker_sender - .write_frame(Frame::text(fastwebsockets::Payload::Owned( - serde_json::to_vec(&message).unwrap(), - ))) - .await - { - eprintln!("Error: WebSocket Send | {}", err_val); - let _ = remove_user_from_online_users( - &user_for_sender.read().await.to_owned(), - ) - .await; - break; + let user_for_sender = user.clone(); + tokio::spawn(async move { + while ONLINE_USERS.read().await.len() < 2 { + sleep(Duration::from_secs(1)).await; + } + loop { + let mut user_messages = ONLINE_USERS.write().await; + for user_message in user_messages.iter_mut() { + if user_message.user != *user_for_sender.read().await + && user_message.message_receiver.len() > 0 + { + while let Ok(message) = user_message.message_receiver.recv().await { + if let Err(err_val) = websocker_sender + .write_frame(Frame::text(fastwebsockets::Payload::Owned( + serde_json::to_vec(&message).unwrap(), + ))) + .await + { + eprintln!("Error: WebSocket Send | {}", err_val); + let _ = remove_user_from_online_users( + &user_for_sender.read().await.to_owned(), + ) + .await; + break; + } + if user_message.message_receiver.len() < 1 { + break; + } + } + } } - if user_message.message_receiver.len() < 1 { - break; + sleep(Duration::from_secs(1)).await; + } + }); + + while let Ok(received_frame) = websocket_receiver + .read_frame::<_, WebSocketError>(&mut |_| async { + let _ = remove_user_from_online_users(&user.read().await.to_owned()).await; + Ok(()) + }) + .await + { + if let OpCode::Text = received_frame.opcode { + let signal = + serde_json::from_slice::(&received_frame.payload.to_vec()) + .unwrap(); + + if signal.get_signal_type() != SignalType::Auth { + if let Err(err_val) = message_sender.send(signal) { + eprintln!("Error: WebSocket Channel Send | {}", err_val); + let _ = + remove_user_from_online_users(&user.read().await.to_owned()) + .await; + } + } else { + let _ = + remove_user_from_online_users(&user.read().await.to_owned()).await; + return; } } } - } - sleep(Duration::from_secs(1)).await; - } - }); - - while let Ok(received_frame) = websocket_receiver - .read_frame::<_, WebSocketError>(&mut |_| async { - let _ = remove_user_from_online_users(&user.read().await.to_owned()).await; - Ok(()) - }) - .await - { - if let OpCode::Text = received_frame.opcode { - let signal = - serde_json::from_slice::(&received_frame.payload.to_vec()).unwrap(); - - if signal.get_signal_type() != SignalType::Auth { - if let Err(err_val) = message_sender.send(signal) { - eprintln!("Error: WebSocket Channel Send | {}", err_val); - let _ = remove_user_from_online_users(&user.read().await.to_owned()).await; - } } else { - let _ = remove_user_from_online_users(&user.read().await.to_owned()).await; return; } }