feat: destroy chat after timeout

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-26 20:41:33 +03:00
parent 0d12e3f420
commit fb951c4ae7
6 changed files with 48 additions and 14 deletions

View file

@ -1,3 +1,4 @@
axum_address:192.168.1.2 axum_address:192.168.1.2
port:2334 port:2334
max_message:150 max_message:150
chat_cleaning_timeout:600

View file

@ -1,4 +1,4 @@
use std::{collections::VecDeque, time::Duration}; use std::collections::VecDeque;
use chrono::Utc; use chrono::Utc;
use serde::Serialize; use serde::Serialize;
@ -15,6 +15,7 @@ pub struct Message {
pub struct Chat { pub struct Chat {
pub room_id: String, pub room_id: String,
pub messages: VecDeque<Message>, pub messages: VecDeque<Message>,
pub last_interaction: u64,
} }
impl Chat { impl Chat {
@ -22,6 +23,7 @@ impl Chat {
Chat { Chat {
room_id, room_id,
messages: vec![].into(), messages: vec![].into(),
last_interaction: Utc::now().timestamp() as u64,
} }
} }
@ -29,9 +31,10 @@ impl Chat {
message.calculate_hash(); message.calculate_hash();
if !self.is_message_exists(message.clone()) { if !self.is_message_exists(message.clone()) {
self.messages.push_back(message); self.messages.push_back(message);
self.last_interaction = Utc::now().timestamp() as u64;
if self.messages.len() > max_message_count as usize { if self.messages.len() > max_message_count as usize {
let _ = self.messages.pop_front(); let _ = self.messages.pop_front();
} }
return true; return true;
} }
false false
@ -45,15 +48,6 @@ impl Chat {
} }
false false
} }
pub async fn message_cleaner(&mut self, max_message_count: u64) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
if self.messages.len() > max_message_count as usize {
self.messages.clear();
}
}
}
} }
impl Message { impl Message {

View file

@ -1,6 +1,7 @@
use std::{net::IpAddr, sync::Arc}; use std::{net::IpAddr, sync::Arc, time::Duration};
use chat::Chat; use chat::Chat;
use chrono::Utc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub mod chat; pub mod chat;
@ -12,12 +13,14 @@ pub struct ServerConfig {
pub ip_address: IpAddr, pub ip_address: IpAddr,
pub port: u16, pub port: u16,
pub max_message_counter: u16, pub max_message_counter: u16,
pub chat_cleaning_timeout: u16,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AppState { pub struct AppState {
pub chats: Arc<Mutex<Vec<Chat>>>, pub chats: Arc<Mutex<Vec<Chat>>>,
pub max_message_counter: u16, pub max_message_counter: u16,
pub chat_cleaning_timeout: u16,
} }
impl AppState { impl AppState {
@ -30,4 +33,29 @@ impl AppState {
} }
None None
} }
pub async fn chat_destroyer(
chats: Arc<Mutex<Vec<Chat>>>,
room_id: String,
chat_cleaning_timeout: u16,
) {
let timeout_control = (chat_cleaning_timeout / 10) as u64;
loop {
tokio::time::sleep(Duration::from_secs(timeout_control)).await;
let mut chats = chats.lock().await;
let current = Utc::now().timestamp() as u64;
for i in 0..chats.len() {
if chats[i].room_id == room_id {
if chats[i].last_interaction < current - chat_cleaning_timeout as u64 {
chats.remove(i);
return;
}
}
}
// if chats[chat_index].last_interaction < current - chat_cleaning_timeout as u64 {
// chats.remove(chat_index);
// return;
// }
}
}
} }

View file

@ -18,6 +18,7 @@ async fn main() {
let state = AppState { let state = AppState {
chats: Arc::new(Mutex::new(vec![])), chats: Arc::new(Mutex::new(vec![])),
max_message_counter: server_config.max_message_counter, max_message_counter: server_config.max_message_counter,
chat_cleaning_timeout: server_config.chat_cleaning_timeout,
}; };
let app = routing(axum::extract::State(state)).await; let app = routing(axum::extract::State(state)).await;
let addr = SocketAddr::new(server_config.ip_address, server_config.port); let addr = SocketAddr::new(server_config.ip_address, server_config.port);

View file

@ -53,13 +53,20 @@ async fn receive_message(
let mut new_chat = Chat::new(room_id); let mut new_chat = Chat::new(room_id);
new_chat.add_message(message, state.max_message_counter); new_chat.add_message(message, state.max_message_counter);
let mut chats = state.chats.lock().await; let mut chats = state.chats.lock().await;
let room_id = new_chat.room_id.clone();
chats.push(new_chat); chats.push(new_chat);
drop(chats);
tokio::spawn(AppState::chat_destroyer(
state.chats.clone(),
room_id,
state.chat_cleaning_timeout,
));
} }
} }
} }
async fn send_message( async fn send_message(
Path(room_id): Path<String>, Path(room_id): Path<String>,
State(mut state): State<AppState>, State(mut state): State<AppState>,
) -> impl IntoResponse { ) -> impl IntoResponse {
match state.is_chat_exists(&room_id).await { match state.is_chat_exists(&room_id).await {

View file

@ -21,10 +21,13 @@ pub fn read_server_config() -> ServerConfig {
let max_message_counter: Vec<&str> = configs_uncleaned[2].split(':').collect(); let max_message_counter: Vec<&str> = configs_uncleaned[2].split(':').collect();
let max_message_counter = max_message_counter[1].parse().unwrap(); let max_message_counter = max_message_counter[1].parse().unwrap();
let chat_cleaning_timeout: Vec<&str> = configs_uncleaned[3].split(':').collect();
let chat_cleaning_timeout = chat_cleaning_timeout[1].parse().unwrap();
ServerConfig { ServerConfig {
ip_address, ip_address,
port, port,
max_message_counter, max_message_counter,
chat_cleaning_timeout,
} }
} }