rust_webrtc/client/src/signal.rs
2025-04-22 04:19:52 +03:00

118 lines
4 KiB
Rust

use std::{collections::VecDeque, sync::Mutex};
use leptos::logging::log;
use protocol::{Error, Signal, SignalType};
use web_sys::{
ErrorEvent, MessageEvent, WebSocket, js_sys,
wasm_bindgen::{JsCast, prelude::Closure},
};
/// URL handed to `WebSocket::new` when the signalling socket is created.
static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546";
thread_local! {
// Queue of signals decoded by the socket's message handler; `receive`
// pops entries from the front of it.
static WEBSOCKET_RECEIVER: Mutex<VecDeque<Signal>> =
Mutex::new(VecDeque::new());
// The signalling socket itself. `thread_local!` initialises it on first
// access, and the `unwrap()` panics there if `SignallingChannel::init` fails.
static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap();
}
struct SignallingChannel {}
impl SignallingChannel {
fn init() -> Result<WebSocket, Error> {
let web_socket = WebSocket::new(SIGNALLING_ADDRESS).map_err(|_| Error::WebSocketInit)?;
let on_message_callback = Closure::<dyn Fn(_)>::new(move |message_event: MessageEvent| {
if let Ok(received_data) = message_event.data().dyn_into::<js_sys::JsString>() {
if let Some(received_data) = received_data.as_string() {
if let Ok(received_signal) = serde_json::from_str::<Signal>(&received_data) {
WEBSOCKET_RECEIVER.with(|websocket_receiver| {
if let Ok(mut websocket_receiver) = websocket_receiver.lock() {
websocket_receiver.push_back(received_signal);
}
})
}
}
}
});
web_socket.set_onmessage(Some(on_message_callback.as_ref().unchecked_ref()));
on_message_callback.forget();
let on_error_callback = Closure::<dyn Fn(_)>::new(move |error_event: ErrorEvent| {
log!("Error: WebSocket | {:#?}", error_event);
});
web_socket.set_onerror(Some(on_error_callback.as_ref().unchecked_ref()));
on_error_callback.forget();
Ok(web_socket)
}
async fn send(signal: &Signal) -> Result<(), Error> {
let json = serde_json::json!(signal);
WEBSOCKET.with(|websocket| {
websocket
.send_with_str(&json.to_string())
.map_err(|_| Error::WebSocketSend)
})
}
async fn receive(expected_signal_type: &SignalType) -> Result<Signal, Error> {
WEBSOCKET_RECEIVER.with(|message_queue| {
if let Ok(mut message_queue) = message_queue.lock() {
let received_signal = message_queue
.pop_front()
.ok_or_else(|| Error::WebSocketReceivedNothing)?;
if received_signal.get_signal_type() == *expected_signal_type {
Ok(received_signal)
} else if received_signal.get_signal_type() == SignalType::Auth {
Err(Error::WebSocketAuth)
} else {
message_queue.clear();
Err(Error::WebSocketReceiveQueueReset)
}
} else {
Err(Error::WebSocketReceiveQueueLocked)
}
})
}
}
/// Wraps `data` in an `Auth` signal and sends it over the signalling channel.
///
/// # Errors
///
/// Propagates the error reported by `SignallingChannel::send`.
pub async fn send_auth(data: &String) -> Result<(), Error> {
    SignallingChannel::send(&Signal::new(&SignalType::Auth, data)).await
}
/// Takes the next queued signal, expecting it to be an `Auth` signal.
///
/// # Errors
///
/// Propagates the error from `SignallingChannel::receive`, e.g. when nothing
/// is queued or the next signal has a different type.
pub async fn receive_auth() -> Result<Signal, Error> {
    let expected = SignalType::Auth;
    SignallingChannel::receive(&expected).await
}
/// Wraps `data` in an `Offer` signal and sends it over the signalling channel.
///
/// # Errors
///
/// Propagates the error reported by `SignallingChannel::send`.
pub async fn send_offer(data: &String) -> Result<(), Error> {
    SignallingChannel::send(&Signal::new(&SignalType::Offer, data)).await
}
/// Takes the next queued signal, expecting it to be an `Offer` signal.
///
/// # Errors
///
/// Propagates the error from `SignallingChannel::receive`, e.g. when nothing
/// is queued or the next signal has a different type.
pub async fn receive_offer() -> Result<Signal, Error> {
    let expected = SignalType::Offer;
    SignallingChannel::receive(&expected).await
}
/// Wraps `data` in an `Answer` signal and sends it over the signalling channel.
///
/// # Errors
///
/// Propagates the error reported by `SignallingChannel::send`.
pub async fn send_answer(data: &String) -> Result<(), Error> {
    SignallingChannel::send(&Signal::new(&SignalType::Answer, data)).await
}
/// Takes the next queued signal, expecting it to be an `Answer` signal.
///
/// # Errors
///
/// Propagates the error from `SignallingChannel::receive`, e.g. when nothing
/// is queued or the next signal has a different type.
pub async fn receive_answer() -> Result<Signal, Error> {
    let expected = SignalType::Answer;
    SignallingChannel::receive(&expected).await
}
/// Wraps `data` in an `ICECandidate` signal and sends it over the signalling channel.
///
/// # Errors
///
/// Propagates the error reported by `SignallingChannel::send`.
pub async fn send_ice_candidate(data: &String) -> Result<(), Error> {
    SignallingChannel::send(&Signal::new(&SignalType::ICECandidate, data)).await
}
/// Takes the next queued signal, expecting it to be an `ICECandidate` signal.
///
/// # Errors
///
/// Propagates the error from `SignallingChannel::receive`, e.g. when nothing
/// is queued or the next signal has a different type.
pub async fn receive_ice_candidate() -> Result<Signal, Error> {
    let expected = SignalType::ICECandidate;
    SignallingChannel::receive(&expected).await
}