diff --git a/client/src/gui.rs b/client/src/gui.rs index c778d7e..cae524e 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -1,25 +1,15 @@ use leptos::{ - IntoView, - attr::Value, - ev, - html::{ElementChild, button, form, input}, + IntoView, ev, + html::{ElementChild, button}, logging::log, - prelude::{ - BindAttribute, Get, OnAttribute, Read, ReadSignal, Show, ShowProps, ToChildren, - WriteSignal, signal, - }, + prelude::{OnAttribute, Read, Show, ShowProps, ToChildren}, server::LocalResource, task::spawn_local, }; use wasm_bindgen_futures::JsFuture; use web_sys::HtmlAudioElement; -use crate::{ - media::audio, - rtc::{answer, offer}, - signal::send_auth, - sleep, -}; +use crate::media::audio; pub fn app() -> impl IntoView { let audio_stream = LocalResource::new(|| audio()); @@ -46,75 +36,5 @@ pub fn app() -> impl IntoView { })) .fallback(|| button().child("Sad Button")) .build(); - let username = signal(String::from("")); - (Show(props), signalling(username), rtc_offer(), rtc_answer()) -} - -fn signalling(username: (ReadSignal, WriteSignal)) -> impl IntoView { - let signalling_trigger = move || { - spawn_local(async move { send_auth(&username.0.get()).await.unwrap() }); - }; - let signalling_server_input = form() - .child( - input() - .bind(Value, username) - .placeholder("Some Username") - .r#type("text"), - ) - .on(ev::submit, move |event| { - event.prevent_default(); - signalling_trigger(); - }); - - let signalling_submit_button = button() - .on(ev::click, move |event| { - event.prevent_default(); - signalling_trigger(); - }) - .child("Signal"); - (signalling_server_input, signalling_submit_button) -} - -fn rtc_offer() -> impl IntoView { - let offer_trigger = move || { - spawn_local(async move { - let peer_connection = offer(audio().await).await; - let audio_stream = audio().await; - peer_connection.add_stream(&audio_stream); - loop { - log!("{:#?}", peer_connection.ice_connection_state()); - log!("{:#?}", peer_connection.get_remote_streams()); - sleep(1000).await; - } - }); - }; - - let offer_button = button() - .on(ev::click, move |event| { - event.prevent_default(); - offer_trigger(); - }) - .child("RTC Offer"); - offer_button -} - -fn rtc_answer() -> impl IntoView { - let answer_trigger = move || { - spawn_local(async move { - let peer_connection = answer(audio().await).await; - loop { - log!("{:#?}", peer_connection.ice_connection_state()); - log!("{:#?}", peer_connection.get_remote_streams()); - sleep(1000).await; - } - }); - }; - - let answer_button = button() - .on(ev::click, move |event| { - event.prevent_default(); - answer_trigger(); - }) - .child("RTC Answer"); - answer_button + Show(props) } diff --git a/client/src/lib.rs b/client/src/lib.rs index ad00192..5bf7659 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,6 +1,6 @@ pub mod gui; pub mod media; -pub mod rtc; +pub mod webrtc; pub mod signal; pub async fn sleep(timeout: u16) { diff --git a/client/src/rtc.rs b/client/src/rtc.rs deleted file mode 100644 index 29a13b1..0000000 --- a/client/src/rtc.rs +++ /dev/null @@ -1,171 +0,0 @@ -use leptos::{logging::log, task::spawn_local}; -use wasm_bindgen_futures::JsFuture; -use web_sys::{ - MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, - RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, - js_sys::{Array, Reflect}, - wasm_bindgen::{JsCast, JsValue, prelude::Closure}, -}; - -use crate::{ - signal::{ - receive_answer, receive_ice_candidate, receive_offer, send_answer, send_ice_candidate, - send_offer, - }, - sleep, -}; - -async fn local_ice_candidate_handler(peer_connection: &RtcPeerConnection) { - let local_ice_candidate_handler: Box = - Box::new(move |peer_connection_ice_event| { - spawn_local(async move { - { - async move { - sleep(1000).await; - - if let Some(peer_connection_ice_candidate) = - peer_connection_ice_event.candidate() - { - let peer_connection_ice_candidate = - peer_connection_ice_candidate.as_string().unwrap(); - send_ice_candidate(&peer_connection_ice_candidate) - .await - .unwrap(); - } - } - } - .await - }); - }); - - let local_ice_candidate_handler = Closure::wrap(local_ice_candidate_handler); - - peer_connection.set_onicecandidate(Some(local_ice_candidate_handler.as_ref().unchecked_ref())); -} - -async fn remote_ice_candidate_handler(peer_connection: &RtcPeerConnection) { - if let Ok(received_signal) = receive_ice_candidate().await { - let peer_connection_ice_candidate_init = - RtcIceCandidateInit::new(&received_signal.get_data()); - let peer_connection_ice_candidate = - RtcIceCandidate::new(&peer_connection_ice_candidate_init).unwrap(); - let peer_connection_add_ice_candidate_promise = peer_connection - .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&peer_connection_ice_candidate)); - JsFuture::from(peer_connection_add_ice_candidate_promise) - .await - .unwrap(); - } -} - -async fn create_peer_connection_with_configuration() -> RtcPeerConnection { - let ice_server_addresses = vec![JsValue::from("stun:stun.l.google.com:19302")] - .into_iter() - .collect::(); - let ice_server = RtcIceServer::new(); - ice_server.set_urls(&JsValue::from(ice_server_addresses)); - let ice_servers = vec![ice_server].into_iter().collect::(); - let rtc_configuration = RtcConfiguration::new(); - rtc_configuration.set_ice_servers(&ice_servers); - RtcPeerConnection::new_with_configuration(&rtc_configuration).unwrap() -} - -pub async fn offer(audio_stream: MediaStream) -> RtcPeerConnection { - let peer_connection = create_peer_connection_with_configuration().await; - peer_connection.add_stream(&audio_stream); - let peer_connection_create_offer_promise = peer_connection.create_offer(); - let peer_connection_session_offer = JsFuture::from(peer_connection_create_offer_promise) - .await - .unwrap(); - log!("{:#?}", peer_connection_session_offer); - let peer_connection_session_offer = peer_connection_session_offer - .as_ref() - .unchecked_ref::(); - log!("{:#?}", peer_connection_session_offer); - JsFuture::from(peer_connection.set_local_description(peer_connection_session_offer)) - .await - .unwrap(); - let peer_connection_session_offer = - Reflect::get(&peer_connection_session_offer, &JsValue::from_str("sdp")) - .unwrap() - .as_string() - .unwrap(); - local_ice_candidate_handler(&peer_connection).await; - remote_ice_candidate_handler(&peer_connection).await; - log!("{}", peer_connection_session_offer); - let data = peer_connection_session_offer; - - if let Err(err_val) = send_offer(&data).await { - log!("Error: Send Offer | {}", err_val) - } - - loop { - match receive_answer().await { - Ok(received_signal) => { - log!("{:#?}", received_signal); - let peer_connection_session_answer = - RtcSessionDescriptionInit::new(RtcSdpType::Answer); - peer_connection_session_answer.set_sdp(received_signal.get_data().as_str()); - sleep(1000).await; - JsFuture::from( - peer_connection.set_remote_description(&peer_connection_session_answer), - ) - .await - .unwrap(); - - log!("{:#?}", peer_connection.connection_state()); - log!("{:#?}", peer_connection.ice_connection_state()); - - return peer_connection; - } - Err(err_val) => log!("Error: Receive Answer | {}", err_val), - } - sleep(1000).await; - } -} - -pub async fn answer(audio_stream: MediaStream) -> RtcPeerConnection { - loop { - match receive_offer().await { - Ok(received_signal) => { - log!("{:#?}", received_signal); - let peer_connection = create_peer_connection_with_configuration().await; - peer_connection.add_stream(&audio_stream); - let peer_connection_session_offer = - RtcSessionDescriptionInit::new(RtcSdpType::Offer); - peer_connection_session_offer.set_sdp(received_signal.get_data().as_str()); - JsFuture::from( - peer_connection.set_remote_description(&peer_connection_session_offer), - ) - .await - .unwrap(); - let peer_connection_create_answer_promise = peer_connection.create_answer(); - let peer_connection_answer = JsFuture::from(peer_connection_create_answer_promise) - .await - .unwrap(); - let peer_connection_answer = peer_connection_answer - .as_ref() - .unchecked_ref::(); - JsFuture::from(peer_connection.set_local_description(peer_connection_answer)) - .await - .unwrap(); - let session_answer = Reflect::get(&peer_connection_answer, &JsValue::from("sdp")) - .unwrap() - .as_string() - .unwrap(); - log!("{}", session_answer); - let data = session_answer; - - send_answer(&data).await.unwrap(); - log!("{:#?}", peer_connection.connection_state()); - log!("{:#?}", peer_connection.ice_connection_state()); - - local_ice_candidate_handler(&peer_connection).await; - remote_ice_candidate_handler(&peer_connection).await; - - return peer_connection; - } - Err(err_val) => log!("Error: Receive Offer | {}", err_val), - } - sleep(1000).await; - } -} diff --git a/client/src/signal.rs b/client/src/signal.rs index ce2dab3..7bc9743 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, sync::Mutex}; +use std::{ + collections::VecDeque, + sync::{Mutex, mpsc}, +}; use leptos::logging::log; use protocol::{Error, Signal, SignalType}; @@ -14,23 +17,35 @@ static WEBSOCKET_RECEIVER: Mutex> = Mutex::new(VecDeque::new()); static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap(); + +static ICE_CANDIDATE_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); } struct SignallingChannel {} impl SignallingChannel { fn init() -> Result { - let web_socket = WebSocket::new(SIGNALLING_ADDRESS).map_err(|_| Error::WebSocketInit)?; + let web_socket = + WebSocket::new(SIGNALLING_ADDRESS).map_err(|_| Error::WebSocketInitialization)?; let on_message_callback = Closure::::new(move |message_event: MessageEvent| { if let Ok(received_data) = message_event.data().dyn_into::() { if let Some(received_data) = received_data.as_string() { if let Ok(received_signal) = serde_json::from_str::(&received_data) { - WEBSOCKET_RECEIVER.with(|websocket_receiver| { - if let Ok(mut websocket_receiver) = websocket_receiver.lock() { - websocket_receiver.push_back(received_signal); - } - }) + if received_signal.get_signal_type() == SignalType::ICECandidate { + ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| { + ice_candidate_channel + .0 + .send(received_signal) + .expect("Never"); + }) + } else { + WEBSOCKET_RECEIVER.with(|websocket_receiver| { + if let Ok(mut websocket_receiver) = websocket_receiver.lock() { + websocket_receiver.push_back(received_signal); + } + }) + } } } } @@ -47,7 +62,7 @@ impl SignallingChannel { Ok(web_socket) } - async fn send(signal: &Signal) -> Result<(), Error> { + fn send(signal: &Signal) -> Result<(), Error> { let json = serde_json::json!(signal); WEBSOCKET.with(|websocket| { websocket @@ -56,7 +71,7 @@ impl SignallingChannel { }) } - async fn receive(expected_signal_type: &SignalType) -> Result { + fn receive(expected_signal_type: &SignalType) -> Result { WEBSOCKET_RECEIVER.with(|message_queue| { if let Ok(mut message_queue) = message_queue.lock() { let received_signal = message_queue @@ -76,43 +91,52 @@ impl SignallingChannel { } }) } + + fn receive_ice_candidate() -> Result { + ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| { + ice_candidate_channel + .1 + .recv() + .map_err(|err_val| Error::ICECandidateChannelReceive(err_val.to_string())) + }) + } } -pub async fn send_auth(data: &String) -> Result<(), Error> { +pub fn send_auth(data: &String) -> Result<(), Error> { let offer = Signal::new(&SignalType::Auth, data); - SignallingChannel::send(&offer).await + SignallingChannel::send(&offer) } -pub async fn receive_auth() -> Result { - SignallingChannel::receive(&SignalType::Auth).await +pub fn receive_auth() -> Result { + SignallingChannel::receive(&SignalType::Auth) } -pub async fn send_offer(data: &String) -> Result<(), Error> { +pub fn send_offer(data: &String) -> Result<(), Error> { let offer = Signal::new(&SignalType::Offer, data); - SignallingChannel::send(&offer).await + SignallingChannel::send(&offer) } -pub async fn receive_offer() -> Result { - SignallingChannel::receive(&SignalType::Offer).await +pub fn receive_offer() -> Result { + SignallingChannel::receive(&SignalType::Offer) } -pub async fn send_answer(data: &String) -> Result<(), Error> { +pub fn send_answer(data: &String) -> Result<(), Error> { let offer = Signal::new(&SignalType::Answer, data); - SignallingChannel::send(&offer).await + SignallingChannel::send(&offer) } -pub async fn receive_answer() -> Result { - SignallingChannel::receive(&SignalType::Answer).await +pub fn receive_answer() -> Result { + SignallingChannel::receive(&SignalType::Answer) } -pub async fn send_ice_candidate(data: &String) -> Result<(), Error> { +pub fn send_ice_candidate(data: &String) -> Result<(), Error> { let offer = Signal::new(&SignalType::ICECandidate, data); - SignallingChannel::send(&offer).await + SignallingChannel::send(&offer) } -pub async fn receive_ice_candidate() -> Result { - SignallingChannel::receive(&SignalType::ICECandidate).await +pub fn receive_ice_candidate() -> Result { + SignallingChannel::receive_ice_candidate() } diff --git a/client/src/webrtc.rs b/client/src/webrtc.rs new file mode 100644 index 0000000..07c9b90 --- /dev/null +++ b/client/src/webrtc.rs @@ -0,0 +1,109 @@ +use leptos::logging::log; +use protocol::Error; +use wasm_bindgen_futures::{JsFuture, spawn_local}; +use web_sys::{ + MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, + RtcPeerConnection, RtcPeerConnectionIceEvent, + js_sys::Array, + wasm_bindgen::{JsCast, JsValue, prelude::Closure}, +}; + +use crate::signal::{receive_ice_candidate, send_ice_candidate}; + +pub struct WebRTC { + audio_stream: Option, + video_stream: Option, + screen_stream: Option, + peer_connection: RtcPeerConnection, +} + +thread_local! { + static WEBRTC:WebRTC = WebRTC::new().unwrap(); +} + +impl WebRTC { + fn new() -> Result { + let ice_server_addresses = vec![JsValue::from("stun:stun.l.google.com:19302")] + .into_iter() + .collect::(); + let ice_server = RtcIceServer::new(); + ice_server.set_urls(&ice_server_addresses); + let ice_servers = vec![ice_server].into_iter().collect::(); + let rtc_configuration = RtcConfiguration::new(); + rtc_configuration.set_ice_servers(&ice_servers); + + let peer_connection = RtcPeerConnection::new_with_configuration(&rtc_configuration) + .map_err(|_| Error::WebRTCInitialization)?; + + let on_ice_candidate = Closure::::new( + move |peer_connection_ice_event: RtcPeerConnectionIceEvent| { + if let Some(candidate) = peer_connection_ice_event.candidate() { + if let Err(err_val) = send_ice_candidate(&candidate.as_string().unwrap()) { + log!("Error: Send ICE Candidate | {}", err_val); + } + } + }, + ); + + peer_connection.set_onicecandidate(Some(on_ice_candidate.as_ref().unchecked_ref())); + on_ice_candidate.forget(); + + let webrtc = Self { + audio_stream: None, + video_stream: None, + screen_stream: None, + peer_connection, + }; + + Ok(webrtc) + } + + async fn init( + &mut self, + audio_stream: Option, + video_stream: Option, + screen_stream: Option, + ) { + self.audio_stream = audio_stream; + self.video_stream = video_stream; + self.screen_stream = screen_stream; + + self.add_streams(); + + spawn_local(async { + if let Ok(received_ice_candidate) = receive_ice_candidate() { + let received_ice_candidate = + RtcIceCandidateInit::new(&received_ice_candidate.get_data()); + if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { + WEBRTC.with(|webrtc| { + let add_received_ice_candidate_promise = webrtc + .peer_connection + .add_ice_candidate_with_opt_rtc_ice_candidate(Some( + &received_ice_candidate, + )); + spawn_local(async move { + if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise) + .await + .map_err(|_| Error::ICECandidateAdd) + { + log!("Error: Add ICE Candidate | {}", err_val); + } + }); + }); + } + } + }); + } + + fn add_streams(&mut self) { + if let Some(audio_stream) = &self.audio_stream { + self.peer_connection.add_stream(audio_stream); + } + if let Some(video_stream) = &self.video_stream { + self.peer_connection.add_stream(video_stream); + } + if let Some(screen_stream) = &self.screen_stream { + self.peer_connection.add_stream(screen_stream); + } + } +} diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 6524ab5..b4dd6ad 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -23,12 +23,15 @@ pub struct User { pub enum Error { InvalidSignalType(String), UnexpectedSignalType(SignalType), - WebSocketInit, + WebSocketInitialization, WebSocketAuth, WebSocketSend, WebSocketReceivedNothing, WebSocketReceiveQueueReset, WebSocketReceiveQueueLocked, + WebRTCInitialization, + ICECandidateChannelReceive(String), + ICECandidateAdd, } impl std::error::Error for Error { @@ -48,12 +51,17 @@ impl Display for Error { write!(f, "Unexpected Signal Type: {}", signal_type) } - Error::WebSocketInit => write!(f, "WebSocket Initialization"), + Error::WebSocketInitialization => write!(f, "WebSocket Initialization"), Error::WebSocketAuth => write!(f, "WebSocket Auth"), Error::WebSocketSend => write!(f, "WebSocket Send"), Error::WebSocketReceivedNothing => write!(f, "WebSocket Received Nothing"), Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"), Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"), + Error::WebRTCInitialization => write!(f, "WebRTC Initialization"), + Error::ICECandidateChannelReceive(recv_error) => { + write!(f, "ICE Candidate Channel Receive | {}", recv_error) + } + Error::ICECandidateAdd => write!(f, "ICE Candidate Add"), } } }