feat: client side basics

feat:  websocket
This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-24 03:19:35 +03:00
parent 7f31e76881
commit 35c1207cff
8 changed files with 207 additions and 91 deletions

View file

@ -7,7 +7,9 @@ edition = "2021"
[dependencies] [dependencies]
chrono = "0.4.38" chrono = "0.4.38"
futures-util = "0.3.30"
serde = { version = "1.0.202", features = ["derive"] } serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117" serde_json = "1.0.117"
sha3 = "0.10.8" sha3 = "0.10.8"
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.37.0", features = ["full"] }
tokio-tungstenite = "0.21.0"

View file

@ -0,0 +1,2 @@
server_address:127.0.0.1
port:2434

58
src/client_network.rs Normal file
View file

@ -0,0 +1,58 @@
use futures_util::{stream::SplitStream, StreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use crate::ClientConfig;
pub async fn start_network(client_config: ClientConfig) {
let ws_stream = match connect_async(format!(
"ws://{}:{}",
client_config.server_address, client_config.port
))
.await
{
Ok(ws_stream) => ws_stream,
Err(_) => return,
};
let (_, ws_receiver) = ws_stream.0.split();
sync(ws_receiver).await;
}
async fn sync(ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
let mut ws_stream = match receive_blockchain(ws_stream_receiver).await {
Some(ws_stream) => ws_stream,
None => return,
};
loop {
ws_stream = match receive_block(ws_stream).await {
Some(ws_stream) => ws_stream,
None => return,
}
}
}
async fn receive_blockchain(mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) -> Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>> {
match ws_stream_receiver.next().await {
Some(message) => match message {
Ok(message) => {
println!("{}", message);
Some(ws_stream_receiver)
},
Err(_) => return None,
},
None => return None,
}
}
async fn receive_block(mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) -> Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>> {
match ws_stream_receiver.next().await {
Some(message) => match message {
Ok(message) => {
println!("{}", message);
Some(ws_stream_receiver)
},
Err(_) => return None,
},
None => return None,
}
}

View file

@ -2,7 +2,8 @@ use std::net::IpAddr;
pub mod block; pub mod block;
pub mod blockchain; pub mod blockchain;
pub mod network; pub mod client_network;
pub mod server_network;
mod test; mod test;
pub mod utils; pub mod utils;
@ -15,3 +16,7 @@ pub struct ServerConfig {
pub port: u16, pub port: u16,
pub difficulty: u8, pub difficulty: u8,
} }
pub struct ClientConfig {
pub server_address: IpAddr,
pub port: u16,
}

View file

@ -1,7 +1,7 @@
use rust_blockchain::{ use rust_blockchain::{
blockchain::BlockChain, blockchain::BlockChain,
network::start_network, client_network, server_network,
utils::{read_server_config, take_args}, utils::{read_client_config, read_server_config, take_args},
Runner, Runner,
}; };
use tokio::sync::broadcast; use tokio::sync::broadcast;
@ -13,10 +13,11 @@ async fn main() {
match take_args() { match take_args() {
Some(runner) => match runner { Some(runner) => match runner {
Runner::Server => server().await, Runner::Server => server().await,
Runner::Client => todo!(), Runner::Client => client().await,
}, },
None => return, None => return,
}; };
todo!("Limbo Block: Not in chain, but processing by others or none. Sync it also")
} }
async fn server() { async fn server() {
@ -27,10 +28,18 @@ async fn server() {
let blockchain = BlockChain::new(server_config.difficulty.into()); let blockchain = BlockChain::new(server_config.difficulty.into());
let block_data_channel_sender = broadcast::channel(1).0; let block_data_channel_sender = broadcast::channel(1).0;
start_network( server_network::start_network(
server_config, server_config,
&blockchain, &blockchain,
block_data_channel_sender.subscribe(), block_data_channel_sender.subscribe(),
) )
.await; .await;
} }
async fn client() {
let client_config = match read_client_config() {
Some(client_config) => client_config,
None => return,
};
client_network::start_network(client_config).await;
}

View file

@ -1,81 +0,0 @@
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
sync::broadcast::Receiver,
};
use crate::{block::Block, blockchain::BlockChain, ServerConfig};
pub async fn start_network(
server_config: ServerConfig,
blockchain: &BlockChain,
block_data_channel_receiver: Receiver<Block>,
) {
let listener_socket = match TcpListener::bind(format!(
"{}:{}",
server_config.server_address, server_config.port
))
.await
{
Ok(listener_socket) => listener_socket,
Err(_) => return,
};
loop {
match listener_socket.accept().await {
Ok(connection) => {
tokio::spawn(sync(
connection.0,
blockchain.clone(),
block_data_channel_receiver.resubscribe(),
));
}
Err(_) => {}
}
}
}
async fn sync(
tcp_stream: TcpStream,
blockchain: BlockChain,
block_data_channel_receiver: Receiver<Block>,
) {
let tcp_stream = send_blockchain(tcp_stream, blockchain).await;
send_block(tcp_stream, block_data_channel_receiver).await;
}
async fn send_blockchain(mut tcp_stream: TcpStream, blockchain: BlockChain) -> TcpStream {
let blockchain_data = serde_json::json!({
"blockchain": blockchain
})
.to_string();
match tcp_stream.write_all(&blockchain_data.as_bytes()).await {
Ok(_) => match tcp_stream.flush().await {
Ok(_) => {}
Err(_) => {}
},
Err(_) => {}
}
tcp_stream
}
async fn send_block(mut tcp_stream: TcpStream, mut block_data_channel_receiver: Receiver<Block>) {
loop {
match block_data_channel_receiver.recv().await {
Ok(block) => {
let block_data = serde_json::json!({
"block": block
})
.to_string();
match tcp_stream.write_all(&block_data.as_bytes()).await {
Ok(_) => match tcp_stream.flush().await {
Ok(_) => {}
Err(_) => {}
},
Err(_) => {}
}
}
Err(_) => {}
}
}
}

