refactor: ♻️ message send and receive via websocket

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-09 12:07:10 +03:00
parent ff5742562e
commit ab51292439

View file

@ -104,65 +104,69 @@ async fn websocket_handler(websocket: UpgradeFut) {
}; };
*user.write().await = new_user.user.to_owned(); *user.write().await = new_user.user.to_owned();
ONLINE_USERS.write().await.push_back(new_user); ONLINE_USERS.write().await.push_back(new_user);
} else {
return;
}
}
}
let user_for_sender = user.clone(); let user_for_sender = user.clone();
tokio::spawn(async move { tokio::spawn(async move {
while ONLINE_USERS.read().await.len() < 2 { while ONLINE_USERS.read().await.len() < 2 {
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
loop { loop {
let mut user_messages = ONLINE_USERS.write().await; let mut user_messages = ONLINE_USERS.write().await;
for user_message in user_messages.iter_mut() { for user_message in user_messages.iter_mut() {
if user_message.user != *user_for_sender.read().await if user_message.user != *user_for_sender.read().await
&& user_message.message_receiver.len() > 0 && user_message.message_receiver.len() > 0
{ {
while let Ok(message) = user_message.message_receiver.recv().await { while let Ok(message) = user_message.message_receiver.recv().await {
if let Err(err_val) = websocker_sender if let Err(err_val) = websocker_sender
.write_frame(Frame::text(fastwebsockets::Payload::Owned( .write_frame(Frame::text(fastwebsockets::Payload::Owned(
serde_json::to_vec(&message).unwrap(), serde_json::to_vec(&message).unwrap(),
))) )))
.await .await
{ {
eprintln!("Error: WebSocket Send | {}", err_val); eprintln!("Error: WebSocket Send | {}", err_val);
let _ = remove_user_from_online_users( let _ = remove_user_from_online_users(
&user_for_sender.read().await.to_owned(), &user_for_sender.read().await.to_owned(),
) )
.await; .await;
break; break;
}
if user_message.message_receiver.len() < 1 {
break;
}
}
}
} }
if user_message.message_receiver.len() < 1 { sleep(Duration::from_secs(1)).await;
break; }
});
while let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut |_| async {
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
Ok(())
})
.await
{
if let OpCode::Text = received_frame.opcode {
let signal =
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec())
.unwrap();
if signal.get_signal_type() != SignalType::Auth {
if let Err(err_val) = message_sender.send(signal) {
eprintln!("Error: WebSocket Channel Send | {}", err_val);
let _ =
remove_user_from_online_users(&user.read().await.to_owned())
.await;
}
} else {
let _ =
remove_user_from_online_users(&user.read().await.to_owned()).await;
return;
} }
} }
} }
}
sleep(Duration::from_secs(1)).await;
}
});
while let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut |_| async {
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
Ok(())
})
.await
{
if let OpCode::Text = received_frame.opcode {
let signal =
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec()).unwrap();
if signal.get_signal_type() != SignalType::Auth {
if let Err(err_val) = message_sender.send(signal) {
eprintln!("Error: WebSocket Channel Send | {}", err_val);
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
}
} else { } else {
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
return; return;
} }
} }