fix: browser freezing because of blocking channel

This commit is contained in:
Ahmet Kaan Gümüş 2025-04-26 03:34:07 +03:00
parent 8391ef31ba
commit 1e27b9280e
9 changed files with 172 additions and 386 deletions

View file

@ -7,6 +7,7 @@ edition = "2024"
console_error_panic_hook = "0.1.7"
leptos = { version = "0.7.8", features = ["csr"] }
wasm-bindgen-futures = "0.4.50"
async-channel = "2.3.1"
reqwest = { version = "0.12.15", features = ["json"] }
web-sys = { version = "0.3.77", features = [
"AudioBuffer",

View file

@ -1,41 +1,48 @@
use std::sync::Arc;
use leptos::{
IntoView, ev,
html::{ElementChild, button},
IntoView,
html::{ElementChild, button, label},
logging::log,
prelude::{OnAttribute, Read, Show, ShowProps, ToChildren},
server::LocalResource,
};
use wasm_bindgen_futures::spawn_local;
use crate::{media::audio, webrtc::WebRTC};
pub fn app() -> impl IntoView {
let audio_stream = LocalResource::new(|| audio());
let offer_props = ShowProps::builder()
let props = ShowProps::builder()
.when(move || audio_stream.read().is_some())
.children(ToChildren::to_children(move || {
button()
.on(ev::click, move |_| {
WebRTC::init(Some(audio_stream), None, None);
LocalResource::new(|| WebRTC::offer());
let audio_stream = audio_stream.read();
let audio_stream = audio_stream.as_deref().unwrap().clone();
let webrtc = WebRTC::new(Some(audio_stream), None, None).unwrap();
let webrtc = Arc::new(webrtc);
let webrtc_init = webrtc.clone();
spawn_local(async move { webrtc_init.init().await });
let webrtc_offer = webrtc.clone();
let offer_button = button()
.on(leptos::ev::click, move |_| {
log!("{:#?}", webrtc_offer.get_status());
})
.child("Offer")
.into_view()
.child("Offer");
let webrtc_answer = webrtc.clone();
let answer_button = button()
.on(leptos::ev::click, move |_| {
log!("{:#?}", webrtc_answer.get_status());
})
.child("Answer");
(offer_button, answer_button)
}))
.fallback(|| button().child("Sad Offer Button"))
.fallback(|| label().child("NOOOOOOOOOOOO"))
.build();
let answer_props = ShowProps::builder()
.when(move || audio_stream.read().is_some())
.children(ToChildren::to_children(move || {
button()
.on(ev::click, move |_| {
WebRTC::init(Some(audio_stream), None, None);
LocalResource::new(|| WebRTC::answer());
})
.child("Answer")
.into_view()
}))
.fallback(|| button().child("Sad Answer Button"))
.build();
(Show(offer_props), Show(answer_props))
Show(props)
}

View file

@ -1,7 +1,7 @@
use std::sync::mpsc;
use async_channel::{Receiver, Sender};
use leptos::logging::log;
use protocol::{Error, Signal, SignalType};
use wasm_bindgen_futures::spawn_local;
use web_sys::{
ErrorEvent, MessageEvent, WebSocket, js_sys,
wasm_bindgen::{JsCast, prelude::Closure},
@ -12,9 +12,9 @@ static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546";
thread_local! {
static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap();
static OFFER_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
static ANSWER_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
static ICE_CANDIDATE_CHANNEL: (mpsc::Sender<Signal>, mpsc::Receiver<Signal>) = mpsc::channel();
static OFFER_CHANNEL: (Sender<Signal>, Receiver<Signal>) = async_channel::unbounded();
static ANSWER_CHANNEL: (Sender<Signal>, Receiver<Signal>) = async_channel::unbounded();
static ICE_CANDIDATE_CHANNEL: (Sender<Signal>, Receiver<Signal>) = async_channel::unbounded();
}
struct SignallingChannel {}
@ -28,21 +28,34 @@ impl SignallingChannel {
if let Ok(received_data) = message_event.data().dyn_into::<js_sys::JsString>() {
if let Some(received_data) = received_data.as_string() {
if let Ok(received_signal) = serde_json::from_str::<Signal>(&received_data) {
if received_signal.get_signal_type() == SignalType::ICECandidate {
ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| {
ice_candidate_channel
.0
let received_signal_type = received_signal.get_signal_type();
if received_signal_type == SignalType::ICECandidate {
let ice_candidate_channel_sender = ICE_CANDIDATE_CHANNEL
.with(|ice_candidate_channel| ice_candidate_channel.0.clone());
spawn_local(async move {
ice_candidate_channel_sender
.send(received_signal)
.expect("Never");
})
} else if received_signal.get_signal_type() == SignalType::Offer {
OFFER_CHANNEL.with(|offer_channel| {
offer_channel.0.send(received_signal).expect("Never");
})
} else if received_signal.get_signal_type() == SignalType::Answer {
ANSWER_CHANNEL.with(|answer_channel| {
answer_channel.0.send(received_signal).expect("Never");
})
.await
.expect("Never")
});
} else if received_signal_type == SignalType::Offer {
let offer_channel_sender =
ICE_CANDIDATE_CHANNEL.with(|offer_channel| offer_channel.0.clone());
spawn_local(async move {
offer_channel_sender
.send(received_signal)
.await
.expect("Never")
});
} else if received_signal_type == SignalType::Answer {
let answer_channel_sender = ICE_CANDIDATE_CHANNEL
.with(|answer_channel| answer_channel.0.clone());
spawn_local(async move {
answer_channel_sender
.send(received_signal)
.await
.expect("Never")
});
}
}
}
@ -69,31 +82,28 @@ impl SignallingChannel {
})
}
fn receive_offer() -> Result<Signal, Error> {
OFFER_CHANNEL.with(|offer_channel| {
offer_channel
.1
.recv()
.map_err(|err_val| Error::OfferChannelReceive(err_val.to_string()))
})
async fn receive_offer() -> Result<Signal, Error> {
OFFER_CHANNEL
.with(|offer_channel| offer_channel.1.clone())
.recv()
.await
.map_err(|err_val| Error::OfferChannelReceive(err_val.to_string()))
}
fn receive_answer() -> Result<Signal, Error> {
ANSWER_CHANNEL.with(|answer_channel| {
answer_channel
.1
.recv()
.map_err(|err_val| Error::AnswerChannelReceive(err_val.to_string()))
})
async fn receive_answer() -> Result<Signal, Error> {
ANSWER_CHANNEL
.with(|answer_channel| answer_channel.1.clone())
.recv()
.await
.map_err(|err_val| Error::OfferChannelReceive(err_val.to_string()))
}
fn receive_ice_candidate() -> Result<Signal, Error> {
ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| {
ice_candidate_channel
.1
.recv()
.map_err(|err_val| Error::ICECandidateChannelReceive(err_val.to_string()))
})
async fn receive_ice_candidate() -> Result<Signal, Error> {
ICE_CANDIDATE_CHANNEL
.with(|ice_candidate_channel| ice_candidate_channel.1.clone())
.recv()
.await
.map_err(|err_val| Error::OfferChannelReceive(err_val.to_string()))
}
}
pub fn send_auth(data: &String) -> Result<(), Error> {
@ -108,8 +118,8 @@ pub fn send_offer(data: &String) -> Result<(), Error> {
SignallingChannel::send(&offer)
}
pub fn receive_offer() -> Result<Signal, Error> {
SignallingChannel::receive_offer()
pub async fn receive_offer() -> Result<Signal, Error> {
SignallingChannel::receive_offer().await
}
pub fn send_answer(data: &String) -> Result<(), Error> {
@ -118,8 +128,8 @@ pub fn send_answer(data: &String) -> Result<(), Error> {
SignallingChannel::send(&offer)
}
pub fn receive_answer() -> Result<Signal, Error> {
SignallingChannel::receive_answer()
pub async fn receive_answer() -> Result<Signal, Error> {
SignallingChannel::receive_answer().await
}
pub fn send_ice_candidate(data: &String) -> Result<(), Error> {
@ -128,6 +138,6 @@ pub fn send_ice_candidate(data: &String) -> Result<(), Error> {
SignallingChannel::send(&offer)
}
pub fn receive_ice_candidate() -> Result<Signal, Error> {
SignallingChannel::receive_ice_candidate()
pub async fn receive_ice_candidate() -> Result<Signal, Error> {
SignallingChannel::receive_ice_candidate().await
}

View file

@ -1,9 +1,10 @@
use leptos::{logging::log, prelude::Get, server::LocalResource};
use leptos::logging::log;
use protocol::Error;
use wasm_bindgen_futures::{JsFuture, spawn_local};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer,
RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType,
RtcSessionDescriptionInit,
js_sys::{Array, Reflect},
wasm_bindgen::{JsCast, JsValue, prelude::Closure},
};
@ -15,14 +16,17 @@ use crate::signal::{
pub struct WebRTC {
peer_connection: RtcPeerConnection,
}
thread_local! {
pub static WEBRTC:WebRTC = WebRTC::new().unwrap();
audio_stream: Option<MediaStream>,
video_stream: Option<MediaStream>,
screen_stream: Option<MediaStream>,
}
impl WebRTC {
fn new() -> Result<Self, Error> {
pub fn new(
audio_stream: Option<MediaStream>,
video_stream: Option<MediaStream>,
screen_stream: Option<MediaStream>,
) -> Result<Self, Error> {
let ice_server_addresses = vec![JsValue::from("stun:stun.l.google.com:19302")]
.into_iter()
.collect::<Array>();
@ -48,74 +52,60 @@ impl WebRTC {
peer_connection.set_onicecandidate(Some(on_ice_candidate.as_ref().unchecked_ref()));
on_ice_candidate.forget();
let webrtc = Self { peer_connection };
let webrtc = Self {
peer_connection,
audio_stream,
video_stream,
screen_stream,
};
webrtc.add_streams();
Ok(webrtc)
}
pub fn init(
audio_stream: Option<LocalResource<MediaStream>>,
video_stream: Option<LocalResource<MediaStream>>,
screen_stream: Option<LocalResource<MediaStream>>,
) {
Self::add_streams(audio_stream, video_stream, screen_stream);
spawn_local(async {
while let Ok(received_ice_candidate) = receive_ice_candidate() {
let received_ice_candidate =
RtcIceCandidateInit::new(&received_ice_candidate.get_data());
if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) {
WEBRTC.with(|webrtc| {
let add_received_ice_candidate_promise = webrtc
.peer_connection
.add_ice_candidate_with_opt_rtc_ice_candidate(Some(
&received_ice_candidate,
));
spawn_local(async move {
if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise)
.await
.map_err(|_| Error::ICECandidateAdd)
{
log!("Error: Add ICE Candidate | {}", err_val);
}
});
});
pub async fn init(&self) {
while let Ok(received_ice_candidate) = receive_ice_candidate().await {
let received_ice_candidate =
RtcIceCandidateInit::new(&received_ice_candidate.get_data());
if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) {
let add_received_ice_candidate_promise = self
.peer_connection
.add_ice_candidate_with_opt_rtc_ice_candidate(Some(&received_ice_candidate));
if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise)
.await
.map_err(|_| Error::ICECandidateAdd)
{
log!("Error: Add ICE Candidate | {}", err_val);
}
}
});
}
}
fn add_streams(
audio_stream: Option<LocalResource<MediaStream>>,
video_stream: Option<LocalResource<MediaStream>>,
screen_stream: Option<LocalResource<MediaStream>>,
) {
WEBRTC.with(|webrtc| {
if let Some(audio_stream) = audio_stream {
if let Some(audio_stream) = audio_stream.get() {
webrtc.peer_connection.add_stream(&audio_stream);
}
}
if let Some(video_stream) = video_stream {
if let Some(video_stream) = video_stream.get() {
webrtc.peer_connection.add_stream(&video_stream);
}
}
if let Some(screen_stream) = screen_stream {
if let Some(screen_stream) = screen_stream.get() {
webrtc.peer_connection.add_stream(&screen_stream);
}
}
});
pub fn get_status(&self) -> RtcPeerConnectionState {
self.peer_connection.connection_state()
}
pub async fn offer() -> Result<(), Error> {
let offer_promise = WEBRTC.with(|webrtc| webrtc.peer_connection.create_offer());
fn add_streams(&self) {
if let Some(audio_stream) = self.audio_stream.as_ref() {
self.peer_connection.add_stream(&audio_stream);
}
if let Some(video_stream) = self.video_stream.as_ref() {
self.peer_connection.add_stream(&video_stream);
}
if let Some(screen_stream) = self.screen_stream.as_ref() {
self.peer_connection.add_stream(&screen_stream);
}
}
pub async fn offer(&self) -> Result<(), Error> {
log!("Offer Function");
let offer_promise = self.peer_connection.create_offer();
match JsFuture::from(offer_promise)
.await
.map_err(|_| Error::WebRTCOffer)
{
Ok(offer) => {
log!("Offer Created");
let offer_session_description_protocol =
Reflect::get(&offer, &JsValue::from_str("sdp"))
.map_err(|_| Error::WebRTCSessionDescriptionProtocol)?;
@ -126,22 +116,22 @@ impl WebRTC {
Ok(offer_session_description_protocol) => {
let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
offer.set_sdp(&offer_session_description_protocol);
let set_local_description_promise = WEBRTC
.with(|webrtc| webrtc.peer_connection.set_local_description(&offer));
let set_local_description_promise =
self.peer_connection.set_local_description(&offer);
JsFuture::from(set_local_description_promise)
.await
.map_err(|_| Error::WebRTCSetLocalDescription)?;
log!("Before Sent Offer");
send_offer(&offer_session_description_protocol)?;
if let Ok(received_answer) = receive_answer() {
log!("After Sent Offer");
if let Ok(received_answer) = receive_answer().await {
let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
answer.set_sdp(&received_answer.get_data());
let set_remote_description_promise = WEBRTC.with(|webrtc| {
webrtc.peer_connection.set_remote_description(&answer)
});
let set_remote_description_promise =
self.peer_connection.set_remote_description(&answer);
JsFuture::from(set_remote_description_promise)
.await
.map_err(|_| Error::WebRTCSetRemoteDescription)?;
@ -164,18 +154,18 @@ impl WebRTC {
return Err(Error::WebRTCOffer);
}
pub async fn answer() -> Result<(), Error> {
if let Ok(received_offer) = receive_offer() {
pub async fn answer(&self) -> Result<(), Error> {
if let Ok(received_offer) = receive_offer().await {
let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
offer.set_sdp(&received_offer.get_data());
let set_remote_description_promise =
WEBRTC.with(|webrtc| webrtc.peer_connection.set_remote_description(&offer));
self.peer_connection.set_remote_description(&offer);
JsFuture::from(set_remote_description_promise)
.await
.map_err(|_| Error::WebRTCSetRemoteDescription)?;
let answer_promise = WEBRTC.with(|webrtc| webrtc.peer_connection.create_answer());
let answer_promise = self.peer_connection.create_answer();
match JsFuture::from(answer_promise)
.await
.map_err(|_| Error::WebRTCAnswer)
@ -192,9 +182,8 @@ impl WebRTC {
let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
answer.set_sdp(&answer_session_description_protocol);
let set_local_description_promise = WEBRTC.with(|webrtc| {
webrtc.peer_connection.set_local_description(&answer)
});
let set_local_description_promise =
self.peer_connection.set_local_description(&answer);
JsFuture::from(set_local_description_promise)
.await
.map_err(|_| Error::WebRTCSetLocalDescription)?;