88
src/server_network.rs Normal file
View file

@ -0,0 +1,88 @@
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use tokio::{
net::TcpListener,
sync::broadcast::Receiver,
};
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
use crate::{block::Block, blockchain::BlockChain, ServerConfig};
pub async fn start_network(
server_config: ServerConfig,
blockchain: &BlockChain,
block_data_channel_receiver: Receiver<Block>,
) {
let listener_socket = match TcpListener::bind(format!(
"{}:{}",
server_config.server_address, server_config.port
))
.await
{
Ok(listener_socket) => listener_socket,
Err(_) => return,
};
loop {
if let Ok(connection) = listener_socket.accept().await {
let ws_stream = match accept_async(connection.0).await {
Ok(ws_stream) => ws_stream,
Err(_) => return ,
};
let (ws_stream_sender, _) = ws_stream.split();
tokio::spawn(sync(
ws_stream_sender,
blockchain.clone(),
block_data_channel_receiver.resubscribe(),
));
}
}
}
async fn sync(
ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
blockchain: BlockChain,
block_data_channel_receiver: Receiver<Block>,
) {
let ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain).await {
Some(ws_stream_sender) => ws_stream_sender,
None => return,
};
send_blocks(ws_stream_sender, block_data_channel_receiver).await;
}
async fn send_blockchain(mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, blockchain: BlockChain) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
let blockchain_data = serde_json::json!({
"blockchain": blockchain
})
.to_string();
match ws_stream_sender.send(blockchain_data.into()).await {
Ok(_) => {
match ws_stream_sender.flush().await {
Ok(_) => Some(ws_stream_sender),
Err(_) => None,
}
},
Err(_) => None,
}
}
async fn send_blocks(mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, mut block_data_channel_receiver: Receiver<Block>) {
loop {
match block_data_channel_receiver.recv().await {
Ok(block) => {
let block_data = serde_json::json!({
"block": block
})
.to_string();
match ws_stream_sender.send(block_data.into()).await {
Ok(_) => {
if ws_stream_sender.flush().await.is_err() {
return;
}
}
Err(_) => return,
}
}
Err(_) => return,
}
}
}

View file

@ -1,6 +1,6 @@
use std::{env, fs::File, io::Read}; use std::{env, fs::File, io::Read};
use crate::{Runner, ServerConfig}; use crate::{ClientConfig, Runner, ServerConfig};
pub fn take_args() -> Option<Runner> { pub fn take_args() -> Option<Runner> {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
@ -24,22 +24,22 @@ pub fn read_server_config() -> Option<ServerConfig> {
match server_config_file.read_to_string(&mut server_configs) { match server_config_file.read_to_string(&mut server_configs) {
Ok(_) => { Ok(_) => {
let server_configs: Vec<String> = let server_configs: Vec<String> =
server_configs.split("\n").map(|x| x.to_string()).collect(); server_configs.split('\n').map(|x| x.to_string()).collect();
let server_address = match server_configs[0].split(":").last() { let server_address = match server_configs[0].split(':').last() {
Some(server_address_unchecked) => match server_address_unchecked.parse() { Some(server_address_unchecked) => match server_address_unchecked.parse() {
Ok(server_address) => server_address, Ok(server_address) => server_address,
Err(_) => return None, Err(_) => return None,
}, },
None => return None, None => return None,
}; };
let port = match server_configs[1].split(":").last() { let port = match server_configs[1].split(':').last() {
Some(port_unchecked) => match port_unchecked.parse() { Some(port_unchecked) => match port_unchecked.parse() {
Ok(port) => port, Ok(port) => port,
Err(_) => return None, Err(_) => return None,
}, },
None => return None, None => return None,
}; };
let difficulty = match server_configs[2].split(":").last() { let difficulty = match server_configs[2].split(':').last() {
Some(difficulty_unchecked) => match difficulty_unchecked.parse() { Some(difficulty_unchecked) => match difficulty_unchecked.parse() {
Ok(difficulty) => difficulty, Ok(difficulty) => difficulty,
Err(_) => return None, Err(_) => return None,
@ -55,3 +55,36 @@ pub fn read_server_config() -> Option<ServerConfig> {
Err(_) => None, Err(_) => None,
} }
} }
pub fn read_client_config() -> Option<ClientConfig> {
let mut client_config_file = match File::open("configs/server_config.txt") {
Ok(client_config_file) => client_config_file,
Err(_) => return None,
};
let mut client_configs = String::new();
match client_config_file.read_to_string(&mut client_configs) {
Ok(_) => {
let client_configs: Vec<String> =
client_configs.split('\n').map(|x| x.to_string()).collect();
let server_address = match client_configs[0].split(':').last() {
Some(server_address_unchecked) => match server_address_unchecked.parse() {
Ok(server_address) => server_address,
Err(_) => return None,
},
None => return None,
};
let port = match client_configs[1].split(':').last() {
Some(port_unchecked) => match port_unchecked.parse() {
Ok(port) => port,
Err(_) => return None,
},
None => return None,
};
Some(ClientConfig {
server_address,
port,
})
}
Err(_) => None,
}
}