From e7d10c46f89d1f41d7f25cbc7d24d074f8609cd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Sat, 25 May 2024 20:07:22 +0300 Subject: [PATCH] feat: :sparkles: blockchain and block transfer --- src/block.rs | 6 ++-- src/blockchain.rs | 13 +++++--- src/client_network.rs | 54 ++++++++++++++++++++++-------- src/main.rs | 8 +++-- src/server_network.rs | 77 ++++++++++++++++++++++--------------------- src/test.rs | 2 +- 6 files changed, 98 insertions(+), 62 deletions(-) diff --git a/src/block.rs b/src/block.rs index 9e77648..6284806 100644 --- a/src/block.rs +++ b/src/block.rs @@ -5,8 +5,6 @@ use serde::{Deserialize, Serialize}; use sha3::{Digest, Sha3_512}; use tokio::sync::broadcast::Sender; -use crate::blockchain::BlockChain; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Block { pub index: u64, @@ -49,10 +47,10 @@ impl Block { block } - pub fn mine(&mut self, blockhain: BlockChain) -> Self { + pub fn mine(&mut self, difficulty: usize) -> Self { let mut hash = self.calculate_hash(); loop { - if !hash.starts_with(&"0".repeat(blockhain.difficulty)) { + if !hash.starts_with(&"0".repeat(difficulty)) { self.proof_of_work += 1; hash = self.calculate_hash(); } else { diff --git a/src/blockchain.rs b/src/blockchain.rs index d04e60e..c1d0c66 100644 --- a/src/blockchain.rs +++ b/src/blockchain.rs @@ -34,20 +34,25 @@ impl BlockChain { } } - pub fn add_block( + pub fn create_block( &mut self, - data: String, + data: impl ToString, instant: Instant, block_data_channel_sender: Sender, ) { let new_block = Block::new( self.chain.len() as u64, - data, + data.to_string(), self.chain[&self.chain.len() - 1].hash.clone(), instant, block_data_channel_sender, ) - .mine(self.clone()); + .mine(self.difficulty); self.chain.push(new_block); } + + pub fn add_block(&mut self, mut block: Block) { + block.mine(self.difficulty); + self.chain.push(block); + } } diff --git a/src/client_network.rs b/src/client_network.rs index c8d9c2c..93af3c4 100644 --- a/src/client_network.rs +++ b/src/client_network.rs @@ -2,7 +2,7 @@ use futures_util::{stream::SplitStream, StreamExt}; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; -use crate::ClientConfig; +use crate::{block::Block, blockchain::BlockChain, ClientConfig}; pub async fn start_network(client_config: ClientConfig) { let ws_stream = match connect_async(format!( @@ -19,38 +19,64 @@ pub async fn start_network(client_config: ClientConfig) { } async fn sync(ws_stream_receiver: SplitStream>>) { - let mut ws_stream = match receive_blockchain(ws_stream_receiver).await { - Some(ws_stream) => ws_stream, + let (mut ws_stream, mut blockchain) = match receive_blockchain(ws_stream_receiver).await { + Some((ws_stream, blockchain)) => (ws_stream, blockchain), None => return, }; loop { - ws_stream = match receive_block(ws_stream).await { - Some(ws_stream) => ws_stream, + let block: Block; + (ws_stream, block) = match receive_block(ws_stream).await { + Some((ws_stream, block)) => (ws_stream, block), None => return, - } + }; + blockchain.add_block(block); } } -async fn receive_blockchain(mut ws_stream_receiver: SplitStream>>) -> Option>>> { +async fn receive_blockchain( + mut ws_stream_receiver: SplitStream>>, +) -> Option<( + SplitStream>>, + BlockChain, +)> { match ws_stream_receiver.next().await { Some(message) => match message { Ok(message) => { - println!("{}", message); - Some(ws_stream_receiver) - }, + if let tokio_tungstenite::tungstenite::Message::Text(message) = message { + let blockchain: BlockChain = match serde_json::from_str(&message[..]) { + Ok(blockchain) => blockchain, + Err(_) => return None, + }; + Some((ws_stream_receiver, blockchain)) + } else { + return None; + } + } Err(_) => return None, }, None => return None, } } -async fn receive_block(mut ws_stream_receiver: SplitStream>>) -> Option>>> { +async fn receive_block( + mut ws_stream_receiver: SplitStream>>, +) -> Option<( + SplitStream>>, + Block, +)> { match ws_stream_receiver.next().await { Some(message) => match message { Ok(message) => { - println!("{}", message); - Some(ws_stream_receiver) - }, + if let tokio_tungstenite::tungstenite::Message::Text(message) = message { + let block: Block = match serde_json::from_str(&message[..]) { + Ok(block) => block, + Err(_) => return None, + }; + Some((ws_stream_receiver, block)) + } else { + return None; + } + } Err(_) => return None, }, None => return None, diff --git a/src/main.rs b/src/main.rs index 84dfa7c..9f4eacd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use rust_blockchain::{ blockchain::BlockChain, client_network, server_network, utils::{read_client_config, read_server_config, take_args}, Runner, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, Mutex}; #[tokio::main] async fn main() { @@ -28,9 +30,11 @@ async fn server() { let blockchain = BlockChain::new(server_config.difficulty.into()); let block_data_channel_sender = broadcast::channel(1).0; + let blockhain_thread_safe = Arc::new(Mutex::new(blockchain)); + server_network::start_network( server_config, - &blockchain, + blockhain_thread_safe, block_data_channel_sender.subscribe(), ) .await; diff --git a/src/server_network.rs b/src/server_network.rs index 1522e72..e5e2af6 100644 --- a/src/server_network.rs +++ b/src/server_network.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; + use futures_util::{stream::SplitSink, SinkExt, StreamExt}; use tokio::{ net::TcpListener, - sync::broadcast::Receiver, + sync::{broadcast::Receiver, Mutex}, }; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; @@ -9,7 +11,7 @@ use crate::{block::Block, blockchain::BlockChain, ServerConfig}; pub async fn start_network( server_config: ServerConfig, - blockchain: &BlockChain, + blockchain_thread_safe: Arc>, block_data_channel_receiver: Receiver, ) { let listener_socket = match TcpListener::bind(format!( @@ -25,12 +27,12 @@ pub async fn start_network( if let Ok(connection) = listener_socket.accept().await { let ws_stream = match accept_async(connection.0).await { Ok(ws_stream) => ws_stream, - Err(_) => return , + Err(_) => return, }; let (ws_stream_sender, _) = ws_stream.split(); tokio::spawn(sync( ws_stream_sender, - blockchain.clone(), + blockchain_thread_safe.clone(), block_data_channel_receiver.resubscribe(), )); } @@ -39,50 +41,51 @@ pub async fn start_network( async fn sync( ws_stream_sender: SplitSink, Message>, - blockchain: BlockChain, - block_data_channel_receiver: Receiver, + blockchain_thread_safe: Arc>, + mut block_data_channel_receiver: Receiver, ) { - let ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain).await { + let mut ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain_thread_safe).await + { Some(ws_stream_sender) => ws_stream_sender, None => return, }; - send_blocks(ws_stream_sender, block_data_channel_receiver).await; + loop { + let block = match block_data_channel_receiver.recv().await { + Ok(block) => block, + Err(_) => return, + }; + ws_stream_sender = match send_block(ws_stream_sender, block).await { + Some(ws_stream_sender) => ws_stream_sender, + None => return, + } + } } -async fn send_blockchain(mut ws_stream_sender: SplitSink, Message>, blockchain: BlockChain) -> Option, Message>> { - let blockchain_data = serde_json::json!({ - "blockchain": blockchain - }) - .to_string(); +async fn send_blockchain( + mut ws_stream_sender: SplitSink, Message>, + blockchain_thread_safe: Arc>, +) -> Option, Message>> { + let blockchain = blockchain_thread_safe.lock().await; + let blockchain_data = serde_json::json!(*blockchain).to_string(); match ws_stream_sender.send(blockchain_data.into()).await { - Ok(_) => { - match ws_stream_sender.flush().await { - Ok(_) => Some(ws_stream_sender), - Err(_) => None, - } + Ok(_) => match ws_stream_sender.flush().await { + Ok(_) => Some(ws_stream_sender), + Err(_) => None, }, Err(_) => None, } } -async fn send_blocks(mut ws_stream_sender: SplitSink, Message>, mut block_data_channel_receiver: Receiver) { - loop { - match block_data_channel_receiver.recv().await { - Ok(block) => { - let block_data = serde_json::json!({ - "block": block - }) - .to_string(); - match ws_stream_sender.send(block_data.into()).await { - Ok(_) => { - if ws_stream_sender.flush().await.is_err() { - return; - } - } - Err(_) => return, - } - } - Err(_) => return, - } +async fn send_block( + mut ws_stream_sender: SplitSink, Message>, + block: Block, +) -> Option, Message>> { + let block_data = serde_json::json!(block).to_string(); + match ws_stream_sender.send(block_data.into()).await { + Ok(_) => match ws_stream_sender.flush().await { + Ok(_) => Some(ws_stream_sender), + Err(_) => None, + }, + Err(_) => None, } } diff --git a/src/test.rs b/src/test.rs index 3c8f4a9..47d089c 100644 --- a/src/test.rs +++ b/src/test.rs @@ -33,7 +33,7 @@ async fn create_block() { let instant = Instant::now(); let mut blockchain = BlockChain::new(1); let block_data_channel_sender = channel(1).0; - BlockChain::add_block( + BlockChain::create_block( &mut blockchain, "Ahmet Kaan Gümüş".to_string(), instant,