From 53a73285b9fbdd4e5d1788ae5f4fb4dac8c03f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=BCm=C3=BC=C5=9F?= Date: Wed, 30 Apr 2025 18:22:37 +0300 Subject: [PATCH] fix: :zap: messages go to right channels fix: :zap: ice candidate transfer fix: :zap: deadlock in server --- client/src/gui.rs | 27 +++++++++++++++++----- client/src/signal.rs | 14 +++++------ client/src/webrtc.rs | 55 +++++++++++++++++++++++++++++++++----------- protocol/src/lib.rs | 4 ++-- server/src/signal.rs | 53 ++++++++++++++++++++++++++++++++++++++---- 5 files changed, 121 insertions(+), 32 deletions(-) diff --git a/client/src/gui.rs b/client/src/gui.rs index 56fa87f..6842e65 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -7,7 +7,12 @@ use leptos::{ }; use wasm_bindgen_futures::spawn_local; -use crate::{media::audio, signal::wait_until_communication_is_ready, webrtc::WebRTC}; +use crate::{ + media::audio, + signal::{send_auth, wait_until_communication_is_ready}, + sleep, + webrtc::WebRTC, +}; pub fn app() -> impl IntoView { let audio_stream = LocalResource::new(|| audio()); @@ -23,15 +28,23 @@ pub fn app() -> impl IntoView { let audio_stream = audio_stream.as_deref().unwrap().clone(); let webrtc = WebRTC::new(Some(audio_stream), None, None).unwrap(); + let webrtc_state = webrtc.clone(); + spawn_local(async move { + loop { + log!("{:#?}", webrtc_state.get_status()); + sleep(1000).await; + } + }); let webrtc_offer = webrtc.clone(); let offer_button = button() .on(leptos::ev::click, move |_| { + send_auth(&String::from("Offer")).unwrap(); let webrtc_offer = webrtc_offer.clone(); - log!("{:#?}", webrtc_offer.get_status()); spawn_local(async move { - let offer_result = webrtc_offer.offer().await; - log!("Offer Result Is = {:#?}", offer_result); + if let Err(err_val) = webrtc_offer.offer().await { + log!("Error: WebRTC Offer | {}", err_val); + } }); }) .child("Offer"); @@ -39,10 +52,12 @@ pub fn app() -> impl IntoView { let webrtc_answer = webrtc.clone(); let answer_button = button() .on(leptos::ev::click, move |_| { + send_auth(&String::from("Answer")).unwrap(); let webrtc_answer = webrtc_answer.clone(); spawn_local(async move { - let answer_result = webrtc_answer.answer().await; - log!("Answer Result Is = {:#?}", answer_result); + if let Err(err_val) = webrtc_answer.answer().await { + log!("Error: WebRTC Answer | {}", err_val); + } }); }) .child("Answer"); diff --git a/client/src/signal.rs b/client/src/signal.rs index c7eb400..f6056bf 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -49,7 +49,7 @@ impl SignallingChannel { }); } else if received_signal_type == SignalType::Offer { let offer_channel_sender = - ICE_CANDIDATE_CHANNEL.with(|offer_channel| offer_channel.0.clone()); + OFFER_CHANNEL.with(|offer_channel| offer_channel.0.clone()); spawn_local(async move { offer_channel_sender .send(received_signal) @@ -57,8 +57,8 @@ impl SignallingChannel { .expect("Never") }); } else if received_signal_type == SignalType::Answer { - let answer_channel_sender = ICE_CANDIDATE_CHANNEL - .with(|answer_channel| answer_channel.0.clone()); + let answer_channel_sender = + ANSWER_CHANNEL.with(|answer_channel| answer_channel.0.clone()); spawn_local(async move { answer_channel_sender .send(received_signal) @@ -139,9 +139,9 @@ pub async fn receive_offer() -> Result { } pub fn send_answer(data: &String) -> Result<(), Error> { - let offer = Signal::new(&SignalType::Answer, data); + let answer = Signal::new(&SignalType::Answer, data); - SignallingChannel::send(&offer) + SignallingChannel::send(&answer) } pub async fn receive_answer() -> Result { @@ -149,9 +149,9 @@ pub async fn receive_answer() -> Result { } pub fn send_ice_candidate(data: &String) -> Result<(), Error> { - let offer = Signal::new(&SignalType::ICECandidate, data); + let ice_candidate = Signal::new(&SignalType::ICECandidate, data); - SignallingChannel::send(&offer) + SignallingChannel::send(&ice_candidate) } pub async fn receive_ice_candidate() -> Result { diff --git a/client/src/webrtc.rs b/client/src/webrtc.rs index 12db646..9c73e31 100644 --- a/client/src/webrtc.rs +++ b/client/src/webrtc.rs @@ -7,7 +7,7 @@ use web_sys::{ MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType, RtcSessionDescriptionInit, - js_sys::{Array, Reflect}, + js_sys::{Array, JSON, Reflect}, wasm_bindgen::{JsCast, JsValue, prelude::Closure}, }; @@ -44,7 +44,9 @@ impl WebRTC { let on_ice_candidate = Closure::::new( move |peer_connection_ice_event: RtcPeerConnectionIceEvent| { if let Some(candidate) = peer_connection_ice_event.candidate() { - if let Err(err_val) = send_ice_candidate(&candidate.candidate()) { + let candidate = JSON::stringify(&candidate).expect("Never"); + let candidate = String::from(candidate); + if let Err(err_val) = send_ice_candidate(&candidate) { log!("Error: Send ICE Candidate | {}", err_val); } } @@ -71,17 +73,44 @@ impl WebRTC { async fn ice_candidate_receiver(&self) { while let Ok(received_ice_candidate) = receive_ice_candidate().await { - let received_ice_candidate = - RtcIceCandidateInit::new(&received_ice_candidate.get_data()); - if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { - let add_received_ice_candidate_promise = self - .peer_connection - .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&received_ice_candidate)); - if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise) - .await - .map_err(|_| Error::ICECandidateAdd) - { - log!("Error: Add ICE Candidate | {}", err_val); + let received_ice_candidate = JSON::parse(&received_ice_candidate.get_data()).unwrap(); + + let candidate = Reflect::get(&received_ice_candidate, &JsValue::from_str("candidate")) + .unwrap() + .as_string() + .unwrap(); + let sdp_mid = Reflect::get(&received_ice_candidate, &JsValue::from_str("sdpMid")) + .unwrap() + .as_string() + .unwrap(); + let sdp_m_line_index = + Reflect::get(&received_ice_candidate, &JsValue::from_str("sdpMLineIndex")) + .unwrap() + .as_f64() + .unwrap() as u16; + + let received_ice_candidate = RtcIceCandidateInit::new(&candidate); + received_ice_candidate.set_sdp_mid(Some(&sdp_mid)); + received_ice_candidate.set_sdp_m_line_index(Some(sdp_m_line_index)); + + match RtcIceCandidate::new(&received_ice_candidate) + .map_err(|err_val| Error::ICECandidateAdd(err_val.as_string().unwrap())) + { + Ok(received_ice_candidate) => { + let add_received_ice_candidate_promise = self + .peer_connection + .add_ice_candidate_with_opt_rtc_ice_candidate(Some( + &received_ice_candidate, + )); + if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise) + .await + .map_err(|err_val| Error::ICECandidateAdd(err_val.as_string().unwrap())) + { + log!("Error: Add ICE Candidate | {}", err_val); + } + } + Err(err_val) => { + log!("Error: New Ice Candidate | {}", err_val); } } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index db3c0b9..6c64d8b 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -27,7 +27,7 @@ pub enum Error { OfferChannelReceive(String), AnswerChannelReceive(String), ICECandidateChannelReceive(String), - ICECandidateAdd, + ICECandidateAdd(String), } impl std::error::Error for Error { @@ -70,7 +70,7 @@ impl Display for Error { Error::ICECandidateChannelReceive(recv_error) => { write!(f, "ICE Candidate Channel Receive | {}", recv_error) } - Error::ICECandidateAdd => write!(f, "ICE Candidate Add"), + Error::ICECandidateAdd(err_val) => write!(f, "ICE Candidate Add | {}", err_val), } } } diff --git a/server/src/signal.rs b/server/src/signal.rs index ee0b0dc..42394f3 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -1,14 +1,15 @@ -use std::{collections::VecDeque, sync::LazyLock}; +use std::{collections::VecDeque, sync::LazyLock, time::Duration}; use axum::{Router, http::StatusCode, response::IntoResponse, routing::get}; use fastwebsockets::{ - FragmentCollectorRead, OpCode, WebSocketError, + Frame, OpCode, WebSocketError, upgrade::{IncomingUpgrade, UpgradeFut}, }; use protocol::{Signal, SignalType}; use tokio::{ net::TcpListener, sync::{RwLock, broadcast}, + time::sleep, }; use tower_http::cors::CorsLayer; @@ -17,6 +18,7 @@ const SERVER_ADDRESS: &str = "192.168.1.3:4546"; static USER_MESSAGES: LazyLock>> = LazyLock::new(|| VecDeque::new().into()); +#[derive(Debug)] struct UserMessages { user: String, message_receiver: broadcast::Receiver, @@ -50,7 +52,7 @@ async fn websocket_handler(websocket: UpgradeFut) { websocket.set_writev(false); websocket.set_auto_close(true); - let (mut websocket_receiver, websocker_sender) = websocket.split(tokio::io::split); + let (mut websocket_receiver, mut websocker_sender) = websocket.split(tokio::io::split); let mut user = String::default(); let (message_sender, message_receiver) = broadcast::channel(100); @@ -72,7 +74,50 @@ async fn websocket_handler(websocket: UpgradeFut) { } else { return; } - println!("{:#?}", signal); + } + } + + tokio::spawn(async move { + while USER_MESSAGES.read().await.len() < 2 { + sleep(Duration::from_secs(1)).await; + } + loop { + let mut user_messages = USER_MESSAGES.write().await; + for user_message in user_messages.iter_mut() { + if user_message.user != user && user_message.message_receiver.len() > 0 { + while let Ok(message) = user_message.message_receiver.recv().await { + if let Err(err_val) = websocker_sender + .write_frame(Frame::text(fastwebsockets::Payload::Owned( + serde_json::to_vec(&message).unwrap(), + ))) + .await + { + eprintln!("Error: WebSocket Send | {}", err_val); + } + if user_message.message_receiver.len() < 1 { + break; + } + } + } + } + } + }); + + while let Ok(received_frame) = websocket_receiver + .read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() }) + .await + { + if let OpCode::Text = received_frame.opcode { + let signal = + serde_json::from_slice::(&received_frame.payload.to_vec()).unwrap(); + + if signal.get_signal_type() != SignalType::Auth { + if let Err(err_val) = message_sender.send(signal) { + eprintln!("Error: WebSocket Channel Send | {}", err_val); + } + } else { + return; + } } } }