feat: websocket client

This commit is contained in:
Ahmet Kaan Gümüş 2025-04-22 04:19:52 +03:00
parent 3b122dc4f7
commit 978f082146
5 changed files with 166 additions and 146 deletions

View file

@ -12,6 +12,9 @@ web-sys = { version = "0.3.77", features = [
"AudioBuffer", "AudioBuffer",
"AudioBufferSourceNode", "AudioBufferSourceNode",
"AudioContext", "AudioContext",
"BinaryType",
"ErrorEvent",
"MessageEvent",
"HtmlAudioElement", "HtmlAudioElement",
"MediaDevices", "MediaDevices",
"MediaStream", "MediaStream",
@ -31,6 +34,7 @@ web-sys = { version = "0.3.77", features = [
"RtcSdpType", "RtcSdpType",
"RtcSessionDescription", "RtcSessionDescription",
"RtcSessionDescriptionInit", "RtcSessionDescriptionInit",
"WebSocket",
"Window", "Window",
] } ] }
protocol = { path = "../protocol" } protocol = { path = "../protocol" }

View file

@ -17,7 +17,8 @@ use web_sys::HtmlAudioElement;
use crate::{ use crate::{
media::audio, media::audio,
rtc::{answer, offer}, rtc::{answer, offer},
signal::start_signalling, signal::send_auth,
sleep,
}; };
pub fn app() -> impl IntoView { pub fn app() -> impl IntoView {
@ -46,17 +47,12 @@ pub fn app() -> impl IntoView {
.fallback(|| button().child("Sad Button")) .fallback(|| button().child("Sad Button"))
.build(); .build();
let username = signal(String::from("")); let username = signal(String::from(""));
( (Show(props), signalling(username), rtc_offer(), rtc_answer())
Show(props),
signalling(username),
rtc_offer(username.0),
rtc_answer(username.0),
)
} }
fn signalling(username: (ReadSignal<String>, WriteSignal<String>)) -> impl IntoView { fn signalling(username: (ReadSignal<String>, WriteSignal<String>)) -> impl IntoView {
let signalling_trigger = move || { let signalling_trigger = move || {
spawn_local(start_signalling(username.0.get())); spawn_local(async move { send_auth(&username.0.get()).await.unwrap() });
}; };
let signalling_server_input = form() let signalling_server_input = form()
.child( .child(
@ -79,10 +75,17 @@ fn signalling(username: (ReadSignal<String>, WriteSignal<String>)) -> impl IntoV
(signalling_server_input, signalling_submit_button) (signalling_server_input, signalling_submit_button)
} }
fn rtc_offer(username: ReadSignal<String>) -> impl IntoView { fn rtc_offer() -> impl IntoView {
let offer_trigger = move || { let offer_trigger = move || {
spawn_local(async move { spawn_local(async move {
let peer_connection = offer(username.get()).await; let peer_connection = offer(audio().await).await;
let audio_stream = audio().await;
peer_connection.add_stream(&audio_stream);
loop {
log!("{:#?}", peer_connection.ice_connection_state());
log!("{:#?}", peer_connection.get_remote_streams());
sleep(1000).await;
}
}); });
}; };
@ -95,10 +98,15 @@ fn rtc_offer(username: ReadSignal<String>) -> impl IntoView {
offer_button offer_button
} }
fn rtc_answer(username: ReadSignal<String>) -> impl IntoView { fn rtc_answer() -> impl IntoView {
let answer_trigger = move || { let answer_trigger = move || {
spawn_local(async move { spawn_local(async move {
let peer_connection = answer(username.get()).await; let peer_connection = answer(audio().await).await;
loop {
log!("{:#?}", peer_connection.ice_connection_state());
log!("{:#?}", peer_connection.get_remote_streams());
sleep(1000).await;
}
}); });
}; };

View file

@ -1,12 +1,8 @@
use leptos::{ use leptos::{logging::log, task::spawn_local};
logging::log,
prelude::{Set, Signal, Update, Write, WriteSignal},
task::spawn_local,
};
use wasm_bindgen_futures::JsFuture; use wasm_bindgen_futures::JsFuture;
use web_sys::{ use web_sys::{
RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, RtcPeerConnection, MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer,
RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
js_sys::{Array, Reflect}, js_sys::{Array, Reflect},
wasm_bindgen::{JsCast, JsValue, prelude::Closure}, wasm_bindgen::{JsCast, JsValue, prelude::Closure},
}; };
@ -19,12 +15,11 @@ use crate::{
sleep, sleep,
}; };
async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { async fn local_ice_candidate_handler(peer_connection: &RtcPeerConnection) {
let local_ice_candidate_handler: Box<dyn Fn(String, RtcPeerConnectionIceEvent)> = let local_ice_candidate_handler: Box<dyn Fn(RtcPeerConnectionIceEvent)> =
Box::new(move |username, peer_connection_ice_event| { Box::new(move |peer_connection_ice_event| {
spawn_local(async move { spawn_local(async move {
{ {
let username: &String = &username;
async move { async move {
sleep(1000).await; sleep(1000).await;
@ -33,7 +28,7 @@ async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPee
{ {
let peer_connection_ice_candidate = let peer_connection_ice_candidate =
peer_connection_ice_candidate.as_string().unwrap(); peer_connection_ice_candidate.as_string().unwrap();
send_ice_candidate(username, &peer_connection_ice_candidate) send_ice_candidate(&peer_connection_ice_candidate)
.await .await
.unwrap(); .unwrap();
} }
@ -48,10 +43,10 @@ async fn local_ice_candidate_handler(username: &String, peer_connection: &RtcPee
peer_connection.set_onicecandidate(Some(local_ice_candidate_handler.as_ref().unchecked_ref())); peer_connection.set_onicecandidate(Some(local_ice_candidate_handler.as_ref().unchecked_ref()));
} }
async fn remote_ice_candidate_handler(username: &String, peer_connection: &RtcPeerConnection) { async fn remote_ice_candidate_handler(peer_connection: &RtcPeerConnection) {
if let Ok(user_and_signal) = receive_ice_candidate(username).await { if let Ok(received_signal) = receive_ice_candidate().await {
let peer_connection_ice_candidate_init = let peer_connection_ice_candidate_init =
RtcIceCandidateInit::new(&user_and_signal.signal.get_data()); RtcIceCandidateInit::new(&received_signal.get_data());
let peer_connection_ice_candidate = let peer_connection_ice_candidate =
RtcIceCandidate::new(&peer_connection_ice_candidate_init).unwrap(); RtcIceCandidate::new(&peer_connection_ice_candidate_init).unwrap();
let peer_connection_add_ice_candidate_promise = peer_connection let peer_connection_add_ice_candidate_promise = peer_connection
@ -74,8 +69,9 @@ async fn create_peer_connection_with_configuration() -> RtcPeerConnection {
RtcPeerConnection::new_with_configuration(&rtc_configuration).unwrap() RtcPeerConnection::new_with_configuration(&rtc_configuration).unwrap()
} }
pub async fn offer(username: String) -> RtcPeerConnection { pub async fn offer(audio_stream: MediaStream) -> RtcPeerConnection {
let peer_connection = create_peer_connection_with_configuration().await; let peer_connection = create_peer_connection_with_configuration().await;
peer_connection.add_stream(&audio_stream);
let peer_connection_create_offer_promise = peer_connection.create_offer(); let peer_connection_create_offer_promise = peer_connection.create_offer();
let peer_connection_session_offer = JsFuture::from(peer_connection_create_offer_promise) let peer_connection_session_offer = JsFuture::from(peer_connection_create_offer_promise)
.await .await
@ -93,21 +89,22 @@ pub async fn offer(username: String) -> RtcPeerConnection {
.unwrap() .unwrap()
.as_string() .as_string()
.unwrap(); .unwrap();
local_ice_candidate_handler(&peer_connection).await;
remote_ice_candidate_handler(&peer_connection).await;
log!("{}", peer_connection_session_offer); log!("{}", peer_connection_session_offer);
let data = peer_connection_session_offer; let data = peer_connection_session_offer;
if let Err(err_val) = send_offer(&username, &data).await { if let Err(err_val) = send_offer(&data).await {
log!("Error: Send Offer | {}", err_val) log!("Error: Send Offer | {}", err_val)
} }
loop { loop {
match receive_answer(&username).await { match receive_answer().await {
Ok(received_user_and_signal_answer) => { Ok(received_signal) => {
log!("{:#?}", received_user_and_signal_answer); log!("{:#?}", received_signal);
let peer_connection_session_answer = let peer_connection_session_answer =
RtcSessionDescriptionInit::new(RtcSdpType::Answer); RtcSessionDescriptionInit::new(RtcSdpType::Answer);
peer_connection_session_answer peer_connection_session_answer.set_sdp(received_signal.get_data().as_str());
.set_sdp(received_user_and_signal_answer.signal.get_data().as_str());
sleep(1000).await; sleep(1000).await;
JsFuture::from( JsFuture::from(
peer_connection.set_remote_description(&peer_connection_session_answer), peer_connection.set_remote_description(&peer_connection_session_answer),
@ -118,7 +115,6 @@ pub async fn offer(username: String) -> RtcPeerConnection {
log!("{:#?}", peer_connection.connection_state()); log!("{:#?}", peer_connection.connection_state());
log!("{:#?}", peer_connection.ice_connection_state()); log!("{:#?}", peer_connection.ice_connection_state());
local_ice_candidate_handler(&username, &peer_connection).await;
return peer_connection; return peer_connection;
} }
Err(err_val) => log!("Error: Receive Answer | {}", err_val), Err(err_val) => log!("Error: Receive Answer | {}", err_val),
@ -127,16 +123,16 @@ pub async fn offer(username: String) -> RtcPeerConnection {
} }
} }
pub async fn answer(username: String) -> RtcPeerConnection { pub async fn answer(audio_stream: MediaStream) -> RtcPeerConnection {
loop { loop {
match receive_offer(&username).await { match receive_offer().await {
Ok(received_user_and_signal_offer) => { Ok(received_signal) => {
log!("{:#?}", received_user_and_signal_offer); log!("{:#?}", received_signal);
let peer_connection = create_peer_connection_with_configuration().await; let peer_connection = create_peer_connection_with_configuration().await;
peer_connection.add_stream(&audio_stream);
let peer_connection_session_offer = let peer_connection_session_offer =
RtcSessionDescriptionInit::new(RtcSdpType::Offer); RtcSessionDescriptionInit::new(RtcSdpType::Offer);
peer_connection_session_offer peer_connection_session_offer.set_sdp(received_signal.get_data().as_str());
.set_sdp(received_user_and_signal_offer.signal.get_data().as_str());
JsFuture::from( JsFuture::from(
peer_connection.set_remote_description(&peer_connection_session_offer), peer_connection.set_remote_description(&peer_connection_session_offer),
) )
@ -159,11 +155,12 @@ pub async fn answer(username: String) -> RtcPeerConnection {
log!("{}", session_answer); log!("{}", session_answer);
let data = session_answer; let data = session_answer;
send_answer(&username, &data).await.unwrap(); send_answer(&data).await.unwrap();
log!("{:#?}", peer_connection.connection_state()); log!("{:#?}", peer_connection.connection_state());
log!("{:#?}", peer_connection.ice_connection_state()); log!("{:#?}", peer_connection.ice_connection_state());
local_ice_candidate_handler(&username, &peer_connection).await; local_ice_candidate_handler(&peer_connection).await;
remote_ice_candidate_handler(&peer_connection).await;
return peer_connection; return peer_connection;
} }

View file

@ -1,121 +1,118 @@
use std::sync::LazyLock; use std::{collections::VecDeque, sync::Mutex};
use leptos::logging::log; use leptos::logging::log;
use protocol::{Signal, SignalType, UserAndSignal}; use protocol::{Error, Signal, SignalType};
use reqwest::{Response, header::HeaderMap}; use web_sys::{
use serde_json::{Value, json}; ErrorEvent, MessageEvent, WebSocket, js_sys,
wasm_bindgen::{JsCast, prelude::Closure},
};
const SIGNALLING_ADDRESS: &str = "http://192.168.1.3:4546"; static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546";
static REQUEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(|| reqwest::Client::new());
async fn create_headers(headers: Vec<(&'static str, String)>) -> HeaderMap { thread_local! {
let mut header_map = HeaderMap::new(); static WEBSOCKET_RECEIVER: Mutex<VecDeque<Signal>> =
for (key, val) in headers { Mutex::new(VecDeque::new());
header_map.insert(key, val.parse().unwrap());
static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap();
}
struct SignallingChannel {}
impl SignallingChannel {
fn init() -> Result<WebSocket, Error> {
let web_socket = WebSocket::new(SIGNALLING_ADDRESS).map_err(|_| Error::WebSocketInit)?;
let on_message_callback = Closure::<dyn Fn(_)>::new(move |message_event: MessageEvent| {
if let Ok(received_data) = message_event.data().dyn_into::<js_sys::JsString>() {
if let Some(received_data) = received_data.as_string() {
if let Ok(received_signal) = serde_json::from_str::<Signal>(&received_data) {
WEBSOCKET_RECEIVER.with(|websocket_receiver| {
if let Ok(mut websocket_receiver) = websocket_receiver.lock() {
websocket_receiver.push_back(received_signal);
}
})
}
}
}
});
web_socket.set_onmessage(Some(on_message_callback.as_ref().unchecked_ref()));
on_message_callback.forget();
let on_error_callback = Closure::<dyn Fn(_)>::new(move |error_event: ErrorEvent| {
log!("Error: WebSocket | {:#?}", error_event);
});
web_socket.set_onerror(Some(on_error_callback.as_ref().unchecked_ref()));
on_error_callback.forget();
Ok(web_socket)
} }
header_map
}
async fn post_json(username: &String, json: &Value) -> Result<Response, reqwest::Error> { async fn send(signal: &Signal) -> Result<(), Error> {
let headers = create_headers(vec![]).await; let json = serde_json::json!(signal);
REQUEST_CLIENT WEBSOCKET.with(|websocket| {
.post(SIGNALLING_ADDRESS) websocket
.headers(headers) .send_with_str(&json.to_string())
.bearer_auth(username) .map_err(|_| Error::WebSocketSend)
.json(json) })
.send() }
.await
}
async fn get_json(username: &String, signal_type: SignalType) -> Result<Response, reqwest::Error> { async fn receive(expected_signal_type: &SignalType) -> Result<Signal, Error> {
let headers = create_headers(vec![("EXPECTED_SIGNAL", signal_type.to_string())]).await; WEBSOCKET_RECEIVER.with(|message_queue| {
REQUEST_CLIENT if let Ok(mut message_queue) = message_queue.lock() {
.get(SIGNALLING_ADDRESS) let received_signal = message_queue
.headers(headers) .pop_front()
.bearer_auth(username) .ok_or_else(|| Error::WebSocketReceivedNothing)?;
.send()
.await
}
pub async fn start_signalling(username: String) { if received_signal.get_signal_type() == *expected_signal_type {
log!("Start Signalling"); Ok(received_signal)
log!("{}\n{}", username, SIGNALLING_ADDRESS); } else if received_signal.get_signal_type() == SignalType::Auth {
let auth_signal = Signal::new(&SignalType::Auth, &"".to_owned()); Err(Error::WebSocketAuth)
let json = json!(auth_signal); } else {
match post_json(&username, &json).await { message_queue.clear();
Ok(signal_response) => log!("{:#?}", signal_response), Err(Error::WebSocketReceiveQueueReset)
Err(err_val) => { }
log!("Error: Signal Post | {}", err_val); } else {
} Err(Error::WebSocketReceiveQueueLocked)
}
})
} }
} }
pub async fn send_auth(data: &String) -> Result<(), Error> {
let offer = Signal::new(&SignalType::Auth, data);
pub async fn send_offer(username: &String, data: &String) -> Result<(), reqwest::Error> { SignallingChannel::send(&offer).await
let rtc_session_offer_signal = Signal::new(&SignalType::Offer, data);
let rtc_session_offer_signal = json!(rtc_session_offer_signal);
post_json(username, &rtc_session_offer_signal)
.await
.map(|_| Ok(()))?
} }
pub async fn receive_offer(username: &String) -> Result<UserAndSignal, Box<dyn std::error::Error>> { pub async fn receive_auth() -> Result<Signal, Error> {
let result = get_json(username, SignalType::Offer) SignallingChannel::receive(&SignalType::Auth).await
.await
.map(async |response| response.json::<UserAndSignal>().await)?
.await?;
if result.signal.get_signal_type() == SignalType::Offer {
Ok(result)
} else {
Err(protocol::Error::UnexpectedSignalType(
result.signal.get_signal_type(),
))?
}
} }
pub async fn send_answer(username: &String, data: &String) -> Result<(), reqwest::Error> { pub async fn send_offer(data: &String) -> Result<(), Error> {
let rtc_session_answer_signal = Signal::new(&SignalType::Answer, data); let offer = Signal::new(&SignalType::Offer, data);
let rtc_session_answer_signal = json!(rtc_session_answer_signal);
post_json(username, &rtc_session_answer_signal) SignallingChannel::send(&offer).await
.await
.map(|_| Ok(()))?
} }
pub async fn receive_answer( pub async fn receive_offer() -> Result<Signal, Error> {
username: &String, SignallingChannel::receive(&SignalType::Offer).await
) -> Result<UserAndSignal, Box<dyn std::error::Error>> {
let result = get_json(username, SignalType::Answer)
.await
.map(async |response| response.json::<UserAndSignal>().await)?
.await?;
if result.signal.get_signal_type() == SignalType::Answer {
Ok(result)
} else {
Err(protocol::Error::UnexpectedSignalType(
result.signal.get_signal_type(),
))?
}
} }
pub async fn send_ice_candidate(username: &String, data: &String) -> Result<(), reqwest::Error> { pub async fn send_answer(data: &String) -> Result<(), Error> {
let rtc_session_answer_signal = Signal::new(&SignalType::ICECandidate, data); let offer = Signal::new(&SignalType::Answer, data);
let rtc_session_answer_signal = json!(rtc_session_answer_signal);
post_json(username, &rtc_session_answer_signal) SignallingChannel::send(&offer).await
.await
.map(|_| Ok(()))?
} }
pub async fn receive_ice_candidate( pub async fn receive_answer() -> Result<Signal, Error> {
username: &String, SignallingChannel::receive(&SignalType::Answer).await
) -> Result<UserAndSignal, Box<dyn std::error::Error>> { }
let result = get_json(username, SignalType::Answer)
.await pub async fn send_ice_candidate(data: &String) -> Result<(), Error> {
.map(async |response| response.json::<UserAndSignal>().await)? let offer = Signal::new(&SignalType::ICECandidate, data);
.await?;
if result.signal.get_signal_type() == SignalType::ICECandidate { SignallingChannel::send(&offer).await
Ok(result) }
} else {
Err(protocol::Error::UnexpectedSignalType( pub async fn receive_ice_candidate() -> Result<Signal, Error> {
result.signal.get_signal_type(), SignallingChannel::receive(&SignalType::ICECandidate).await
))?
}
} }

View file

@ -21,8 +21,14 @@ pub struct User {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Error { pub enum Error {
UnexpectedSignalType(SignalType),
InvalidSignalType(String), InvalidSignalType(String),
UnexpectedSignalType(SignalType),
WebSocketInit,
WebSocketAuth,
WebSocketSend,
WebSocketReceivedNothing,
WebSocketReceiveQueueReset,
WebSocketReceiveQueueLocked,
} }
impl std::error::Error for Error { impl std::error::Error for Error {
@ -34,12 +40,20 @@ impl std::error::Error for Error {
impl Display for Error { impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
Error::UnexpectedSignalType(signal_type) => {
write!(f, "Unexpected Signal Type: {}", signal_type)
}
Error::InvalidSignalType(invalid_signal_type) => { Error::InvalidSignalType(invalid_signal_type) => {
write!(f, "Invalid Signal Type: {}", invalid_signal_type) write!(f, "Invalid Signal Type: {}", invalid_signal_type)
} }
Error::UnexpectedSignalType(signal_type) => {
write!(f, "Unexpected Signal Type: {}", signal_type)
}
Error::WebSocketInit => write!(f, "WebSocket Initialization"),
Error::WebSocketAuth => write!(f, "WebSocket Auth"),
Error::WebSocketSend => write!(f, "WebSocket Send"),
Error::WebSocketReceivedNothing => write!(f, "WebSocket Received Nothing"),
Error::WebSocketReceiveQueueReset => write!(f, "WebSocket Receive Queue Reset"),
Error::WebSocketReceiveQueueLocked => write!(f, "WebSocket Receive Queue Locked"),
} }
} }
} }