From 4a695dce405a5e8317dbfe6adedb9f64b57b6ab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Tue, 29 Apr 2025 22:58:24 +0300 Subject: [PATCH] feat: :sparkles: websocket split --- client/src/gui.rs | 6 +++++- protocol/src/lib.rs | 11 ----------- server/Cargo.toml | 2 +- server/src/signal.rs | 44 ++++++++++++++++++++++++++++++++++++++------ 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/client/src/gui.rs b/client/src/gui.rs index f5ec76f..56fa87f 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -39,7 +39,11 @@ pub fn app() -> impl IntoView { let webrtc_answer = webrtc.clone(); let answer_button = button() .on(leptos::ev::click, move |_| { - log!("{:#?}", webrtc_answer.get_status()); + let webrtc_answer = webrtc_answer.clone(); + spawn_local(async move { + let answer_result = webrtc_answer.answer().await; + log!("Answer Result Is = {:#?}", answer_result); + }); }) .child("Answer"); diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 79d5249..db3c0b9 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -3,17 +3,6 @@ use std::{fmt::Display, str::FromStr}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UserAndSignal { - pub user: User, - pub signal: Signal, -} -impl UserAndSignal { - pub async fn new(user: User, signal: Signal) -> Self { - UserAndSignal { user, signal } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct User { pub username: String, diff --git a/server/Cargo.toml b/server/Cargo.toml index 6cc6e53..b8f8e20 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" tokio = { version = "1.42.1", default-features = false, features = ["macros", "rt-multi-thread"] } axum = "0.8.3" tower-http = { version = "0.6.2", default-features = false, features = ["cors"]} -fastwebsockets = { version = "0.10.0", features = ["upgrade", "with_axum"]} +fastwebsockets = { version = "0.10.0", features = ["upgrade", "unstable-split", "with_axum"]} serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } diff --git a/server/src/signal.rs b/server/src/signal.rs index 1e903c7..ee0b0dc 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -1,13 +1,27 @@ +use std::{collections::VecDeque, sync::LazyLock}; + use axum::{Router, http::StatusCode, response::IntoResponse, routing::get}; use fastwebsockets::{ - OpCode, + FragmentCollectorRead, OpCode, WebSocketError, upgrade::{IncomingUpgrade, UpgradeFut}, }; -use tokio::net::TcpListener; +use protocol::{Signal, SignalType}; +use tokio::{ + net::TcpListener, + sync::{RwLock, broadcast}, +}; use tower_http::cors::CorsLayer; const SERVER_ADDRESS: &str = "192.168.1.3:4546"; +static USER_MESSAGES: LazyLock>> = + LazyLock::new(|| VecDeque::new().into()); + +struct UserMessages { + user: String, + message_receiver: broadcast::Receiver, +} + pub async fn start_signalling() { let router = Router::new() .route("/", get(alive)) @@ -32,15 +46,33 @@ async fn signal(websocket: IncomingUpgrade) -> impl IntoResponse { async fn websocket_handler(websocket: UpgradeFut) { let mut websocket = websocket.await.unwrap(); - websocket.set_auto_pong(true); websocket.set_writev(false); websocket.set_auto_close(true); - while let Ok(received_frame) = websocket.read_frame().await { + let (mut websocket_receiver, websocker_sender) = websocket.split(tokio::io::split); + let mut user = String::default(); + let (message_sender, message_receiver) = broadcast::channel(100); + + if let Ok(received_frame) = websocket_receiver + .read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() }) + .await + { if let OpCode::Text = received_frame.opcode { - let received_payload = received_frame.payload; - println!("Sent:\n{:#?}", received_payload); + let signal = + serde_json::from_slice::(&received_frame.payload.to_vec()).unwrap(); + + if signal.get_signal_type() == SignalType::Auth && user == String::default() { + let new_user = UserMessages { + user: signal.get_data(), + message_receiver, + }; + user = new_user.user.to_owned(); + USER_MESSAGES.write().await.push_back(new_user); + } else { + return; + } + println!("{:#?}", signal); } } }