diff --git a/client/Cargo.toml b/client/Cargo.toml index 68d0e8b..d71004d 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" console_error_panic_hook = "0.1.7" leptos = { version = "0.7.8", features = ["csr"] } wasm-bindgen-futures = "0.4.50" +async-channel = "2.3.1" reqwest = { version = "0.12.15", features = ["json"] } web-sys = { version = "0.3.77", features = [ "AudioBuffer", diff --git a/client/src/gui.rs b/client/src/gui.rs index ac87cba..9d53442 100644 --- a/client/src/gui.rs +++ b/client/src/gui.rs @@ -1,41 +1,48 @@ +use std::sync::Arc; + use leptos::{ - IntoView, ev, - html::{ElementChild, button}, + IntoView, + html::{ElementChild, button, label}, + logging::log, prelude::{OnAttribute, Read, Show, ShowProps, ToChildren}, server::LocalResource, }; +use wasm_bindgen_futures::spawn_local; use crate::{media::audio, webrtc::WebRTC}; pub fn app() -> impl IntoView { let audio_stream = LocalResource::new(|| audio()); - let offer_props = ShowProps::builder() + + let props = ShowProps::builder() .when(move || audio_stream.read().is_some()) .children(ToChildren::to_children(move || { - button() - .on(ev::click, move |_| { - WebRTC::init(Some(audio_stream), None, None); - LocalResource::new(|| WebRTC::offer()); + let audio_stream = audio_stream.read(); + let audio_stream = audio_stream.as_deref().unwrap().clone(); + + let webrtc = WebRTC::new(Some(audio_stream), None, None).unwrap(); + let webrtc = Arc::new(webrtc); + let webrtc_init = webrtc.clone(); + spawn_local(async move { webrtc_init.init().await }); + + let webrtc_offer = webrtc.clone(); + let offer_button = button() + .on(leptos::ev::click, move |_| { + log!("{:#?}", webrtc_offer.get_status()); }) - .child("Offer") - .into_view() + .child("Offer"); + + let webrtc_answer = webrtc.clone(); + let answer_button = button() + .on(leptos::ev::click, move |_| { + log!("{:#?}", webrtc_answer.get_status()); + }) + .child("Answer"); + + (offer_button, answer_button) })) - .fallback(|| button().child("Sad Offer Button")) + .fallback(|| label().child("NOOOOOOOOOOOO")) .build(); - let answer_props = ShowProps::builder() - .when(move || audio_stream.read().is_some()) - .children(ToChildren::to_children(move || { - button() - .on(ev::click, move |_| { - WebRTC::init(Some(audio_stream), None, None); - LocalResource::new(|| WebRTC::answer()); - }) - .child("Answer") - .into_view() - })) - .fallback(|| button().child("Sad Answer Button")) - .build(); - - (Show(offer_props), Show(answer_props)) + Show(props) } diff --git a/client/src/signal.rs b/client/src/signal.rs index 1533d49..bd79e13 100644 --- a/client/src/signal.rs +++ b/client/src/signal.rs @@ -1,7 +1,7 @@ -use std::sync::mpsc; - +use async_channel::{Receiver, Sender}; use leptos::logging::log; use protocol::{Error, Signal, SignalType}; +use wasm_bindgen_futures::spawn_local; use web_sys::{ ErrorEvent, MessageEvent, WebSocket, js_sys, wasm_bindgen::{JsCast, prelude::Closure}, @@ -12,9 +12,9 @@ static SIGNALLING_ADDRESS: &str = "ws://192.168.1.3:4546"; thread_local! { static WEBSOCKET: WebSocket = SignallingChannel::init().unwrap(); -static OFFER_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); -static ANSWER_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); -static ICE_CANDIDATE_CHANNEL: (mpsc::Sender, mpsc::Receiver) = mpsc::channel(); +static OFFER_CHANNEL: (Sender, Receiver) = async_channel::unbounded(); +static ANSWER_CHANNEL: (Sender, Receiver) = async_channel::unbounded(); +static ICE_CANDIDATE_CHANNEL: (Sender, Receiver) = async_channel::unbounded(); } struct SignallingChannel {} @@ -28,21 +28,34 @@ impl SignallingChannel { if let Ok(received_data) = message_event.data().dyn_into::() { if let Some(received_data) = received_data.as_string() { if let Ok(received_signal) = serde_json::from_str::(&received_data) { - if received_signal.get_signal_type() == SignalType::ICECandidate { - ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| { - ice_candidate_channel - .0 + let received_signal_type = received_signal.get_signal_type(); + if received_signal_type == SignalType::ICECandidate { + let ice_candidate_channel_sender = ICE_CANDIDATE_CHANNEL + .with(|ice_candidate_channel| ice_candidate_channel.0.clone()); + spawn_local(async move { + ice_candidate_channel_sender .send(received_signal) - .expect("Never"); - }) - } else if received_signal.get_signal_type() == SignalType::Offer { - OFFER_CHANNEL.with(|offer_channel| { - offer_channel.0.send(received_signal).expect("Never"); - }) - } else if received_signal.get_signal_type() == SignalType::Answer { - ANSWER_CHANNEL.with(|answer_channel| { - answer_channel.0.send(received_signal).expect("Never"); - }) + .await + .expect("Never") + }); + } else if received_signal_type == SignalType::Offer { + let offer_channel_sender = + ICE_CANDIDATE_CHANNEL.with(|offer_channel| offer_channel.0.clone()); + spawn_local(async move { + offer_channel_sender + .send(received_signal) + .await + .expect("Never") + }); + } else if received_signal_type == SignalType::Answer { + let answer_channel_sender = ICE_CANDIDATE_CHANNEL + .with(|answer_channel| answer_channel.0.clone()); + spawn_local(async move { + answer_channel_sender + .send(received_signal) + .await + .expect("Never") + }); } } } @@ -69,31 +82,28 @@ impl SignallingChannel { }) } - fn receive_offer() -> Result { - OFFER_CHANNEL.with(|offer_channel| { - offer_channel - .1 - .recv() - .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string())) - }) + async fn receive_offer() -> Result { + OFFER_CHANNEL + .with(|offer_channel| offer_channel.1.clone()) + .recv() + .await + .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string())) } - fn receive_answer() -> Result { - ANSWER_CHANNEL.with(|answer_channel| { - answer_channel - .1 - .recv() - .map_err(|err_val| Error::AnswerChannelReceive(err_val.to_string())) - }) + async fn receive_answer() -> Result { + ANSWER_CHANNEL + .with(|answer_channel| answer_channel.1.clone()) + .recv() + .await + .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string())) } - fn receive_ice_candidate() -> Result { - ICE_CANDIDATE_CHANNEL.with(|ice_candidate_channel| { - ice_candidate_channel - .1 - .recv() - .map_err(|err_val| Error::ICECandidateChannelReceive(err_val.to_string())) - }) + async fn receive_ice_candidate() -> Result { + ICE_CANDIDATE_CHANNEL + .with(|ice_candidate_channel| ice_candidate_channel.1.clone()) + .recv() + .await + .map_err(|err_val| Error::OfferChannelReceive(err_val.to_string())) } } pub fn send_auth(data: &String) -> Result<(), Error> { @@ -108,8 +118,8 @@ pub fn send_offer(data: &String) -> Result<(), Error> { SignallingChannel::send(&offer) } -pub fn receive_offer() -> Result { - SignallingChannel::receive_offer() +pub async fn receive_offer() -> Result { + SignallingChannel::receive_offer().await } pub fn send_answer(data: &String) -> Result<(), Error> { @@ -118,8 +128,8 @@ pub fn send_answer(data: &String) -> Result<(), Error> { SignallingChannel::send(&offer) } -pub fn receive_answer() -> Result { - SignallingChannel::receive_answer() +pub async fn receive_answer() -> Result { + SignallingChannel::receive_answer().await } pub fn send_ice_candidate(data: &String) -> Result<(), Error> { @@ -128,6 +138,6 @@ pub fn send_ice_candidate(data: &String) -> Result<(), Error> { SignallingChannel::send(&offer) } -pub fn receive_ice_candidate() -> Result { - SignallingChannel::receive_ice_candidate() +pub async fn receive_ice_candidate() -> Result { + SignallingChannel::receive_ice_candidate().await } diff --git a/client/src/webrtc.rs b/client/src/webrtc.rs index 4afda2b..5b604b7 100644 --- a/client/src/webrtc.rs +++ b/client/src/webrtc.rs @@ -1,9 +1,10 @@ -use leptos::{logging::log, prelude::Get, server::LocalResource}; +use leptos::logging::log; use protocol::Error; -use wasm_bindgen_futures::{JsFuture, spawn_local}; +use wasm_bindgen_futures::JsFuture; use web_sys::{ MediaStream, RtcConfiguration, RtcIceCandidate, RtcIceCandidateInit, RtcIceServer, - RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit, + RtcPeerConnection, RtcPeerConnectionIceEvent, RtcPeerConnectionState, RtcSdpType, + RtcSessionDescriptionInit, js_sys::{Array, Reflect}, wasm_bindgen::{JsCast, JsValue, prelude::Closure}, }; @@ -15,14 +16,17 @@ use crate::signal::{ pub struct WebRTC { peer_connection: RtcPeerConnection, -} - -thread_local! { - pub static WEBRTC:WebRTC = WebRTC::new().unwrap(); + audio_stream: Option, + video_stream: Option, + screen_stream: Option, } impl WebRTC { - fn new() -> Result { + pub fn new( + audio_stream: Option, + video_stream: Option, + screen_stream: Option, + ) -> Result { let ice_server_addresses = vec![JsValue::from("stun:stun.l.google.com:19302")] .into_iter() .collect::(); @@ -48,74 +52,60 @@ impl WebRTC { peer_connection.set_onicecandidate(Some(on_ice_candidate.as_ref().unchecked_ref())); on_ice_candidate.forget(); - let webrtc = Self { peer_connection }; + let webrtc = Self { + peer_connection, + audio_stream, + video_stream, + screen_stream, + }; + webrtc.add_streams(); Ok(webrtc) } - pub fn init( - audio_stream: Option>, - video_stream: Option>, - screen_stream: Option>, - ) { - Self::add_streams(audio_stream, video_stream, screen_stream); - - spawn_local(async { - while let Ok(received_ice_candidate) = receive_ice_candidate() { - let received_ice_candidate = - RtcIceCandidateInit::new(&received_ice_candidate.get_data()); - if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { - WEBRTC.with(|webrtc| { - let add_received_ice_candidate_promise = webrtc - .peer_connection - .add_ice_candidate_with_opt_rtc_ice_candidate(Some( - &received_ice_candidate, - )); - spawn_local(async move { - if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise) - .await - .map_err(|_| Error::ICECandidateAdd) - { - log!("Error: Add ICE Candidate | {}", err_val); - } - }); - }); + pub async fn init(&self) { + while let Ok(received_ice_candidate) = receive_ice_candidate().await { + let received_ice_candidate = + RtcIceCandidateInit::new(&received_ice_candidate.get_data()); + if let Ok(received_ice_candidate) = RtcIceCandidate::new(&received_ice_candidate) { + let add_received_ice_candidate_promise = self + .peer_connection + .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&received_ice_candidate)); + if let Err(err_val) = JsFuture::from(add_received_ice_candidate_promise) + .await + .map_err(|_| Error::ICECandidateAdd) + { + log!("Error: Add ICE Candidate | {}", err_val); } } - }); + } } - fn add_streams( - audio_stream: Option>, - video_stream: Option>, - screen_stream: Option>, - ) { - WEBRTC.with(|webrtc| { - if let Some(audio_stream) = audio_stream { - if let Some(audio_stream) = audio_stream.get() { - webrtc.peer_connection.add_stream(&audio_stream); - } - } - if let Some(video_stream) = video_stream { - if let Some(video_stream) = video_stream.get() { - webrtc.peer_connection.add_stream(&video_stream); - } - } - if let Some(screen_stream) = screen_stream { - if let Some(screen_stream) = screen_stream.get() { - webrtc.peer_connection.add_stream(&screen_stream); - } - } - }); + pub fn get_status(&self) -> RtcPeerConnectionState { + self.peer_connection.connection_state() } - pub async fn offer() -> Result<(), Error> { - let offer_promise = WEBRTC.with(|webrtc| webrtc.peer_connection.create_offer()); + fn add_streams(&self) { + if let Some(audio_stream) = self.audio_stream.as_ref() { + self.peer_connection.add_stream(&audio_stream); + } + if let Some(video_stream) = self.video_stream.as_ref() { + self.peer_connection.add_stream(&video_stream); + } + if let Some(screen_stream) = self.screen_stream.as_ref() { + self.peer_connection.add_stream(&screen_stream); + } + } + + pub async fn offer(&self) -> Result<(), Error> { + log!("Offer Function"); + let offer_promise = self.peer_connection.create_offer(); match JsFuture::from(offer_promise) .await .map_err(|_| Error::WebRTCOffer) { Ok(offer) => { + log!("Offer Created"); let offer_session_description_protocol = Reflect::get(&offer, &JsValue::from_str("sdp")) .map_err(|_| Error::WebRTCSessionDescriptionProtocol)?; @@ -126,22 +116,22 @@ impl WebRTC { Ok(offer_session_description_protocol) => { let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); offer.set_sdp(&offer_session_description_protocol); - let set_local_description_promise = WEBRTC - .with(|webrtc| webrtc.peer_connection.set_local_description(&offer)); + let set_local_description_promise = + self.peer_connection.set_local_description(&offer); JsFuture::from(set_local_description_promise) .await .map_err(|_| Error::WebRTCSetLocalDescription)?; + log!("Before Sent Offer"); send_offer(&offer_session_description_protocol)?; - - if let Ok(received_answer) = receive_answer() { + log!("After Sent Offer"); + if let Ok(received_answer) = receive_answer().await { let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer); answer.set_sdp(&received_answer.get_data()); - let set_remote_description_promise = WEBRTC.with(|webrtc| { - webrtc.peer_connection.set_remote_description(&answer) - }); + let set_remote_description_promise = + self.peer_connection.set_remote_description(&answer); JsFuture::from(set_remote_description_promise) .await .map_err(|_| Error::WebRTCSetRemoteDescription)?; @@ -164,18 +154,18 @@ impl WebRTC { return Err(Error::WebRTCOffer); } - pub async fn answer() -> Result<(), Error> { - if let Ok(received_offer) = receive_offer() { + pub async fn answer(&self) -> Result<(), Error> { + if let Ok(received_offer) = receive_offer().await { let offer = RtcSessionDescriptionInit::new(RtcSdpType::Offer); offer.set_sdp(&received_offer.get_data()); let set_remote_description_promise = - WEBRTC.with(|webrtc| webrtc.peer_connection.set_remote_description(&offer)); + self.peer_connection.set_remote_description(&offer); JsFuture::from(set_remote_description_promise) .await .map_err(|_| Error::WebRTCSetRemoteDescription)?; - let answer_promise = WEBRTC.with(|webrtc| webrtc.peer_connection.create_answer()); + let answer_promise = self.peer_connection.create_answer(); match JsFuture::from(answer_promise) .await .map_err(|_| Error::WebRTCAnswer) @@ -192,9 +182,8 @@ impl WebRTC { let answer = RtcSessionDescriptionInit::new(RtcSdpType::Answer); answer.set_sdp(&answer_session_description_protocol); - let set_local_description_promise = WEBRTC.with(|webrtc| { - webrtc.peer_connection.set_local_description(&answer) - }); + let set_local_description_promise = + self.peer_connection.set_local_description(&answer); JsFuture::from(set_local_description_promise) .await .map_err(|_| Error::WebRTCSetLocalDescription)?; diff --git a/server/Cargo.toml b/server/Cargo.toml index 89c393f..9b63cb4 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,13 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -axum = { version = "0.8.3", features = ["json"] } -axum-macros = "0.5.0" -tokio = "1.42.1" -tower-http = { version = "0.6.2", features = ["cors", "trace"] } -tracing-subscriber = "0.3.19" -tracing = "0.1.41" -webrtc = "0.12.0" +tokio = { version = "1.42.1", default-features = false, features = ["macros", "rt-multi-thread"] } +fastwebsockets = "0.10.0" serde = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } diff --git a/server/src/lib.rs b/server/src/lib.rs index 62bc3f4..10f7339 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,75 +1,2 @@ -use std::sync::LazyLock; - -use utils::naive_toml_parser; - -mod middleware; pub mod signal; pub mod utils; - -const SERVER_CONFIG_FILE_LOCATION: &str = "./configs/server_config.toml"; -const DATABASE_CONFIG_FILE_LOCATION: &str = "./configs/database_config.toml"; - -pub static SERVER_CONFIG: LazyLock = LazyLock::new(ServerConfig::default); - -#[derive(Debug)] -pub struct DatabaseConfig { - pub address: String, - pub username: String, - pub password: String, - pub database: String, - pub backend: String, - pub connection_pool_size: u32, -} -impl Default for DatabaseConfig { - fn default() -> Self { - let (header, mut database_configs) = naive_toml_parser(DATABASE_CONFIG_FILE_LOCATION); - - if header == "[database_config]" { - Self { - address: database_configs.pop_front().unwrap().parse().unwrap(), - username: database_configs.pop_front().unwrap().parse().unwrap(), - password: database_configs.pop_front().unwrap().parse().unwrap(), - database: database_configs.pop_front().unwrap().parse().unwrap(), - backend: database_configs.pop_front().unwrap().parse().unwrap(), - connection_pool_size: database_configs.pop_front().unwrap().parse().unwrap(), - } - } else { - panic!("Database Config File Must Include [database_config] at the First Line") - } - } -} - -#[derive(Debug)] -pub struct ServerConfig { - pub address: String, - pub otp_time_limit: usize, - pub login_token_expiration_time_limit: usize, - pub login_token_refresh_time_limit: usize, - pub concurrency_limit: usize, -} - -impl Default for ServerConfig { - fn default() -> Self { - let (header, mut server_configs) = naive_toml_parser(SERVER_CONFIG_FILE_LOCATION); - let value_or_max = |value: String| value.parse().map_or(usize::MAX, |value| value); - let value_or_semaphore_max = |value: String| { - value - .parse() - .map_or(tokio::sync::Semaphore::MAX_PERMITS, |value| value) - }; - - if header == "[server_config]" { - Self { - address: server_configs.pop_front().unwrap().parse().unwrap(), - otp_time_limit: value_or_max(server_configs.pop_front().unwrap()), - login_token_expiration_time_limit: value_or_max( - server_configs.pop_front().unwrap(), - ), - login_token_refresh_time_limit: value_or_max(server_configs.pop_front().unwrap()), - concurrency_limit: value_or_semaphore_max(server_configs.pop_front().unwrap()), - } - } else { - panic!("Server Config File Must Include [server_config] at the First Line") - } - } -} diff --git a/server/src/main.rs b/server/src/main.rs index 26ac281..7974ceb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,11 +1,7 @@ use rust_communication_server::signal::start_signalling; -use tracing::Level; #[tokio::main] async fn main() { println!("Hello, world!"); - tracing_subscriber::fmt() - .with_max_level(Level::TRACE) - .init(); start_signalling().await; } diff --git a/server/src/middleware.rs b/server/src/middleware.rs deleted file mode 100644 index a05e586..0000000 --- a/server/src/middleware.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::{str::FromStr, sync::Arc}; - -use axum::{ - extract::Request, - http::{self, HeaderMap, StatusCode}, - middleware::Next, - response::IntoResponse, -}; -use protocol::{SignalType, User}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct UserAndExpectedSignal { - pub user: User, - pub expected_signal: SignalType, -} - -async fn extract_user_from_authorization_header(headers: &HeaderMap) -> Option { - if let Some(authorization_header) = headers.get(http::header::AUTHORIZATION) { - dbg!(authorization_header); - if let Ok(authorization_header) = authorization_header.to_str() { - if let Some((bearer, authorization_token)) = authorization_header.split_once(' ') { - println!( - "Info: Extraction | Authorization Header | {} || {}", - bearer, authorization_token - ); - if bearer.to_lowercase() == "bearer" { - let user = User { - username: authorization_token.to_string(), - }; - return Some(user); - } - } - } - } - None -} - -pub async fn verify_then_get_user(mut request: Request, next: Next) -> impl IntoResponse { - let headers = request.headers(); - println!("Info: Verify | Headers| {:#?}", headers); - if let Some(user) = extract_user_from_authorization_header(headers).await { - let user = Arc::new(user); - request.extensions_mut().insert(user); - return next.run(request).await; - } - - StatusCode::FORBIDDEN.into_response() -} - -pub async fn verify_then_get_user_and_expected_signal( - mut request: Request, - next: Next, -) -> impl IntoResponse { - let headers = request.headers(); - if let Some(user) = extract_user_from_authorization_header(headers).await { - if let Ok(expected_signal) = headers.get("EXPECTED_SIGNAL").unwrap().to_str() { - match SignalType::from_str(expected_signal) { - Ok(expected_signal) => { - let user_and_expected_signal = UserAndExpectedSignal { - user, - expected_signal, - }; - let user_and_expected_signal = Arc::new(user_and_expected_signal); - request.extensions_mut().insert(user_and_expected_signal); - next.run(request).await - } - Err(err_val) => { - eprintln!( - "Error: Verify and Get Expected Signal | Signal Type Conversion | {}", - err_val - ); - StatusCode::BAD_REQUEST.into_response() - } - } - } else { - StatusCode::BAD_REQUEST.into_response() - } - } else { - StatusCode::FORBIDDEN.into_response() - } -} diff --git a/server/src/signal.rs b/server/src/signal.rs index 959fc26..05efe7b 100644 --- a/server/src/signal.rs +++ b/server/src/signal.rs @@ -1,82 +1,25 @@ -use std::sync::{Arc, LazyLock, RwLock}; - -use axum::{ - Extension, Json, Router, - http::StatusCode, - response::IntoResponse, - routing::{get, post}, -}; -use axum_macros::debug_handler; -use protocol::{Signal, User, UserAndSignal}; +use fastwebsockets::{FragmentCollector, OpCode, Role, WebSocket}; use tokio::net::TcpListener; -use tower_http::{cors::CorsLayer, trace::TraceLayer}; -use crate::middleware::{ - UserAndExpectedSignal, verify_then_get_user, verify_then_get_user_and_expected_signal, -}; - -static USERS_AND_SIGNALS: LazyLock>> = - LazyLock::new(|| RwLock::new(vec![])); +const SERVER_ADDRESS: &str = "192.168.1.3:4546"; pub async fn start_signalling() { - let route = route() - .layer(CorsLayer::permissive()) - .layer(TraceLayer::new_for_http()); - let listener = TcpListener::bind("192.168.1.3:4546").await.unwrap(); - println!("http://192.168.1.3:4546"); - axum::serve(listener, route).await.unwrap(); -} + let tcp_listener = TcpListener::bind(SERVER_ADDRESS).await.unwrap(); + while let Ok((tcp_stream, client_address)) = tcp_listener.accept().await { + let mut websocket = WebSocket::after_handshake(tcp_stream, Role::Server); + websocket.set_writev(false); + websocket.set_auto_close(true); + websocket.set_auto_pong(true); + let mut websocket = FragmentCollector::new(websocket); -fn route() -> Router { - Router::new() - .route("/alive", get(alive)) - .route( - "/", - get(read_signal).route_layer(axum::middleware::from_fn( - verify_then_get_user_and_expected_signal, - )), - ) - .route( - "/", - post(create_signal).route_layer(axum::middleware::from_fn(verify_then_get_user)), - ) -} - -async fn alive() -> impl IntoResponse { - StatusCode::OK -} - -#[debug_handler] -async fn create_signal( - Extension(user): Extension>, - Json(signal): Json, -) -> impl IntoResponse { - let user = (*user).clone(); - let user_and_signal = UserAndSignal::new(user, signal).await; - USERS_AND_SIGNALS.write().unwrap().push(user_and_signal); - StatusCode::OK -} - -#[debug_handler] -async fn read_signal( - Extension(user_and_expected_signal): Extension>, -) -> impl IntoResponse { - let mut target_index = None; - let mut json_body = serde_json::json!(""); - for (index, user_and_signal) in USERS_AND_SIGNALS.read().unwrap().iter().enumerate() { - if user_and_signal.signal.get_signal_type() == user_and_expected_signal.expected_signal - && user_and_signal.user != user_and_expected_signal.user - { - json_body = serde_json::json!(user_and_signal); - target_index = Some(index); + while let Ok(received_frame) = websocket.read_frame().await { + if let OpCode::Text = received_frame.opcode { + let received_payload = received_frame.payload; + println!( + "Client: {:#?} | Sent:\n{:#?}", + client_address, received_payload + ); + } } } - - match target_index { - Some(target_index) => { - USERS_AND_SIGNALS.write().unwrap().remove(target_index); - (StatusCode::OK, Json(json_body)).into_response() - } - None => StatusCode::BAD_REQUEST.into_response(), - } }