From 3b122dc4f7edf817b5a4292f642f1addf90006b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 17 Apr 2025 04:09:18 +0300 Subject: [PATCH] feat: :sparkles: ice local and remote changes --- client/Cargo.toml | 2 + client/src/gui.rs | 8 +++- client/src/rtc.rs | 89 ++++++++++++++++++++++++++++++++-------- protocol/src/lib.rs | 2 +- server/src/middleware.rs | 9 ++-- server/src/signal.rs | 5 ++- 6 files changed, 88 insertions(+), 27 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 47274af..4f0171d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -23,8 +23,10 @@ web-sys = { version = "0.3.77", features = [ "RtcConfiguration", "RtcIceCandidate", "RtcIceCandidateInit", + "RtcIceConnectionState", "RtcIceServer", "RtcPeerConnection", + "RtcPeerConnectionIceEvent", "RtcPeerConnectionState", "RtcSdpType", "RtcSessionDescription", diff --git a/client/src/gui.rs b/client/src/gui.rs index 1d73da2..da82796 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -81,7 +81,9 @@ fn signalling(username: (ReadSignal, WriteSignal)) -> impl IntoV fn rtc_offer(username: ReadSignal) -> impl IntoView { let offer_trigger = move || { - spawn_local(offer(username.get())); + spawn_local(async move { + let peer_connection = offer(username.get()).await; + }); }; let offer_button = button() @@ -95,7 +97,9 @@ fn rtc_offer(username: ReadSignal) -> impl IntoView { fn rtc_answer(username: ReadSignal) -> impl IntoView { let answer_trigger = move || { - spawn_local(answer(username.get())); + spawn_local(async move { + let peer_connection = answer(username.get()).await; + }); }; let answer_button = button() diff --git a/client/src/rtc.rs b/client/src/rtc.rs index 5cf6f77..2212735 100644 --- a/client/src/rtc.rs +++ b/client/src/rtc.rs @@ -1,16 +1,67 @@ -use leptos::logging::log; +use leptos::{ + logging::log, + prelude::{Set, Signal, Update, Write, WriteSignal}, + task::spawn_local, +}; use wasm_bindgen_futures::JsFuture; use web_sys::{ - RtcConfiguration, RtcIceServer, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit, + RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, RtcPeerConnection, + RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, js_sys::{Array, Reflect}, - wasm_bindgen::{JsCast, JsValue}, + wasm_bindgen::{JsCast, JsValue, prelude::Closure}, }; use crate::{ - signal::{receive_answer, receive_offer, send_answer, send_offer}, + signal::{ + receive_answer, receive_ice_candidate, receive_offer, send_answer, send_ice_candidate, + send_offer, + }, sleep, }; +async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { + let local_ice_candidate_handler: Box = + Box::new(move |username, peer_connection_ice_event| { + spawn_local(async move { + { + let username: &String = &username; + async move { + sleep(1000).await; + + if let Some(peer_connection_ice_candidate) = + peer_connection_ice_event.candidate() + { + let peer_connection_ice_candidate = + peer_connection_ice_candidate.as_string().unwrap(); + send_ice_candidate(username, &peer_connection_ice_candidate) + .await + .unwrap(); + } + } + } + .await + }); + }); + + let local_ice_candidate_handler = Closure::wrap(local_ice_candidate_handler); + + peer_connection.set_onicecandidate(Some(local_ice_candidate_handler.as_ref().unchecked_ref())); +} + +async fn remote_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { + if let Ok(user_and_signal) = receive_ice_candidate(username).await { + let peer_connection_ice_candidate_init = + RtcIceCandidateInit::new(&user_and_signal.signal.get_data()); + let peer_connection_ice_candidate = + RtcIceCandidate::new(&peer_connection_ice_candidate_init).unwrap(); + let peer_connection_add_ice_candidate_promise = peer_connection + .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&peer_connection_ice_candidate)); + JsFuture::from(peer_connection_add_ice_candidate_promise) + .await + .unwrap(); + } +} + async fn create_peer_connection_with_configuration() -> RtcPeerConnection { let ice_server_addresses = vec![JsValue::from("stun:stun.l.google.com:19302")] .into_iter() @@ -23,7 +74,7 @@ async fn create_peer_connection_with_configuration() -> RtcPeerConnection { RtcPeerConnection::new_with_configuration(&rtc_configuration).unwrap() } -pub async fn offer(username: String) { +pub async fn offer(username: String) -> RtcPeerConnection { let peer_connection = create_peer_connection_with_configuration().await; let peer_connection_create_offer_promise = peer_connection.create_offer(); let peer_connection_session_offer = JsFuture::from(peer_connection_create_offer_promise) @@ -49,7 +100,7 @@ pub async fn offer(username: String) { log!("Error: Send Offer | {}", err_val) } - for _ in 0..10 { + loop { match receive_answer(&username).await { Ok(received_user_and_signal_answer) => { log!("{:#?}", received_user_and_signal_answer); @@ -57,17 +108,18 @@ pub async fn offer(username: String) { RtcSessionDescriptionInit::new(RtcSdpType::Answer); peer_connection_session_answer .set_sdp(received_user_and_signal_answer.signal.get_data().as_str()); + sleep(1000).await; JsFuture::from( peer_connection.set_remote_description(&peer_connection_session_answer), ) .await .unwrap(); - for _ in 0..100 { - log!("{:#?}", peer_connection.connection_state()); - sleep(1000).await; - } - break; + log!("{:#?}", peer_connection.connection_state()); + log!("{:#?}", peer_connection.ice_connection_state()); + + local_ice_candidate_handler(&username, &peer_connection).await; + return peer_connection; } Err(err_val) => log!("Error: Receive Answer | {}", err_val), } @@ -75,10 +127,11 @@ pub async fn offer(username: String) { } } -pub async fn answer(username: String) { - for _ in 0..10 { +pub async fn answer(username: String) -> RtcPeerConnection { + loop { match receive_offer(&username).await { Ok(received_user_and_signal_offer) => { + log!("{:#?}", received_user_and_signal_offer); let peer_connection = create_peer_connection_with_configuration().await; let peer_connection_session_offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); @@ -107,12 +160,12 @@ pub async fn answer(username: String) { let data = session_answer; send_answer(&username, &data).await.unwrap(); + log!("{:#?}", peer_connection.connection_state()); + log!("{:#?}", peer_connection.ice_connection_state()); - for _ in 0..100 { - log!("{:#?}", peer_connection.connection_state()); - sleep(1000).await; - } - break; + local_ice_candidate_handler(&username, &peer_connection).await; + + return peer_connection; } Err(err_val) => log!("Error: Receive Offer | {}", err_val), } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 8f3a3f0..ba8d803 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -14,7 +14,7 @@ impl UserAndSignal { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct User { pub username: String, } diff --git a/server/src/middleware.rs b/server/src/middleware.rs index 21edb90..a05e586 100644 --- a/server/src/middleware.rs +++ b/server/src/middleware.rs @@ -20,10 +20,9 @@ async fn extract_user_from_authorization_header(headers: &HeaderMap) -> Option Option impl IntoResponse { let headers = request.headers(); - dbg!(headers); + println!("Info: Verify | Headers| {:#?}", headers); if let Some(user) = extract_user_from_authorization_header(headers).await { let user = Arc::new(user); request.extensions_mut().insert(user); diff --git a/server/src/signal.rs b/server/src/signal.rs index 1fff106..959fc26 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -57,13 +57,16 @@ async fn create_signal( StatusCode::OK } +#[debug_handler] async fn read_signal( Extension(user_and_expected_signal): Extension>, ) -> impl IntoResponse { let mut target_index = None; let mut json_body = serde_json::json!(""); for (index, user_and_signal) in USERS_AND_SIGNALS.read().unwrap().iter().enumerate() { - if user_and_signal.signal.get_signal_type() == user_and_expected_signal.expected_signal { + if user_and_signal.signal.get_signal_type() == user_and_expected_signal.expected_signal + && user_and_signal.user != user_and_expected_signal.user + { json_body = serde_json::json!(user_and_signal); target_index = Some(index); }