fix: ⚡ messages go to right channels
fix: ⚡ ice candidate transfer fix: ⚡ deadlock in server
This commit is contained in:
parent
4a695dce40
commit
53a73285b9
5 changed files with 121 additions and 32 deletions
|
@ -7,7 +7,12 @@ use leptos::{
|
||||||
};
|
};
|
||||||
use wasm_bindgen_futures::spawn_local;
|
use wasm_bindgen_futures::spawn_local;
|
||||||
|
|
||||||
use crate::{media::audio, signal::wait_until_communication_is_ready, webrtc::WebRTC};
|
use crate::{
|
||||||
|
media::audio,
|
||||||
|
signal::{send_auth, wait_until_communication_is_ready},
|
||||||
|
sleep,
|
||||||
|
webrtc::WebRTC,
|
||||||
|
};
|
||||||
|
|
||||||
pub fn app() -> impl IntoView {
|
pub fn app() -> impl IntoView {
|
||||||
let audio_stream = LocalResource::new(|| audio());
|
let audio_stream = LocalResource::new(|| audio());
|
||||||
|
@ -23,15 +28,23 @@ pub fn app() -> impl IntoView {
|
||||||
let audio_stream = audio_stream.as_deref().unwrap().clone();
|
let audio_stream = audio_stream.as_deref().unwrap().clone();
|
||||||
|
|
||||||
let webrtc = WebRTC::new(Some(audio_stream), None, None).unwrap();
|
let webrtc = WebRTC::new(Some(audio_stream), None, None).unwrap();
|
||||||
|
let webrtc_state = webrtc.clone();
|
||||||
|
spawn_local(async move {
|
||||||
|
loop {
|
||||||
|
log!("{:#?}", webrtc_state.get_status());
|
||||||
|
sleep(1000).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let webrtc_offer = webrtc.clone();
|
let webrtc_offer = webrtc.clone();
|
||||||
let offer_button = button()
|
let offer_button = button()
|
||||||
.on(leptos::ev::click, move |_| {
|
.on(leptos::ev::click, move |_| {
|
||||||
|
send_auth(&String::from("Offer")).unwrap();
|
||||||
let webrtc_offer = webrtc_offer.clone();
|
let webrtc_offer = webrtc_offer.clone();
|
||||||
log!("{:#?}", webrtc_offer.get_status());
|
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
let offer_result = webrtc_offer.offer().await;
|
if let Err(err_val) = webrtc_offer.offer().await {
|
||||||
log!("Offer Result Is = {:#?}", offer_result);
|
log!("Error: WebRTC Offer | {}", err_val);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
.child("Offer");
|
.child("Offer");
|
||||||
|
@ -39,10 +52,12 @@ pub fn app() -> impl IntoView {
|
||||||
let webrtc_answer = webrtc.clone();
|
let webrtc_answer = webrtc.clone();
|
||||||
let answer_button = button()
|
let answer_button = button()
|
||||||
.on(leptos::ev::click, move |_| {
|
.on(leptos::ev::click, move |_| {
|
||||||
|
send_auth(&String::from("Answer")).unwrap();
|
||||||
let webrtc_answer = webrtc_answer.clone();
|
let webrtc_answer = webrtc_answer.clone();
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
let answer_result = webrtc_answer.answer().await;
|
if let Err(err_val) = webrtc_answer.answer().await {
|
||||||
log!("Answer Result Is = {:#?}", answer_result);
|
log!("Error: WebRTC Answer | {}", err_val);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
.child("Answer");
|
.child("Answer");
|
||||||
|
|
|
@ -49,7 +49,7 @@ impl SignallingChannel {
|
||||||
});
|
});
|
||||||
} else if received_signal_type == SignalType::Offer {
|
} else if received_signal_type == SignalType::Offer {
|
||||||
let offer_channel_sender =
|
let offer_channel_sender =
|
||||||
ICE_CANDIDATE_CHANNEL.with(|offer_channel| offer_channel.0.clone());
|
OFFER_CHANNEL.with(|offer_channel| offer_channel.0.clone());
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
offer_channel_sender
|
offer_channel_sender
|
||||||
.send(received_signal)
|
.send(received_signal)
|
||||||
|
@ -57,8 +57,8 @@ impl SignallingChannel {
|
||||||
.expect("Never")
|
.expect("Never")
|
||||||
});
|
});
|
||||||
} else if received_signal_type == SignalType::Answer {
|
} else if received_signal_type == SignalType::Answer {
|
||||||
let answer_channel_sender = ICE_CANDIDATE_CHANNEL
|
let answer_channel_sender =
|
||||||
.with(|answer_channel| answer_channel.0.clone());
|
ANSWER_CHANNEL.with(|answer_channel| answer_channel.0.clone());
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
answer_channel_sender
|
answer_channel_sender
|
||||||
.send(received_signal)
|
.send(received_signal)
|
||||||
|
@ -139,9 +139,9 @@ pub async fn receive_offer() -> Result<Signal, Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_answer(data: &String) -> Result<(), Error> {
|
pub fn send_answer(data: &String) -> Result<(), Error> {
|
||||||
let offer = Signal::new(&SignalType::Answer, data);
|
let answer = Signal::new(&SignalType::Answer, data);
|
||||||
|
|
||||||
SignallingChannel::send(&offer)
|
SignallingChannel::send(&answer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn receive_answer() -> Result<Signal, Error> {
|
pub async fn receive_answer() -> Result<Signal, Error> {
|
||||||
|
@ -149,9 +149,9 @@ pub async fn receive_answer() -> Result<Signal, Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_ice_candidate(data: &String) -> Result<(), Error> {
|
pub fn send_ice_candidate(data: &String) -> Result<(), Error> {
|
||||||
let offer = Signal::new(&SignalType::ICECandidate, data);
|
let ice_candidate = Signal::new(&SignalType::ICECandidate, data);
|
||||||
|
|
||||||
SignallingChannel::send(&offer)
|
SignallingChannel::send(&ice_candidate)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn receive_ice_candidate() -> Result<Signal, Error> {
|
pub async fn receive_ice_candidate() -> Result<Signal, Error> {
|
||||||
|
|
|
@ -7,7 +7,7 @@ use web_sys::{
|
||||||
MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer,
|
MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer,
|
||||||
RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType,
|
RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType,
|
||||||
RtcSessionDescriptionInit,
|
RtcSessionDescriptionInit,
|
||||||
js_sys::{Array, Reflect},
|
js_sys::{Array, JSON, Reflect},
|
||||||
wasm_bindgen::{JsCast, JsValue, prelude::Closure},
|
wasm_bindgen::{JsCast, JsValue, prelude::Closure},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -44,7 +44,9 @@ impl WebRTC {
|
||||||
let on_ice_candidate = Closure::<dyn Fn(_)>::new(
|
let on_ice_candidate = Closure::<dyn Fn(_)>::new(
|
||||||
move |peer_connection_ice_event: RtcPeerConnectionIceEvent| {
|
move |peer_connection_ice_event: RtcPeerConnectionIceEvent| {
|
||||||
if let Some(candidate) = peer_connection_ice_event.candidate() {
|
if let Some(candidate) = peer_connection_ice_event.candidate() {
|
||||||
if let Err(err_val) = send_ice_candidate(&candidate.candidate()) {
|
let candidate = JSON::stringify(&candidate).expect("Never");
|
||||||
|
let candidate = String::from(candidate);
|
||||||
|
if let Err(err_val) = send_ice_candidate(&candidate) {
|
||||||
log!("Error: Send ICE Candidate | {}", err_val);
|
log!("Error: Send ICE Candidate | {}", err_val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,19 +73,46 @@ impl WebRTC {
|
||||||
|
|
||||||
async fn ice_candidate_receiver(&self) {
|
async fn ice_candidate_receiver(&self) {
|
||||||
while let Ok(received_ice_candidate) = receive_ice_candidate().await {
|
while let Ok(received_ice_candidate) = receive_ice_candidate().await {
|
||||||
let received_ice_candidate =
|
let received_ice_candidate = JSON::parse(&received_ice_candidate.get_data()).unwrap();
|
||||||
RtcIceCandidateInit::new(&received_ice_candidate.get_data());
|
|
||||||
if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) {
|
let candidate = Reflect::get(&received_ice_candidate, &JsValue::from_str("candidate"))
|
||||||
|
.unwrap()
|
||||||
|
.as_string()
|
||||||
|
.unwrap();
|
||||||
|
let sdp_mid = Reflect::get(&received_ice_candidate, &JsValue::from_str("sdpMid"))
|
||||||
|
.unwrap()
|
||||||
|
.as_string()
|
||||||
|
.unwrap();
|
||||||
|
let sdp_m_line_index =
|
||||||
|
Reflect::get(&received_ice_candidate, &JsValue::from_str("sdpMLineIndex"))
|
||||||
|
.unwrap()
|
||||||
|
.as_f64()
|
||||||
|
.unwrap() as u16;
|
||||||
|
|
||||||
|
let received_ice_candidate = RtcIceCandidateInit::new(&candidate);
|
||||||
|
received_ice_candidate.set_sdp_mid(Some(&sdp_mid));
|
||||||
|
received_ice_candidate.set_sdp_m_line_index(Some(sdp_m_line_index));
|
||||||
|
|
||||||
|
match RtcIceCandidate::new(&received_ice_candidate)
|
||||||
|
.map_err(|err_val| Error::ICECandidateAdd(err_val.as_string().unwrap()))
|
||||||
|
{
|
||||||
|
Ok(received_ice_candidate) => {
|
||||||
let add_received_ice_candidate_promise = self
|
let add_received_ice_candidate_promise = self
|
||||||
.peer_connection
|
.peer_connection
|
||||||
.add_ice_candidate_with_opt_rtc_ice_candidate(Some(&received_ice_candidate));
|
.add_ice_candidate_with_opt_rtc_ice_candidate(Some(
|
||||||
|
&received_ice_candidate,
|
||||||
|
));
|
||||||
if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise)
|
if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Error::ICECandidateAdd)
|
.map_err(|err_val| Error::ICECandidateAdd(err_val.as_string().unwrap()))
|
||||||
{
|
{
|
||||||
log!("Error: Add ICE Candidate | {}", err_val);
|
log!("Error: Add ICE Candidate | {}", err_val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(err_val) => {
|
||||||
|
log!("Error: New Ice Candidate | {}", err_val);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ pub enum Error {
|
||||||
OfferChannelReceive(String),
|
OfferChannelReceive(String),
|
||||||
AnswerChannelReceive(String),
|
AnswerChannelReceive(String),
|
||||||
ICECandidateChannelReceive(String),
|
ICECandidateChannelReceive(String),
|
||||||
ICECandidateAdd,
|
ICECandidateAdd(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for Error {
|
impl std::error::Error for Error {
|
||||||
|
@ -70,7 +70,7 @@ impl Display for Error {
|
||||||
Error::ICECandidateChannelReceive(recv_error) => {
|
Error::ICECandidateChannelReceive(recv_error) => {
|
||||||
write!(f, "ICE Candidate Channel Receive | {}", recv_error)
|
write!(f, "ICE Candidate Channel Receive | {}", recv_error)
|
||||||
}
|
}
|
||||||
Error::ICECandidateAdd => write!(f, "ICE Candidate Add"),
|
Error::ICECandidateAdd(err_val) => write!(f, "ICE Candidate Add | {}", err_val),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
use std::{collections::VecDeque, sync::LazyLock};
|
use std::{collections::VecDeque, sync::LazyLock, time::Duration};
|
||||||
|
|
||||||
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
|
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
|
||||||
use fastwebsockets::{
|
use fastwebsockets::{
|
||||||
FragmentCollectorRead, OpCode, WebSocketError,
|
Frame, OpCode, WebSocketError,
|
||||||
upgrade::{IncomingUpgrade, UpgradeFut},
|
upgrade::{IncomingUpgrade, UpgradeFut},
|
||||||
};
|
};
|
||||||
use protocol::{Signal, SignalType};
|
use protocol::{Signal, SignalType};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::TcpListener,
|
net::TcpListener,
|
||||||
sync::{RwLock, broadcast},
|
sync::{RwLock, broadcast},
|
||||||
|
time::sleep,
|
||||||
};
|
};
|
||||||
use tower_http::cors::CorsLayer;
|
use tower_http::cors::CorsLayer;
|
||||||
|
|
||||||
|
@ -17,6 +18,7 @@ const SERVER_ADDRESS: &str = "192.168.1.3:4546";
|
||||||
static USER_MESSAGES: LazyLock<RwLock<VecDeque<UserMessages>>> =
|
static USER_MESSAGES: LazyLock<RwLock<VecDeque<UserMessages>>> =
|
||||||
LazyLock::new(|| VecDeque::new().into());
|
LazyLock::new(|| VecDeque::new().into());
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct UserMessages {
|
struct UserMessages {
|
||||||
user: String,
|
user: String,
|
||||||
message_receiver: broadcast::Receiver<Signal>,
|
message_receiver: broadcast::Receiver<Signal>,
|
||||||
|
@ -50,7 +52,7 @@ async fn websocket_handler(websocket: UpgradeFut) {
|
||||||
websocket.set_writev(false);
|
websocket.set_writev(false);
|
||||||
websocket.set_auto_close(true);
|
websocket.set_auto_close(true);
|
||||||
|
|
||||||
let (mut websocket_receiver, websocker_sender) = websocket.split(tokio::io::split);
|
let (mut websocket_receiver, mut websocker_sender) = websocket.split(tokio::io::split);
|
||||||
let mut user = String::default();
|
let mut user = String::default();
|
||||||
let (message_sender, message_receiver) = broadcast::channel(100);
|
let (message_sender, message_receiver) = broadcast::channel(100);
|
||||||
|
|
||||||
|
@ -72,7 +74,50 @@ async fn websocket_handler(websocket: UpgradeFut) {
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
println!("{:#?}", signal);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while USER_MESSAGES.read().await.len() < 2 {
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
loop {
|
||||||
|
let mut user_messages = USER_MESSAGES.write().await;
|
||||||
|
for user_message in user_messages.iter_mut() {
|
||||||
|
if user_message.user != user && user_message.message_receiver.len() > 0 {
|
||||||
|
while let Ok(message) = user_message.message_receiver.recv().await {
|
||||||
|
if let Err(err_val) = websocker_sender
|
||||||
|
.write_frame(Frame::text(fastwebsockets::Payload::Owned(
|
||||||
|
serde_json::to_vec(&message).unwrap(),
|
||||||
|
)))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
eprintln!("Error: WebSocket Send | {}", err_val);
|
||||||
|
}
|
||||||
|
if user_message.message_receiver.len() < 1 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while let Ok(received_frame) = websocket_receiver
|
||||||
|
.read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() })
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
if let OpCode::Text = received_frame.opcode {
|
||||||
|
let signal =
|
||||||
|
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec()).unwrap();
|
||||||
|
|
||||||
|
if signal.get_signal_type() != SignalType::Auth {
|
||||||
|
if let Err(err_val) = message_sender.send(signal) {
|
||||||
|
eprintln!("Error: WebSocket Channel Send | {}", err_val);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue