rust_webrtc/server/src/signal.rs

171 lines
5.7 KiB
Rust
Raw Normal View History

use std::{
collections::VecDeque,
sync::{Arc, LazyLock},
time::Duration,
};
2025-04-29 22:58:24 +03:00
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
use fastwebsockets::{
Frame, OpCode, WebSocketError,
upgrade::{IncomingUpgrade, UpgradeFut},
};
use protocol::{Error, Signal, SignalType};
2025-04-29 22:58:24 +03:00
use tokio::{
net::TcpListener,
sync::{RwLock, broadcast},
time::sleep,
2025-04-29 22:58:24 +03:00
};
use tower_http::cors::CorsLayer;
const SERVER_ADDRESS: &str = "192.168.1.3:4546";
2025-05-07 17:01:56 +03:00
static ONLINE_USERS: LazyLock<RwLock<VecDeque<UserMessages>>> =
2025-04-29 22:58:24 +03:00
LazyLock::new(|| VecDeque::new().into());
#[derive(Debug)]
2025-04-29 22:58:24 +03:00
struct UserMessages {
user: String,
message_receiver: broadcast::Receiver<Signal>,
}
pub async fn start_signalling() {
let router = Router::new()
.route("/", get(alive))
.route("/signal", get(signal))
.layer(CorsLayer::permissive());
let listener = TcpListener::bind(SERVER_ADDRESS).await.unwrap();
println!("{}", SERVER_ADDRESS);
axum::serve(listener, router).await.unwrap();
}
async fn alive() -> impl IntoResponse {
StatusCode::OK
}
async fn signal(websocket: IncomingUpgrade) -> impl IntoResponse {
let (response, websocket) = websocket.upgrade().unwrap();
tokio::spawn(websocket_handler(websocket));
response
}
async fn remove_user_from_online_users(user: &String) -> Result<(), Error> {
let mut target_index = None;
let mut online_users = ONLINE_USERS.write().await;
for (index, online_user) in online_users.iter().enumerate() {
if online_user.user == *user {
target_index = Some(index);
}
}
if let Some(target_index) = target_index {
online_users.remove(target_index).expect("Should Not");
Ok(())
} else {
Err(Error::UnregisteredUser)
}
}
async fn websocket_handler(websocket: UpgradeFut) {
let mut websocket = websocket.await.unwrap();
websocket.set_auto_pong(true);
websocket.set_writev(false);
websocket.set_auto_close(true);
let (mut websocket_receiver, mut websocker_sender) = websocket.split(tokio::io::split);
let user = Arc::new(RwLock::new(String::default()));
2025-04-29 22:58:24 +03:00
let (message_sender, message_receiver) = broadcast::channel(100);
let user_for_receiver_first_connection_disconnect_check = user.clone();
2025-04-29 22:58:24 +03:00
if let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut move |_| {
let user_for_receiver_disconnect_check =
user_for_receiver_first_connection_disconnect_check.clone();
async move {
let _ = remove_user_from_online_users(
&*user_for_receiver_disconnect_check.read().await,
)
.await;
Ok(())
}
})
2025-04-29 22:58:24 +03:00
.await
{
if let OpCode::Text = received_frame.opcode {
2025-04-29 22:58:24 +03:00
let signal =
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec()).unwrap();
if signal.get_signal_type() == SignalType::Auth
&& *user.read().await == String::default()
{
2025-04-29 22:58:24 +03:00
let new_user = UserMessages {
user: signal.get_data(),
message_receiver,
};
*user.write().await = new_user.user.to_owned();
2025-05-07 17:01:56 +03:00
ONLINE_USERS.write().await.push_back(new_user);
2025-04-29 22:58:24 +03:00
} else {
return;
}
}
}
let user_for_sender = user.clone();
tokio::spawn(async move {
2025-05-07 17:01:56 +03:00
while ONLINE_USERS.read().await.len() < 2 {
sleep(Duration::from_secs(1)).await;
}
loop {
2025-05-07 17:01:56 +03:00
let mut user_messages = ONLINE_USERS.write().await;
for user_message in user_messages.iter_mut() {
if user_message.user != *user_for_sender.read().await
&& user_message.message_receiver.len() > 0
{
while let Ok(message) = user_message.message_receiver.recv().await {
if let Err(err_val) = websocker_sender
.write_frame(Frame::text(fastwebsockets::Payload::Owned(
serde_json::to_vec(&message).unwrap(),
)))
.await
{
eprintln!("Error: WebSocket Send | {}", err_val);
let _ = remove_user_from_online_users(
&user_for_sender.read().await.to_owned(),
)
.await;
2025-05-07 17:01:56 +03:00
break;
}
if user_message.message_receiver.len() < 1 {
break;
}
}
}
}
sleep(Duration::from_secs(1)).await;
}
});
while let Ok(received_frame) = websocket_receiver
.read_frame::<_, WebSocketError>(&mut |_| async {
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
Ok(())
})
.await
{
if let OpCode::Text = received_frame.opcode {
let signal =
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec()).unwrap();
if signal.get_signal_type() != SignalType::Auth {
if let Err(err_val) = message_sender.send(signal) {
eprintln!("Error: WebSocket Channel Send | {}", err_val);
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
}
} else {
let _ = remove_user_from_online_users(&user.read().await.to_owned()).await;
return;
}
}
}
}