From 4f874d87893f00a6d9c4ac892575260f2454b64e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Wed, 16 Apr 2025 06:27:08 +0300 Subject: [PATCH] feat: :sparkles: token extractor middleware --- client/Cargo.toml | 3 ++ client/src/rtc.rs | 23 ++++++++--- client/src/signal.rs | 50 +++++++++++++++++++----- protocol/src/lib.rs | 47 ++++++++++++++++++++--- server/Cargo.toml | 4 ++ server/src/lib.rs | 1 + server/src/main.rs | 7 +++- server/src/middleware.rs | 83 ++++++++++++++++++++++++++++++++++++++++ server/src/signal.rs | 61 ++++++++++++++++++++++++----- 9 files changed, 247 insertions(+), 32 deletions(-) create mode 100644 server/src/middleware.rs diff --git a/client/Cargo.toml b/client/Cargo.toml index 4ca8d54..47274af 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -21,8 +21,11 @@ web-sys = { version = "0.3.77", features = [ "MediaTrackConstraintSet", "Navigator", "RtcConfiguration", + "RtcIceCandidate", + "RtcIceCandidateInit", "RtcIceServer", "RtcPeerConnection", + "RtcPeerConnectionState", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", diff --git a/client/src/rtc.rs b/client/src/rtc.rs index 1369509..5cf6f77 100644 --- a/client/src/rtc.rs +++ b/client/src/rtc.rs @@ -48,18 +48,25 @@ pub async fn offer(username: String) { if let Err(err_val) = send_offer(&username, &data).await { log!("Error: Send Offer | {}", err_val) } + for _ in 0..10 { match receive_answer(&username).await { - Ok(received_answer) => { - log!("{:#?}", received_answer); + Ok(received_user_and_signal_answer) => { + log!("{:#?}", received_user_and_signal_answer); let peer_connection_session_answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer); - peer_connection_session_answer.set_sdp(received_answer.get_data().as_str()); + peer_connection_session_answer + .set_sdp(received_user_and_signal_answer.signal.get_data().as_str()); JsFuture::from( peer_connection.set_remote_description(&peer_connection_session_answer), ) .await .unwrap(); + + for _ in 0..100 { + log!("{:#?}", peer_connection.connection_state()); + sleep(1000).await; + } break; } Err(err_val) => log!("Error: Receive Answer | {}", err_val), @@ -71,11 +78,12 @@ pub async fn offer(username: String) { pub async fn answer(username: String) { for _ in 0..10 { match receive_offer(&username).await { - Ok(offer) => { + Ok(received_user_and_signal_offer) => { let peer_connection = create_peer_connection_with_configuration().await; let peer_connection_session_offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); - peer_connection_session_offer.set_sdp(offer.get_data().as_str()); + peer_connection_session_offer + .set_sdp(received_user_and_signal_offer.signal.get_data().as_str()); JsFuture::from( peer_connection.set_remote_description(&peer_connection_session_offer), ) @@ -99,6 +107,11 @@ pub async fn answer(username: String) { let data = session_answer; send_answer(&username, &data).await.unwrap(); + + for _ in 0..100 { + log!("{:#?}", peer_connection.connection_state()); + sleep(1000).await; + } break; } Err(err_val) => log!("Error: Receive Offer | {}", err_val), diff --git a/client/src/signal.rs b/client/src/signal.rs index fa4055b..dd53eab 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -1,11 +1,11 @@ use std::sync::LazyLock; use leptos::logging::log; -use protocol::{Signal, SignalType}; +use protocol::{Signal, SignalType, UserAndSignal}; use reqwest::{Response, header::HeaderMap}; use serde_json::{Value, json}; -const SIGNALLING_ADDRESS: &str = "http://127.0.0.1:4546"; +const SIGNALLING_ADDRESS: &str = "http://192.168.1.3:4546"; static REQUEST_CLIENT: LazyLock = LazyLock::new(|| reqwest::Client::new()); async fn create_headers(headers: Vec<(&'static str, String)>) -> HeaderMap { @@ -58,15 +58,17 @@ pub async fn send_offer(username: &String, data: &String) -> Result<(), reqwest: .map(|_| Ok(()))? } -pub async fn receive_offer(username: &String) -> Result> { +pub async fn receive_offer(username: &String) -> Result> { let result = get_json(username, SignalType::Offer) .await - .map(async |response| response.json::().await)? + .map(async |response| response.json::().await)? .await?; - if result.get_signal_type() == SignalType::Offer { + if result.signal.get_signal_type() == SignalType::Offer { Ok(result) } else { - Err(protocol::Error::SignalType(result.get_signal_type()))? + Err(protocol::Error::UnexpectedSignalType( + result.signal.get_signal_type(), + ))? } } @@ -78,14 +80,42 @@ pub async fn send_answer(username: &String, data: &String) -> Result<(), reqwest .map(|_| Ok(()))? } -pub async fn receive_answer(username: &String) -> Result> { +pub async fn receive_answer( + username: &String, +) -> Result> { let result = get_json(username, SignalType::Answer) .await - .map(async |response| response.json::().await)? + .map(async |response| response.json::().await)? .await?; - if result.get_signal_type() == SignalType::Answer { + if result.signal.get_signal_type() == SignalType::Answer { Ok(result) } else { - Err(protocol::Error::SignalType(result.get_signal_type()))? + Err(protocol::Error::UnexpectedSignalType( + result.signal.get_signal_type(), + ))? + } +} + +pub async fn send_ice_candidate(username: &String, data: &String) -> Result<(), reqwest::Error> { + let rtc_session_answer_signal = Signal::new(&SignalType::ICECandidate, data); + let rtc_session_answer_signal = json!(rtc_session_answer_signal); + post_json(username, &rtc_session_answer_signal) + .await + .map(|_| Ok(()))? +} + +pub async fn receive_ice_candidate( + username: &String, +) -> Result> { + let result = get_json(username, SignalType::Answer) + .await + .map(async |response| response.json::().await)? + .await?; + if result.signal.get_signal_type() == SignalType::ICECandidate { + Ok(result) + } else { + Err(protocol::Error::UnexpectedSignalType( + result.signal.get_signal_type(), + ))? } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 09186f3..8f3a3f0 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,11 +1,28 @@ -use std::fmt::Display; +use std::{fmt::Display, str::FromStr}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserAndSignal { + pub user: User, + pub signal: Signal, +} +impl UserAndSignal { + pub async fn new(user: User, signal: Signal) -> Self { + UserAndSignal { user, signal } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct User { + pub username: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum Error { - SignalType(SignalType), + UnexpectedSignalType(SignalType), + InvalidSignalType(String), } impl std::error::Error for Error { @@ -17,8 +34,11 @@ impl std::error::Error for Error { impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::SignalType(signal_type) => { - write!(f, "Not Expected Signal Type: {}", signal_type) + Error::UnexpectedSignalType(signal_type) => { + write!(f, "Unexpected Signal Type: {}", signal_type) + } + Error::InvalidSignalType(invalid_signal_type) => { + write!(f, "Invalid Signal Type: {}", invalid_signal_type) } } } @@ -29,9 +49,23 @@ pub enum SignalType { Auth, Offer, Answer, + ICECandidate, } -#[derive(Debug, Clone, Serialize, Deserialize)] +impl FromStr for SignalType { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "Auth" => Ok(SignalType::Auth), + "Offer" => Ok(SignalType::Offer), + "Answer" => Ok(SignalType::Answer), + _ => Err(Error::InvalidSignalType(s.to_owned())), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Signal { signal_type: SignalType, data: String, @@ -44,6 +78,7 @@ impl Display for SignalType { SignalType::Auth => write!(f, "Auth"), SignalType::Offer => write!(f, "Offer"), SignalType::Answer => write!(f, "Answer"), + SignalType::ICECandidate => write!(f, "ICE Candidate"), } } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 774fdf9..89c393f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,7 +7,11 @@ edition = "2024" axum = { version = "0.8.3", features = ["json"] } axum-macros = "0.5.0" tokio = "1.42.1" +tower-http = { version = "0.6.2", features = ["cors", "trace"] } +tracing-subscriber = "0.3.19" +tracing = "0.1.41" webrtc = "0.12.0" serde = { workspace = true } +serde_json = { workspace = true } chrono = { workspace = true } protocol = { path = "../protocol" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 9ba444c..62bc3f4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2,6 +2,7 @@ use std::sync::LazyLock; use utils::naive_toml_parser; +mod middleware; pub mod signal; pub mod utils; diff --git a/server/src/main.rs b/server/src/main.rs index 6c4d798..26ac281 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,8 +1,11 @@ use rust_communication_server::signal::start_signalling; +use tracing::Level; #[tokio::main] async fn main() { println!("Hello, world!"); - - tokio::spawn(start_signalling()).await.unwrap(); + tracing_subscriber::fmt() + .with_max_level(Level::TRACE) + .init(); + start_signalling().await; } diff --git a/server/src/middleware.rs b/server/src/middleware.rs new file mode 100644 index 0000000..21edb90 --- /dev/null +++ b/server/src/middleware.rs @@ -0,0 +1,83 @@ +use std::{str::FromStr, sync::Arc}; + +use axum::{ + extract::Request, + http::{self, HeaderMap, StatusCode}, + middleware::Next, + response::IntoResponse, +}; +use protocol::{SignalType, User}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct UserAndExpectedSignal { + pub user: User, + pub expected_signal: SignalType, +} + +async fn extract_user_from_authorization_header(headers: &HeaderMap) -> Option { + if let Some(authorization_header) = headers.get(http::header::AUTHORIZATION) { + dbg!(authorization_header); + if let Ok(authorization_header) = authorization_header.to_str() { + if let Some((bearer, authorization_token)) = authorization_header.split_once(' ') { + dbg!( + "Info: Verify | Http Header | {} || {}", + bearer, + authorization_token + ); + if bearer.to_lowercase() == "bearer" { + let user = User { + username: authorization_token.to_string(), + }; + return Some(user); + } + } + } + } + None +} + +pub async fn verify_then_get_user(mut request: Request, next: Next) -> impl IntoResponse { + let headers = request.headers(); + dbg!(headers); + if let Some(user) = extract_user_from_authorization_header(headers).await { + let user = Arc::new(user); + request.extensions_mut().insert(user); + return next.run(request).await; + } + + StatusCode::FORBIDDEN.into_response() +} + +pub async fn verify_then_get_user_and_expected_signal( + mut request: Request, + next: Next, +) -> impl IntoResponse { + let headers = request.headers(); + if let Some(user) = extract_user_from_authorization_header(headers).await { + if let Ok(expected_signal) = headers.get("EXPECTED_SIGNAL").unwrap().to_str() { + match SignalType::from_str(expected_signal) { + Ok(expected_signal) => { + let user_and_expected_signal = UserAndExpectedSignal { + user, + expected_signal, + }; + let user_and_expected_signal = Arc::new(user_and_expected_signal); + request.extensions_mut().insert(user_and_expected_signal); + next.run(request).await + } + Err(err_val) => { + eprintln!( + "Error: Verify and Get Expected Signal | Signal Type Conversion | {}", + err_val + ); + StatusCode::BAD_REQUEST.into_response() + } + } + } else { + StatusCode::BAD_REQUEST.into_response() + } + } else { + StatusCode::FORBIDDEN.into_response() + } +} diff --git a/server/src/signal.rs b/server/src/signal.rs index e18ef29..1fff106 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -1,28 +1,45 @@ use std::sync::{Arc, LazyLock, RwLock}; use axum::{ - Json, Router, + Extension, Json, Router, http::StatusCode, response::IntoResponse, routing::{get, post}, }; use axum_macros::debug_handler; -use protocol::Signal; +use protocol::{Signal, User, UserAndSignal}; use tokio::net::TcpListener; +use tower_http::{cors::CorsLayer, trace::TraceLayer}; -static SIGNALS: LazyLock>>> = - LazyLock::new(|| Arc::new(RwLock::new(vec![]))); +use crate::middleware::{ + UserAndExpectedSignal, verify_then_get_user, verify_then_get_user_and_expected_signal, +}; + +static USERS_AND_SIGNALS: LazyLock>> = + LazyLock::new(|| RwLock::new(vec![])); pub async fn start_signalling() { - let route = route(); - let listener = TcpListener::bind("0.0.0.0:4546").await.unwrap(); + let route = route() + .layer(CorsLayer::permissive()) + .layer(TraceLayer::new_for_http()); + let listener = TcpListener::bind("192.168.1.3:4546").await.unwrap(); + println!("http://192.168.1.3:4546"); axum::serve(listener, route).await.unwrap(); } fn route() -> Router { Router::new() .route("/alive", get(alive)) - .route("/", post(signal)) + .route( + "/", + get(read_signal).route_layer(axum::middleware::from_fn( + verify_then_get_user_and_expected_signal, + )), + ) + .route( + "/", + post(create_signal).route_layer(axum::middleware::from_fn(verify_then_get_user)), + ) } async fn alive() -> impl IntoResponse { @@ -30,7 +47,33 @@ async fn alive() -> impl IntoResponse { } #[debug_handler] -async fn signal(Json(signal): Json) -> impl IntoResponse { - SIGNALS.write().unwrap().push(signal); +async fn create_signal( + Extension(user): Extension>, + Json(signal): Json, +) -> impl IntoResponse { + let user = (*user).clone(); + let user_and_signal = UserAndSignal::new(user, signal).await; + USERS_AND_SIGNALS.write().unwrap().push(user_and_signal); StatusCode::OK } + +async fn read_signal( + Extension(user_and_expected_signal): Extension>, +) -> impl IntoResponse { + let mut target_index = None; + let mut json_body = serde_json::json!(""); + for (index, user_and_signal) in USERS_AND_SIGNALS.read().unwrap().iter().enumerate() { + if user_and_signal.signal.get_signal_type() == user_and_expected_signal.expected_signal { + json_body = serde_json::json!(user_and_signal); + target_index = Some(index); + } + } + + match target_index { + Some(target_index) => { + USERS_AND_SIGNALS.write().unwrap().remove(target_index); + (StatusCode::OK, Json(json_body)).into_response() + } + None => StatusCode::BAD_REQUEST.into_response(), + } +}