From 35c1207cff86644f606f8375ae65cf109471f918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Fri, 24 May 2024 03:19:35 +0300 Subject: [PATCH] feat: :sparkles: client side basics feat: :sparkles: websocket --- Cargo.toml | 2 + configs/client_config.txt | 2 + src/client_network.rs | 58 ++++++++++++++++++++++++++ src/lib.rs | 7 +++- src/main.rs | 17 ++++++-- src/network.rs | 81 ----------------------------------- src/server_network.rs | 88 +++++++++++++++++++++++++++++++++++++++ src/utils.rs | 43 ++++++++++++++++--- 8 files changed, 207 insertions(+), 91 deletions(-) create mode 100644 configs/client_config.txt create mode 100644 src/client_network.rs delete mode 100644 src/network.rs create mode 100644 src/server_network.rs diff --git a/Cargo.toml b/Cargo.toml index 941b3eb..bad0ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ edition = "2021" [dependencies] chrono = "0.4.38" +futures-util = "0.3.30" serde = { version = "1.0.202", features = ["derive"] } serde_json = "1.0.117" sha3 = "0.10.8" tokio = { version = "1.37.0", features = ["full"] } +tokio-tungstenite = "0.21.0" diff --git a/configs/client_config.txt b/configs/client_config.txt new file mode 100644 index 0000000..40add53 --- /dev/null +++ b/configs/client_config.txt @@ -0,0 +1,2 @@ +server_address:127.0.0.1 +port:2434 \ No newline at end of file diff --git a/src/client_network.rs b/src/client_network.rs new file mode 100644 index 0000000..c8d9c2c --- /dev/null +++ b/src/client_network.rs @@ -0,0 +1,58 @@ +use futures_util::{stream::SplitStream, StreamExt}; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use crate::ClientConfig; + +pub async fn start_network(client_config: ClientConfig) { + let ws_stream = match connect_async(format!( + "ws://{}:{}", + client_config.server_address, client_config.port + )) + .await + { + Ok(ws_stream) => ws_stream, + Err(_) => return, + }; + let (_, ws_receiver) = ws_stream.0.split(); + sync(ws_receiver).await; +} + +async fn sync(ws_stream_receiver: SplitStream>>) { + let mut ws_stream = match receive_blockchain(ws_stream_receiver).await { + Some(ws_stream) => ws_stream, + None => return, + }; + loop { + ws_stream = match receive_block(ws_stream).await { + Some(ws_stream) => ws_stream, + None => return, + } + } +} + +async fn receive_blockchain(mut ws_stream_receiver: SplitStream>>) -> Option>>> { + match ws_stream_receiver.next().await { + Some(message) => match message { + Ok(message) => { + println!("{}", message); + Some(ws_stream_receiver) + }, + Err(_) => return None, + }, + None => return None, + } +} + +async fn receive_block(mut ws_stream_receiver: SplitStream>>) -> Option>>> { + match ws_stream_receiver.next().await { + Some(message) => match message { + Ok(message) => { + println!("{}", message); + Some(ws_stream_receiver) + }, + Err(_) => return None, + }, + None => return None, + } +} diff --git a/src/lib.rs b/src/lib.rs index 89c2c4d..292c14d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,8 @@ use std::net::IpAddr; pub mod block; pub mod blockchain; -pub mod network; +pub mod client_network; +pub mod server_network; mod test; pub mod utils; @@ -15,3 +16,7 @@ pub struct ServerConfig { pub port: u16, pub difficulty: u8, } +pub struct ClientConfig { + pub server_address: IpAddr, + pub port: u16, +} diff --git a/src/main.rs b/src/main.rs index 9cbe3c0..84dfa7c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use rust_blockchain::{ blockchain::BlockChain, - network::start_network, - utils::{read_server_config, take_args}, + client_network, server_network, + utils::{read_client_config, read_server_config, take_args}, Runner, }; use tokio::sync::broadcast; @@ -13,10 +13,11 @@ async fn main() { match take_args() { Some(runner) => match runner { Runner::Server => server().await, - Runner::Client => todo!(), + Runner::Client => client().await, }, None => return, }; + todo!("Limbo Block: Not in chain, but processing by others or none. Sync it also") } async fn server() { @@ -27,10 +28,18 @@ async fn server() { let blockchain = BlockChain::new(server_config.difficulty.into()); let block_data_channel_sender = broadcast::channel(1).0; - start_network( + server_network::start_network( server_config, &blockchain, block_data_channel_sender.subscribe(), ) .await; } + +async fn client() { + let client_config = match read_client_config() { + Some(client_config) => client_config, + None => return, + }; + client_network::start_network(client_config).await; +} diff --git a/src/network.rs b/src/network.rs deleted file mode 100644 index c375599..0000000 --- a/src/network.rs +++ /dev/null @@ -1,81 +0,0 @@ -use tokio::{ - io::AsyncWriteExt, - net::{TcpListener, TcpStream}, - sync::broadcast::Receiver, -}; - -use crate::{block::Block, blockchain::BlockChain, ServerConfig}; - -pub async fn start_network( - server_config: ServerConfig, - blockchain: &BlockChain, - block_data_channel_receiver: Receiver, -) { - let listener_socket = match TcpListener::bind(format!( - "{}:{}", - server_config.server_address, server_config.port - )) - .await - { - Ok(listener_socket) => listener_socket, - Err(_) => return, - }; - - loop { - match listener_socket.accept().await { - Ok(connection) => { - tokio::spawn(sync( - connection.0, - blockchain.clone(), - block_data_channel_receiver.resubscribe(), - )); - } - Err(_) => {} - } - } -} - -async fn sync( - tcp_stream: TcpStream, - blockchain: BlockChain, - block_data_channel_receiver: Receiver, -) { - let tcp_stream = send_blockchain(tcp_stream, blockchain).await; - send_block(tcp_stream, block_data_channel_receiver).await; -} - -async fn send_blockchain(mut tcp_stream: TcpStream, blockchain: BlockChain) -> TcpStream { - let blockchain_data = serde_json::json!({ - "blockchain": blockchain - }) - .to_string(); - match tcp_stream.write_all(&blockchain_data.as_bytes()).await { - Ok(_) => match tcp_stream.flush().await { - Ok(_) => {} - Err(_) => {} - }, - Err(_) => {} - } - tcp_stream -} - -async fn send_block(mut tcp_stream: TcpStream, mut block_data_channel_receiver: Receiver) { - loop { - match block_data_channel_receiver.recv().await { - Ok(block) => { - let block_data = serde_json::json!({ - "block": block - }) - .to_string(); - match tcp_stream.write_all(&block_data.as_bytes()).await { - Ok(_) => match tcp_stream.flush().await { - Ok(_) => {} - Err(_) => {} - }, - Err(_) => {} - } - } - Err(_) => {} - } - } -} diff --git a/src/server_network.rs b/src/server_network.rs new file mode 100644 index 0000000..1522e72 --- /dev/null +++ b/src/server_network.rs @@ -0,0 +1,88 @@ +use futures_util::{stream::SplitSink, SinkExt, StreamExt}; +use tokio::{ + net::TcpListener, + sync::broadcast::Receiver, +}; +use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; + +use crate::{block::Block, blockchain::BlockChain, ServerConfig}; + +pub async fn start_network( + server_config: ServerConfig, + blockchain: &BlockChain, + block_data_channel_receiver: Receiver, +) { + let listener_socket = match TcpListener::bind(format!( + "{}:{}", + server_config.server_address, server_config.port + )) + .await + { + Ok(listener_socket) => listener_socket, + Err(_) => return, + }; + loop { + if let Ok(connection) = listener_socket.accept().await { + let ws_stream = match accept_async(connection.0).await { + Ok(ws_stream) => ws_stream, + Err(_) => return , + }; + let (ws_stream_sender, _) = ws_stream.split(); + tokio::spawn(sync( + ws_stream_sender, + blockchain.clone(), + block_data_channel_receiver.resubscribe(), + )); + } + } +} + +async fn sync( + ws_stream_sender: SplitSink, Message>, + blockchain: BlockChain, + block_data_channel_receiver: Receiver, +) { + let ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain).await { + Some(ws_stream_sender) => ws_stream_sender, + None => return, + }; + send_blocks(ws_stream_sender, block_data_channel_receiver).await; +} + +async fn send_blockchain(mut ws_stream_sender: SplitSink, Message>, blockchain: BlockChain) -> Option, Message>> { + let blockchain_data = serde_json::json!({ + "blockchain": blockchain + }) + .to_string(); + match ws_stream_sender.send(blockchain_data.into()).await { + Ok(_) => { + match ws_stream_sender.flush().await { + Ok(_) => Some(ws_stream_sender), + Err(_) => None, + } + }, + Err(_) => None, + } +} + +async fn send_blocks(mut ws_stream_sender: SplitSink, Message>, mut block_data_channel_receiver: Receiver) { + loop { + match block_data_channel_receiver.recv().await { + Ok(block) => { + let block_data = serde_json::json!({ + "block": block + }) + .to_string(); + match ws_stream_sender.send(block_data.into()).await { + Ok(_) => { + if ws_stream_sender.flush().await.is_err() { + return; + } + } + Err(_) => return, + } + } + Err(_) => return, + } + } +} diff --git a/src/utils.rs b/src/utils.rs index ee0b880..2d57b78 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,6 @@ use std::{env, fs::File, io::Read}; -use crate::{Runner, ServerConfig}; +use crate::{ClientConfig, Runner, ServerConfig}; pub fn take_args() -> Option { let args: Vec = env::args().collect(); @@ -24,22 +24,22 @@ pub fn read_server_config() -> Option { match server_config_file.read_to_string(&mut server_configs) { Ok(_) => { let server_configs: Vec = - server_configs.split("\n").map(|x| x.to_string()).collect(); - let server_address = match server_configs[0].split(":").last() { + server_configs.split('\n').map(|x| x.to_string()).collect(); + let server_address = match server_configs[0].split(':').last() { Some(server_address_unchecked) => match server_address_unchecked.parse() { Ok(server_address) => server_address, Err(_) => return None, }, None => return None, }; - let port = match server_configs[1].split(":").last() { + let port = match server_configs[1].split(':').last() { Some(port_unchecked) => match port_unchecked.parse() { Ok(port) => port, Err(_) => return None, }, None => return None, }; - let difficulty = match server_configs[2].split(":").last() { + let difficulty = match server_configs[2].split(':').last() { Some(difficulty_unchecked) => match difficulty_unchecked.parse() { Ok(difficulty) => difficulty, Err(_) => return None, @@ -55,3 +55,36 @@ pub fn read_server_config() -> Option { Err(_) => None, } } + +pub fn read_client_config() -> Option { + let mut client_config_file = match File::open("configs/server_config.txt") { + Ok(client_config_file) => client_config_file, + Err(_) => return None, + }; + let mut client_configs = String::new(); + match client_config_file.read_to_string(&mut client_configs) { + Ok(_) => { + let client_configs: Vec = + client_configs.split('\n').map(|x| x.to_string()).collect(); + let server_address = match client_configs[0].split(':').last() { + Some(server_address_unchecked) => match server_address_unchecked.parse() { + Ok(server_address) => server_address, + Err(_) => return None, + }, + None => return None, + }; + let port = match client_configs[1].split(':').last() { + Some(port_unchecked) => match port_unchecked.parse() { + Ok(port) => port, + Err(_) => return None, + }, + None => return None, + }; + Some(ClientConfig { + server_address, + port, + }) + } + Err(_) => None, + } +}