diff --git a/client/src/signal.rs b/client/src/signal.rs index 7bc9743..1533d49 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -1,7 +1,4 @@ -use std::{ - collections::VecDeque, - sync::{Mutex, mpsc}, -}; +use std::sync::mpsc; use leptos::logging::log; use protocol::{Error, Signal, SignalType}; @@ -13,11 +10,10 @@ use web_sys::{ static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546"; thread_local! { -static WEBSOCKET_RECEIVER: Mutex> = - Mutex::new(VecDeque::new()); - static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap(); +static OFFER_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); +static ANSWER_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); static ICE_CANDIDATE_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); } @@ -39,11 +35,13 @@ impl SignallingChannel { .send(received_signal) .expect("Never"); }) - } else { - WEBSOCKET_RECEIVER.with(|websocket_receiver| { - if let Ok(mut websocket_receiver) = websocket_receiver.lock() { - websocket_receiver.push_back(received_signal); - } + } else if received_signal.get_signal_type() == SignalType::Offer { + OFFER_CHANNEL.with(|offer_channel| { + offer_channel.0.send(received_signal).expect("Never"); + }) + } else if received_signal.get_signal_type() == SignalType::Answer { + ANSWER_CHANNEL.with(|answer_channel| { + answer_channel.0.send(received_signal).expect("Never"); }) } } @@ -71,24 +69,21 @@ impl SignallingChannel { }) } - fn receive(expected_signal_type: &SignalType) -> Result { - WEBSOCKET_RECEIVER.with(|message_queue| { - if let Ok(mut message_queue) = message_queue.lock() { - let received_signal = message_queue - .pop_front() - .ok_or_else(|| Error::WebSocketReceivedNothing)?; + fn receive_offer() -> Result { + OFFER_CHANNEL.with(|offer_channel| { + offer_channel + .1 + .recv() + .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string())) + }) + } - if received_signal.get_signal_type() == *expected_signal_type { - Ok(received_signal) - } else if received_signal.get_signal_type() == SignalType::Auth { - Err(Error::WebSocketAuth) - } else { - message_queue.clear(); - Err(Error::WebSocketReceiveQueueReset) - } - } else { - Err(Error::WebSocketReceiveQueueLocked) - } + fn receive_answer() -> Result { + ANSWER_CHANNEL.with(|answer_channel| { + answer_channel + .1 + .recv() + .map_err(|err_val| Error::AnswerChannelReceive(err_val.to_string())) }) } @@ -107,10 +102,6 @@ pub fn send_auth(data: &String) -> Result<(), Error> { SignallingChannel::send(&offer) } -pub fn receive_auth() -> Result { - SignallingChannel::receive(&SignalType::Auth) -} - pub fn send_offer(data: &String) -> Result<(), Error> { let offer = Signal::new(&SignalType::Offer, data); @@ -118,7 +109,7 @@ pub fn send_offer(data: &String) -> Result<(), Error> { } pub fn receive_offer() -> Result { - SignallingChannel::receive(&SignalType::Offer) + SignallingChannel::receive_offer() } pub fn send_answer(data: &String) -> Result<(), Error> { @@ -128,7 +119,7 @@ pub fn send_answer(data: &String) -> Result<(), Error> { } pub fn receive_answer() -> Result { - SignallingChannel::receive(&SignalType::Answer) + SignallingChannel::receive_answer() } pub fn send_ice_candidate(data: &String) -> Result<(), Error> { diff --git a/client/src/webrtc.rs b/client/src/webrtc.rs index 07c9b90..34e1b9f 100644 --- a/client/src/webrtc.rs +++ b/client/src/webrtc.rs @@ -3,12 +3,12 @@ use protocol::Error; use wasm_bindgen_futures::{JsFuture, spawn_local}; use web_sys::{ MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, - RtcPeerConnection, RtcPeerConnectionIceEvent, - js_sys::Array, + RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, + js_sys::{Array, Reflect}, wasm_bindgen::{JsCast, JsValue, prelude::Closure}, }; -use crate::signal::{receive_ice_candidate, send_ice_candidate}; +use crate::signal::{receive_answer, receive_ice_candidate, send_ice_candidate, send_offer}; pub struct WebRTC { audio_stream: Option, @@ -71,7 +71,7 @@ impl WebRTC { self.add_streams(); spawn_local(async { - if let Ok(received_ice_candidate) = receive_ice_candidate() { + while let Ok(received_ice_candidate) = receive_ice_candidate() { let received_ice_candidate = RtcIceCandidateInit::new(&received_ice_candidate.get_data()); if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { @@ -106,4 +106,58 @@ impl WebRTC { self.peer_connection.add_stream(screen_stream); } } + + async fn offer(&mut self) -> Result<(), Error> { + let offer_promise = self.peer_connection.create_offer(); + match JsFuture::from(offer_promise) + .await + .map_err(|_| Error::WebRTCOffer) + { + Ok(offer) => { + let offer_session_description_protocol = + Reflect::get(&offer, &JsValue::from_str("sdp")) + .map_err(|_| Error::WebRTCSessionDescriptionProtocol)?; + match offer_session_description_protocol + .as_string() + .ok_or_else(|| Error::WebRTCOffer) + { + Ok(offer_session_description_protocol) => { + let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); + offer.set_sdp(&offer_session_description_protocol); + let set_local_description_promise = + self.peer_connection.set_local_description(&offer); + + JsFuture::from(set_local_description_promise) + .await + .map_err(|_| Error::WebRTCSetLocalDescription)?; + + send_offer(&offer_session_description_protocol)?; + + if let Ok(received_answer) = receive_answer() { + let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer); + answer.set_sdp(&received_answer.get_data()); + + let set_remote_description_promise = + self.peer_connection.set_remote_description(&answer); + JsFuture::from(set_remote_description_promise) + .await + .map_err(|_| Error::WebRTCSetRemoteDescription)?; + + return Ok(()); + } + } + Err(err_val) => { + log!( + "Error: WebRTC Offer | Session Description Protocol String Transformation | {}", + err_val + ); + } + } + } + Err(err_val) => { + log!("Error: WebRTC Offer | Promise | {}", err_val); + } + } + return Err(Error::WebRTCOffer); + } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index b4dd6ad..c98554c 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -30,6 +30,13 @@ pub enum Error { WebSocketReceiveQueueReset, WebSocketReceiveQueueLocked, WebRTCInitialization, + WebRTCOffer, + WebRTCAnswer, + WebRTCSessionDescriptionProtocol, + WebRTCSetLocalDescription, + WebRTCSetRemoteDescription, + OfferChannelReceive(String), + AnswerChannelReceive(String), ICECandidateChannelReceive(String), ICECandidateAdd, } @@ -58,6 +65,19 @@ impl Display for Error { Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"), Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"), Error::WebRTCInitialization => write!(f, "WebRTC Initialization"), + Error::WebRTCOffer => write!(f, "WebRTC Offer"), + Error::WebRTCAnswer => write!(f, "WebRTC Answer"), + Error::WebRTCSessionDescriptionProtocol => { + write!(f, "WebRTC Session Description Protocol") + } + Error::WebRTCSetLocalDescription => write!(f, "WebRTC Set Local Description"), + Error::WebRTCSetRemoteDescription => write!(f, "WebRTC Set Remote Description"), + Error::OfferChannelReceive(recv_error) => { + write!(f, "Offer Channel Receive | {}", recv_error) + } + Error::AnswerChannelReceive(recv_error) => { + write!(f, "Answer Channel Receive | {}", recv_error) + } Error::ICECandidateChannelReceive(recv_error) => { write!(f, "ICE Candidate Channel Receive | {}", recv_error) }