feat: webrtc offer

This commit is contained in:
Ahmet Kaan Gümüş 2025-04-24 23:05:51 +03:00
parent 7bf876b87b
commit 5a84d8fda7
3 changed files with 104 additions and 39 deletions

View file

@ -1,7 +1,4 @@
use std::{ use std::sync::mpsc;
collections::VecDeque,
sync::{Mutex, mpsc},
};
use leptos::logging::log; use leptos::logging::log;
use protocol::{Error, Signal, SignalType}; use protocol::{Error, Signal, SignalType};
@ -13,11 +10,10 @@ use web_sys::{
static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546"; static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546";
thread_local! { thread_local! {
static WEBSOCKET_RECEIVER: Mutex<VecDeque<Signal>> =
Mutex::new(VecDeque::new());
static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap(); static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap();
static OFFER_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
static ANSWER_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
static ICE_CANDIDATE_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel(); static ICE_CANDIDATE_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
} }
@ -39,11 +35,13 @@ impl SignallingChannel {
.send(received_signal) .send(received_signal)
.expect("Never"); .expect("Never");
}) })
} else { } else if received_signal.get_signal_type() == SignalType::Offer {
WEBSOCKET_RECEIVER.with(|websocket_receiver| { OFFER_CHANNEL.with(|offer_channel| {
if let Ok(mut websocket_receiver) = websocket_receiver.lock() { offer_channel.0.send(received_signal).expect("Never");
websocket_receiver.push_back(received_signal); })
} } else if received_signal.get_signal_type() == SignalType::Answer {
ANSWER_CHANNEL.with(|answer_channel| {
answer_channel.0.send(received_signal).expect("Never");
}) })
} }
} }
@ -71,24 +69,21 @@ impl SignallingChannel {
}) })
} }
fn receive(expected_signal_type: &SignalType) -> Result<Signal, Error> { fn receive_offer() -> Result<Signal, Error> {
WEBSOCKET_RECEIVER.with(|message_queue| { OFFER_CHANNEL.with(|offer_channel| {
if let Ok(mut message_queue) = message_queue.lock() { offer_channel
let received_signal = message_queue .1
.pop_front() .recv()
.ok_or_else(|| Error::WebSocketReceivedNothing)?; .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string()))
})
}
if received_signal.get_signal_type() == *expected_signal_type { fn receive_answer() -> Result<Signal, Error> {
Ok(received_signal) ANSWER_CHANNEL.with(|answer_channel| {
} else if received_signal.get_signal_type() == SignalType::Auth { answer_channel
Err(Error::WebSocketAuth) .1
} else { .recv()
message_queue.clear(); .map_err(|err_val| Error::AnswerChannelReceive(err_val.to_string()))
Err(Error::WebSocketReceiveQueueReset)
}
} else {
Err(Error::WebSocketReceiveQueueLocked)
}
}) })
} }
@ -107,10 +102,6 @@ pub fn send_auth(data: &String) -> Result<(), Error> {
SignallingChannel::send(&offer) SignallingChannel::send(&offer)
} }
pub fn receive_auth() -> Result<Signal, Error> {
SignallingChannel::receive(&SignalType::Auth)
}
pub fn send_offer(data: &String) -> Result<(), Error> { pub fn send_offer(data: &String) -> Result<(), Error> {
let offer = Signal::new(&SignalType::Offer, data); let offer = Signal::new(&SignalType::Offer, data);
@ -118,7 +109,7 @@ pub fn send_offer(data: &String) -> Result<(), Error> {
} }
pub fn receive_offer() -> Result<Signal, Error> { pub fn receive_offer() -> Result<Signal, Error> {
SignallingChannel::receive(&SignalType::Offer) SignallingChannel::receive_offer()
} }
pub fn send_answer(data: &String) -> Result<(), Error> { pub fn send_answer(data: &String) -> Result<(), Error> {
@ -128,7 +119,7 @@ pub fn send_answer(data: &String) -> Result<(), Error> {
} }
pub fn receive_answer() -> Result<Signal, Error> { pub fn receive_answer() -> Result<Signal, Error> {
SignallingChannel::receive(&SignalType::Answer) SignallingChannel::receive_answer()
} }
pub fn send_ice_candidate(data: &String) -> Result<(), Error> { pub fn send_ice_candidate(data: &String) -> Result<(), Error> {

View file

@ -3,12 +3,12 @@ use protocol::Error;
use wasm_bindgen_futures::{JsFuture, spawn_local}; use wasm_bindgen_futures::{JsFuture, spawn_local};
use web_sys::{ use web_sys::{
MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer,
RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
js_sys::Array, js_sys::{Array, Reflect},
wasm_bindgen::{JsCast, JsValue, prelude::Closure}, wasm_bindgen::{JsCast, JsValue, prelude::Closure},
}; };
use crate::signal::{receive_ice_candidate, send_ice_candidate}; use crate::signal::{receive_answer, receive_ice_candidate, send_ice_candidate, send_offer};
pub struct WebRTC { pub struct WebRTC {
audio_stream: Option<MediaStream>, audio_stream: Option<MediaStream>,
@ -71,7 +71,7 @@ impl WebRTC {
self.add_streams(); self.add_streams();
spawn_local(async { spawn_local(async {
if let Ok(received_ice_candidate) = receive_ice_candidate() { while let Ok(received_ice_candidate) = receive_ice_candidate() {
let received_ice_candidate = let received_ice_candidate =
RtcIceCandidateInit::new(&received_ice_candidate.get_data()); RtcIceCandidateInit::new(&received_ice_candidate.get_data());
if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) {
@ -106,4 +106,58 @@ impl WebRTC {
self.peer_connection.add_stream(screen_stream); self.peer_connection.add_stream(screen_stream);
} }
} }
async fn offer(&mut self) -> Result<(), Error> {
let offer_promise = self.peer_connection.create_offer();
match JsFuture::from(offer_promise)
.await
.map_err(|_| Error::WebRTCOffer)
{
Ok(offer) => {
let offer_session_description_protocol =
Reflect::get(&offer, &JsValue::from_str("sdp"))
.map_err(|_| Error::WebRTCSessionDescriptionProtocol)?;
match offer_session_description_protocol
.as_string()
.ok_or_else(|| Error::WebRTCOffer)
{
Ok(offer_session_description_protocol) => {
let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
offer.set_sdp(&offer_session_description_protocol);
let set_local_description_promise =
self.peer_connection.set_local_description(&offer);
JsFuture::from(set_local_description_promise)
.await
.map_err(|_| Error::WebRTCSetLocalDescription)?;
send_offer(&offer_session_description_protocol)?;
if let Ok(received_answer) = receive_answer() {
let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
answer.set_sdp(&received_answer.get_data());
let set_remote_description_promise =
self.peer_connection.set_remote_description(&answer);
JsFuture::from(set_remote_description_promise)
.await
.map_err(|_| Error::WebRTCSetRemoteDescription)?;
return Ok(());
}
}
Err(err_val) => {
log!(
"Error: WebRTC Offer | Session Description Protocol String Transformation | {}",
err_val
);
}
}
}
Err(err_val) => {
log!("Error: WebRTC Offer | Promise | {}", err_val);
}
}
return Err(Error::WebRTCOffer);
}
} }

View file

@ -30,6 +30,13 @@ pub enum Error {
WebSocketReceiveQueueReset, WebSocketReceiveQueueReset,
WebSocketReceiveQueueLocked, WebSocketReceiveQueueLocked,
WebRTCInitialization, WebRTCInitialization,
WebRTCOffer,
WebRTCAnswer,
WebRTCSessionDescriptionProtocol,
WebRTCSetLocalDescription,
WebRTCSetRemoteDescription,
OfferChannelReceive(String),
AnswerChannelReceive(String),
ICECandidateChannelReceive(String), ICECandidateChannelReceive(String),
ICECandidateAdd, ICECandidateAdd,
} }
@ -58,6 +65,19 @@ impl Display for Error {
Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"), Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"),
Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"), Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"),
Error::WebRTCInitialization => write!(f, "WebRTC Initialization"), Error::WebRTCInitialization => write!(f, "WebRTC Initialization"),
Error::WebRTCOffer => write!(f, "WebRTC Offer"),
Error::WebRTCAnswer => write!(f, "WebRTC Answer"),
Error::WebRTCSessionDescriptionProtocol => {
write!(f, "WebRTC Session Description Protocol")
}
Error::WebRTCSetLocalDescription => write!(f, "WebRTC Set Local Description"),
Error::WebRTCSetRemoteDescription => write!(f, "WebRTC Set Remote Description"),
Error::OfferChannelReceive(recv_error) => {
write!(f, "Offer Channel Receive | {}", recv_error)
}
Error::AnswerChannelReceive(recv_error) => {
write!(f, "Answer Channel Receive | {}", recv_error)
}
Error::ICECandidateChannelReceive(recv_error) => { Error::ICECandidateChannelReceive(recv_error) => {
write!(f, "ICE Candidate Channel Receive | {}", recv_error) write!(f, "ICE Candidate Channel Receive | {}", recv_error)
} }