diff --git a/client/Cargo.toml b/client/Cargo.toml index 4f0171d..68d0e8b 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -12,6 +12,9 @@ web-sys = { version = "0.3.77", features = [ "AudioBuffer", "AudioBufferSourceNode", "AudioContext", + "BinaryType", + "ErrorEvent", + "MessageEvent", "HtmlAudioElement", "MediaDevices", "MediaStream", @@ -31,6 +34,7 @@ web-sys = { version = "0.3.77", features = [ "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", + "WebSocket", "Window", ] } protocol = { path = "../protocol" } diff --git a/client/src/gui.rs b/client/src/gui.rs index da82796..c778d7e 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -17,7 +17,8 @@ use web_sys::HtmlAudioElement; use crate::{ media::audio, rtc::{answer, offer}, - signal::start_signalling, + signal::send_auth, + sleep, }; pub fn app() -> impl IntoView { @@ -46,17 +47,12 @@ pub fn app() -> impl IntoView { .fallback(|| button().child("Sad Button")) .build(); let username = signal(String::from("")); - ( - Show(props), - signalling(username), - rtc_offer(username.0), - rtc_answer(username.0), - ) + (Show(props), signalling(username), rtc_offer(), rtc_answer()) } fn signalling(username: (ReadSignal, WriteSignal)) -> impl IntoView { let signalling_trigger = move || { - spawn_local(start_signalling(username.0.get())); + spawn_local(async move { send_auth(&username.0.get()).await.unwrap() }); }; let signalling_server_input = form() .child( @@ -79,10 +75,17 @@ fn signalling(username: (ReadSignal, WriteSignal)) -> impl IntoV (signalling_server_input, signalling_submit_button) } -fn rtc_offer(username: ReadSignal) -> impl IntoView { +fn rtc_offer() -> impl IntoView { let offer_trigger = move || { spawn_local(async move { - let peer_connection = offer(username.get()).await; + let peer_connection = offer(audio().await).await; + let audio_stream = audio().await; + peer_connection.add_stream(&audio_stream); + loop { + log!("{:#?}", peer_connection.ice_connection_state()); + log!("{:#?}", peer_connection.get_remote_streams()); + sleep(1000).await; + } }); }; @@ -95,10 +98,15 @@ fn rtc_offer(username: ReadSignal) -> impl IntoView { offer_button } -fn rtc_answer(username: ReadSignal) -> impl IntoView { +fn rtc_answer() -> impl IntoView { let answer_trigger = move || { spawn_local(async move { - let peer_connection = answer(username.get()).await; + let peer_connection = answer(audio().await).await; + loop { + log!("{:#?}", peer_connection.ice_connection_state()); + log!("{:#?}", peer_connection.get_remote_streams()); + sleep(1000).await; + } }); }; diff --git a/client/src/rtc.rs b/client/src/rtc.rs index 2212735..29a13b1 100644 --- a/client/src/rtc.rs +++ b/client/src/rtc.rs @@ -1,12 +1,8 @@ -use leptos::{ - logging::log, - prelude::{Set, Signal, Update, Write, WriteSignal}, - task::spawn_local, -}; +use leptos::{logging::log, task::spawn_local}; use wasm_bindgen_futures::JsFuture; use web_sys::{ - RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, RtcPeerConnection, - RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, + MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, + RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, js_sys::{Array, Reflect}, wasm_bindgen::{JsCast, JsValue, prelude::Closure}, }; @@ -19,12 +15,11 @@ use crate::{ sleep, }; -async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { - let local_ice_candidate_handler: Box = - Box::new(move |username, peer_connection_ice_event| { +async fn local_ice_candidate_handler(peer_connection: &RtcPeerConnection) { + let local_ice_candidate_handler: Box = + Box::new(move |peer_connection_ice_event| { spawn_local(async move { { - let username: &String = &username; async move { sleep(1000).await; @@ -33,7 +28,7 @@ async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPee { let peer_connection_ice_candidate = peer_connection_ice_candidate.as_string().unwrap(); - send_ice_candidate(username, &peer_connection_ice_candidate) + send_ice_candidate(&peer_connection_ice_candidate) .await .unwrap(); } @@ -48,10 +43,10 @@ async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPee peer_connection.set_onicecandidate(Some(local_ice_candidate_handler.as_ref().unchecked_ref())); } -async fn remote_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { - if let Ok(user_and_signal) = receive_ice_candidate(username).await { +async fn remote_ice_candidate_handler(peer_connection: &RtcPeerConnection) { + if let Ok(received_signal) = receive_ice_candidate().await { let peer_connection_ice_candidate_init = - RtcIceCandidateInit::new(&user_and_signal.signal.get_data()); + RtcIceCandidateInit::new(&received_signal.get_data()); let peer_connection_ice_candidate = RtcIceCandidate::new(&peer_connection_ice_candidate_init).unwrap(); let peer_connection_add_ice_candidate_promise = peer_connection @@ -74,8 +69,9 @@ async fn create_peer_connection_with_configuration() -> RtcPeerConnection { RtcPeerConnection::new_with_configuration(&rtc_configuration).unwrap() } -pub async fn offer(username: String) -> RtcPeerConnection { +pub async fn offer(audio_stream: MediaStream) -> RtcPeerConnection { let peer_connection = create_peer_connection_with_configuration().await; + peer_connection.add_stream(&audio_stream); let peer_connection_create_offer_promise = peer_connection.create_offer(); let peer_connection_session_offer = JsFuture::from(peer_connection_create_offer_promise) .await @@ -93,21 +89,22 @@ pub async fn offer(username: String) -> RtcPeerConnection { .unwrap() .as_string() .unwrap(); + local_ice_candidate_handler(&peer_connection).await; + remote_ice_candidate_handler(&peer_connection).await; log!("{}", peer_connection_session_offer); let data = peer_connection_session_offer; - if let Err(err_val) = send_offer(&username, &data).await { + if let Err(err_val) = send_offer(&data).await { log!("Error: Send Offer | {}", err_val) } loop { - match receive_answer(&username).await { - Ok(received_user_and_signal_answer) => { - log!("{:#?}", received_user_and_signal_answer); + match receive_answer().await { + Ok(received_signal) => { + log!("{:#?}", received_signal); let peer_connection_session_answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer); - peer_connection_session_answer - .set_sdp(received_user_and_signal_answer.signal.get_data().as_str()); + peer_connection_session_answer.set_sdp(received_signal.get_data().as_str()); sleep(1000).await; JsFuture::from( peer_connection.set_remote_description(&peer_connection_session_answer), @@ -118,7 +115,6 @@ pub async fn offer(username: String) -> RtcPeerConnection { log!("{:#?}", peer_connection.connection_state()); log!("{:#?}", peer_connection.ice_connection_state()); - local_ice_candidate_handler(&username, &peer_connection).await; return peer_connection; } Err(err_val) => log!("Error: Receive Answer | {}", err_val), @@ -127,16 +123,16 @@ pub async fn offer(username: String) -> RtcPeerConnection { } } -pub async fn answer(username: String) -> RtcPeerConnection { +pub async fn answer(audio_stream: MediaStream) -> RtcPeerConnection { loop { - match receive_offer(&username).await { - Ok(received_user_and_signal_offer) => { - log!("{:#?}", received_user_and_signal_offer); + match receive_offer().await { + Ok(received_signal) => { + log!("{:#?}", received_signal); let peer_connection = create_peer_connection_with_configuration().await; + peer_connection.add_stream(&audio_stream); let peer_connection_session_offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); - peer_connection_session_offer - .set_sdp(received_user_and_signal_offer.signal.get_data().as_str()); + peer_connection_session_offer.set_sdp(received_signal.get_data().as_str()); JsFuture::from( peer_connection.set_remote_description(&peer_connection_session_offer), ) @@ -159,11 +155,12 @@ pub async fn answer(username: String) -> RtcPeerConnection { log!("{}", session_answer); let data = session_answer; - send_answer(&username, &data).await.unwrap(); + send_answer(&data).await.unwrap(); log!("{:#?}", peer_connection.connection_state()); log!("{:#?}", peer_connection.ice_connection_state()); - local_ice_candidate_handler(&username, &peer_connection).await; + local_ice_candidate_handler(&peer_connection).await; + remote_ice_candidate_handler(&peer_connection).await; return peer_connection; } diff --git a/client/src/signal.rs b/client/src/signal.rs index dd53eab..ce2dab3 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -1,121 +1,118 @@ -use std::sync::LazyLock; +use std::{collections::VecDeque, sync::Mutex}; use leptos::logging::log; -use protocol::{Signal, SignalType, UserAndSignal}; -use reqwest::{Response, header::HeaderMap}; -use serde_json::{Value, json}; +use protocol::{Error, Signal, SignalType}; +use web_sys::{ + ErrorEvent, MessageEvent, WebSocket, js_sys, + wasm_bindgen::{JsCast, prelude::Closure}, +}; -const SIGNALLING_ADDRESS: &str = "http://192.168.1.3:4546"; -static REQUEST_CLIENT: LazyLock = LazyLock::new(|| reqwest::Client::new()); +static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546"; -async fn create_headers(headers: Vec<(&'static str, String)>) -> HeaderMap { - let mut header_map = HeaderMap::new(); - for (key, val) in headers { - header_map.insert(key, val.parse().unwrap()); +thread_local! { +static WEBSOCKET_RECEIVER: Mutex> = + Mutex::new(VecDeque::new()); + +static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap(); +} + +struct SignallingChannel {} + +impl SignallingChannel { + fn init() -> Result { + let web_socket = WebSocket::new(SIGNALLING_ADDRESS).map_err(|_| Error::WebSocketInit)?; + + let on_message_callback = Closure::::new(move |message_event: MessageEvent| { + if let Ok(received_data) = message_event.data().dyn_into::() { + if let Some(received_data) = received_data.as_string() { + if let Ok(received_signal) = serde_json::from_str::(&received_data) { + WEBSOCKET_RECEIVER.with(|websocket_receiver| { + if let Ok(mut websocket_receiver) = websocket_receiver.lock() { + websocket_receiver.push_back(received_signal); + } + }) + } + } + } + }); + web_socket.set_onmessage(Some(on_message_callback.as_ref().unchecked_ref())); + on_message_callback.forget(); + + let on_error_callback = Closure::::new(move |error_event: ErrorEvent| { + log!("Error: WebSocket | {:#?}", error_event); + }); + web_socket.set_onerror(Some(on_error_callback.as_ref().unchecked_ref())); + on_error_callback.forget(); + + Ok(web_socket) } - header_map -} -async fn post_json(username: &String, json: &Value) -> Result { - let headers = create_headers(vec![]).await; - REQUEST_CLIENT - .post(SIGNALLING_ADDRESS) - .headers(headers) - .bearer_auth(username) - .json(json) - .send() - .await -} + async fn send(signal: &Signal) -> Result<(), Error> { + let json = serde_json::json!(signal); + WEBSOCKET.with(|websocket| { + websocket + .send_with_str(&json.to_string()) + .map_err(|_| Error::WebSocketSend) + }) + } -async fn get_json(username: &String, signal_type: SignalType) -> Result { - let headers = create_headers(vec![("EXPECTED_SIGNAL", signal_type.to_string())]).await; - REQUEST_CLIENT - .get(SIGNALLING_ADDRESS) - .headers(headers) - .bearer_auth(username) - .send() - .await -} + async fn receive(expected_signal_type: &SignalType) -> Result { + WEBSOCKET_RECEIVER.with(|message_queue| { + if let Ok(mut message_queue) = message_queue.lock() { + let received_signal = message_queue + .pop_front() + .ok_or_else(|| Error::WebSocketReceivedNothing)?; -pub async fn start_signalling(username: String) { - log!("Start Signalling"); - log!("{}\n{}", username, SIGNALLING_ADDRESS); - let auth_signal = Signal::new(&SignalType::Auth, &"".to_owned()); - let json = json!(auth_signal); - match post_json(&username, &json).await { - Ok(signal_response) => log!("{:#?}", signal_response), - Err(err_val) => { - log!("Error: Signal Post | {}", err_val); - } + if received_signal.get_signal_type() == *expected_signal_type { + Ok(received_signal) + } else if received_signal.get_signal_type() == SignalType::Auth { + Err(Error::WebSocketAuth) + } else { + message_queue.clear(); + Err(Error::WebSocketReceiveQueueReset) + } + } else { + Err(Error::WebSocketReceiveQueueLocked) + } + }) } } +pub async fn send_auth(data: &String) -> Result<(), Error> { + let offer = Signal::new(&SignalType::Auth, data); -pub async fn send_offer(username: &String, data: &String) -> Result<(), reqwest::Error> { - let rtc_session_offer_signal = Signal::new(&SignalType::Offer, data); - let rtc_session_offer_signal = json!(rtc_session_offer_signal); - post_json(username, &rtc_session_offer_signal) - .await - .map(|_| Ok(()))? + SignallingChannel::send(&offer).await } -pub async fn receive_offer(username: &String) -> Result> { - let result = get_json(username, SignalType::Offer) - .await - .map(async |response| response.json::().await)? - .await?; - if result.signal.get_signal_type() == SignalType::Offer { - Ok(result) - } else { - Err(protocol::Error::UnexpectedSignalType( - result.signal.get_signal_type(), - ))? - } +pub async fn receive_auth() -> Result { + SignallingChannel::receive(&SignalType::Auth).await } -pub async fn send_answer(username: &String, data: &String) -> Result<(), reqwest::Error> { - let rtc_session_answer_signal = Signal::new(&SignalType::Answer, data); - let rtc_session_answer_signal = json!(rtc_session_answer_signal); - post_json(username, &rtc_session_answer_signal) - .await - .map(|_| Ok(()))? +pub async fn send_offer(data: &String) -> Result<(), Error> { + let offer = Signal::new(&SignalType::Offer, data); + + SignallingChannel::send(&offer).await } -pub async fn receive_answer( - username: &String, -) -> Result> { - let result = get_json(username, SignalType::Answer) - .await - .map(async |response| response.json::().await)? - .await?; - if result.signal.get_signal_type() == SignalType::Answer { - Ok(result) - } else { - Err(protocol::Error::UnexpectedSignalType( - result.signal.get_signal_type(), - ))? - } +pub async fn receive_offer() -> Result { + SignallingChannel::receive(&SignalType::Offer).await } -pub async fn send_ice_candidate(username: &String, data: &String) -> Result<(), reqwest::Error> { - let rtc_session_answer_signal = Signal::new(&SignalType::ICECandidate, data); - let rtc_session_answer_signal = json!(rtc_session_answer_signal); - post_json(username, &rtc_session_answer_signal) - .await - .map(|_| Ok(()))? +pub async fn send_answer(data: &String) -> Result<(), Error> { + let offer = Signal::new(&SignalType::Answer, data); + + SignallingChannel::send(&offer).await } -pub async fn receive_ice_candidate( - username: &String, -) -> Result> { - let result = get_json(username, SignalType::Answer) - .await - .map(async |response| response.json::().await)? - .await?; - if result.signal.get_signal_type() == SignalType::ICECandidate { - Ok(result) - } else { - Err(protocol::Error::UnexpectedSignalType( - result.signal.get_signal_type(), - ))? - } +pub async fn receive_answer() -> Result { + SignallingChannel::receive(&SignalType::Answer).await +} + +pub async fn send_ice_candidate(data: &String) -> Result<(), Error> { + let offer = Signal::new(&SignalType::ICECandidate, data); + + SignallingChannel::send(&offer).await +} + +pub async fn receive_ice_candidate() -> Result { + SignallingChannel::receive(&SignalType::ICECandidate).await } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index ba8d803..6524ab5 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -21,8 +21,14 @@ pub struct User { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Error { - UnexpectedSignalType(SignalType), InvalidSignalType(String), + UnexpectedSignalType(SignalType), + WebSocketInit, + WebSocketAuth, + WebSocketSend, + WebSocketReceivedNothing, + WebSocketReceiveQueueReset, + WebSocketReceiveQueueLocked, } impl std::error::Error for Error { @@ -34,12 +40,20 @@ impl std::error::Error for Error { impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::UnexpectedSignalType(signal_type) => { - write!(f, "Unexpected Signal Type: {}", signal_type) - } Error::InvalidSignalType(invalid_signal_type) => { write!(f, "Invalid Signal Type: {}", invalid_signal_type) } + + Error::UnexpectedSignalType(signal_type) => { + write!(f, "Unexpected Signal Type: {}", signal_type) + } + + Error::WebSocketInit => write!(f, "WebSocket Initialization"), + Error::WebSocketAuth => write!(f, "WebSocket Auth"), + Error::WebSocketSend => write!(f, "WebSocket Send"), + Error::WebSocketReceivedNothing => write!(f, "WebSocket Received Nothing"), + Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"), + Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"), } } }