feat: ✨ websocket split
This commit is contained in:
parent
16481ed7bd
commit
4a695dce40
4 changed files with 44 additions and 19 deletions
|
@ -39,7 +39,11 @@ pub fn app() -> impl IntoView {
|
||||||
let webrtc_answer = webrtc.clone();
|
let webrtc_answer = webrtc.clone();
|
||||||
let answer_button = button()
|
let answer_button = button()
|
||||||
.on(leptos::ev::click, move |_| {
|
.on(leptos::ev::click, move |_| {
|
||||||
log!("{:#?}", webrtc_answer.get_status());
|
let webrtc_answer = webrtc_answer.clone();
|
||||||
|
spawn_local(async move {
|
||||||
|
let answer_result = webrtc_answer.answer().await;
|
||||||
|
log!("Answer Result Is = {:#?}", answer_result);
|
||||||
|
});
|
||||||
})
|
})
|
||||||
.child("Answer");
|
.child("Answer");
|
||||||
|
|
||||||
|
|
|
@ -3,17 +3,6 @@ use std::{fmt::Display, str::FromStr};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct UserAndSignal {
|
|
||||||
pub user: User,
|
|
||||||
pub signal: Signal,
|
|
||||||
}
|
|
||||||
impl UserAndSignal {
|
|
||||||
pub async fn new(user: User, signal: Signal) -> Self {
|
|
||||||
UserAndSignal { user, signal }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
|
|
|
@ -7,7 +7,7 @@ edition = "2024"
|
||||||
tokio = { version = "1.42.1", default-features = false, features = ["macros", "rt-multi-thread"] }
|
tokio = { version = "1.42.1", default-features = false, features = ["macros", "rt-multi-thread"] }
|
||||||
axum = "0.8.3"
|
axum = "0.8.3"
|
||||||
tower-http = { version = "0.6.2", default-features = false, features = ["cors"]}
|
tower-http = { version = "0.6.2", default-features = false, features = ["cors"]}
|
||||||
fastwebsockets = { version = "0.10.0", features = ["upgrade", "with_axum"]}
|
fastwebsockets = { version = "0.10.0", features = ["upgrade", "unstable-split", "with_axum"]}
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|
|
@ -1,13 +1,27 @@
|
||||||
|
use std::{collections::VecDeque, sync::LazyLock};
|
||||||
|
|
||||||
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
|
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
|
||||||
use fastwebsockets::{
|
use fastwebsockets::{
|
||||||
OpCode,
|
FragmentCollectorRead, OpCode, WebSocketError,
|
||||||
upgrade::{IncomingUpgrade, UpgradeFut},
|
upgrade::{IncomingUpgrade, UpgradeFut},
|
||||||
};
|
};
|
||||||
use tokio::net::TcpListener;
|
use protocol::{Signal, SignalType};
|
||||||
|
use tokio::{
|
||||||
|
net::TcpListener,
|
||||||
|
sync::{RwLock, broadcast},
|
||||||
|
};
|
||||||
use tower_http::cors::CorsLayer;
|
use tower_http::cors::CorsLayer;
|
||||||
|
|
||||||
const SERVER_ADDRESS: &str = "192.168.1.3:4546";
|
const SERVER_ADDRESS: &str = "192.168.1.3:4546";
|
||||||
|
|
||||||
|
static USER_MESSAGES: LazyLock<RwLock<VecDeque<UserMessages>>> =
|
||||||
|
LazyLock::new(|| VecDeque::new().into());
|
||||||
|
|
||||||
|
struct UserMessages {
|
||||||
|
user: String,
|
||||||
|
message_receiver: broadcast::Receiver<Signal>,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn start_signalling() {
|
pub async fn start_signalling() {
|
||||||
let router = Router::new()
|
let router = Router::new()
|
||||||
.route("/", get(alive))
|
.route("/", get(alive))
|
||||||
|
@ -32,15 +46,33 @@ async fn signal(websocket: IncomingUpgrade) -> impl IntoResponse {
|
||||||
|
|
||||||
async fn websocket_handler(websocket: UpgradeFut) {
|
async fn websocket_handler(websocket: UpgradeFut) {
|
||||||
let mut websocket = websocket.await.unwrap();
|
let mut websocket = websocket.await.unwrap();
|
||||||
|
|
||||||
websocket.set_auto_pong(true);
|
websocket.set_auto_pong(true);
|
||||||
websocket.set_writev(false);
|
websocket.set_writev(false);
|
||||||
websocket.set_auto_close(true);
|
websocket.set_auto_close(true);
|
||||||
|
|
||||||
while let Ok(received_frame) = websocket.read_frame().await {
|
let (mut websocket_receiver, websocker_sender) = websocket.split(tokio::io::split);
|
||||||
|
let mut user = String::default();
|
||||||
|
let (message_sender, message_receiver) = broadcast::channel(100);
|
||||||
|
|
||||||
|
if let Ok(received_frame) = websocket_receiver
|
||||||
|
.read_frame::<_, WebSocketError>(&mut move |_| async { unreachable!() })
|
||||||
|
.await
|
||||||
|
{
|
||||||
if let OpCode::Text = received_frame.opcode {
|
if let OpCode::Text = received_frame.opcode {
|
||||||
let received_payload = received_frame.payload;
|
let signal =
|
||||||
println!("Sent:\n{:#?}", received_payload);
|
serde_json::from_slice::<Signal>(&received_frame.payload.to_vec()).unwrap();
|
||||||
|
|
||||||
|
if signal.get_signal_type() == SignalType::Auth && user == String::default() {
|
||||||
|
let new_user = UserMessages {
|
||||||
|
user: signal.get_data(),
|
||||||
|
message_receiver,
|
||||||
|
};
|
||||||
|
user = new_user.user.to_owned();
|
||||||
|
USER_MESSAGES.write().await.push_back(new_user);
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
println!("{:#?}", signal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue