feat: Disconnect handler

This commit is contained in:
Ahmet Kaan Gümüş 2025-05-07 17:01:56 +03:00
parent 604806670e
commit 5981ecf148

View file

@ -15,7 +15,7 @@ use tower_http::cors::CorsLayer;
const SERVER_ADDRESS: &str = "192.168.1.3:4546"; const SERVER_ADDRESS: &str = "192.168.1.3:4546";
static USER_MESSAGES: LazyLock<RwLock<VecDeque<UserMessages>>> = static ONLINE_USERS: LazyLock<RwLock<VecDeque<UserMessages>>> =
LazyLock::new(|| VecDeque::new().into()); LazyLock::new(|| VecDeque::new().into());
#[derive(Debug)] #[derive(Debug)]
@ -57,7 +57,7 @@ async fn websocket_handler(websocket: UpgradeFut) {
let (message_sender, message_receiver) = broadcast::channel(100); let (message_sender, message_receiver) = broadcast::channel(100);
if let Ok(received_frame) = websocket_receiver if let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() }) .read_frame::<_, WebSocketError>(&mut move |_| async { Ok(()) })
.await .await
{ {
if let OpCode::Text = received_frame.opcode { if let OpCode::Text = received_frame.opcode {
@ -70,7 +70,7 @@ async fn websocket_handler(websocket: UpgradeFut) {
message_receiver, message_receiver,
}; };
user = new_user.user.to_owned(); user = new_user.user.to_owned();
USER_MESSAGES.write().await.push_back(new_user); ONLINE_USERS.write().await.push_back(new_user);
} else { } else {
return; return;
} }
@ -78,11 +78,11 @@ async fn websocket_handler(websocket: UpgradeFut) {
} }
tokio::spawn(async move { tokio::spawn(async move {
while USER_MESSAGES.read().await.len() < 2 { while ONLINE_USERS.read().await.len() < 2 {
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
loop { loop {
let mut user_messages = USER_MESSAGES.write().await; let mut user_messages = ONLINE_USERS.write().await;
for user_message in user_messages.iter_mut() { for user_message in user_messages.iter_mut() {
if user_message.user != user && user_message.message_receiver.len() > 0 { if user_message.user != user && user_message.message_receiver.len() > 0 {
while let Ok(message) = user_message.message_receiver.recv().await { while let Ok(message) = user_message.message_receiver.recv().await {
@ -93,6 +93,7 @@ async fn websocket_handler(websocket: UpgradeFut) {
.await .await
{ {
eprintln!("Error: WebSocket Send | {}", err_val); eprintln!("Error: WebSocket Send | {}", err_val);
break;
} }
if user_message.message_receiver.len() < 1 { if user_message.message_receiver.len() < 1 {
break; break;
@ -105,7 +106,7 @@ async fn websocket_handler(websocket: UpgradeFut) {
}); });
while let Ok(received_frame) = websocket_receiver while let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() }) .read_frame::<_, WebSocketError>(&mut move |_| async { Ok(()) })
.await .await
{ {
if let OpCode::Text = received_frame.opcode { if let OpCode::Text = received_frame.opcode {