From 92b748ab6aacf1381df753e6ed085fc5f60c935a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Wed, 29 May 2024 03:54:54 +0300 Subject: [PATCH] feat: :sparkles: receive calculated block for server feat: :sparkles: somewhat consensus structure --- Cargo.toml | 1 + src/blockchain.rs | 10 ++-- src/client_network.rs | 13 ++--- src/consensus.rs | 57 +++++++++++++++++++++ src/lib.rs | 31 ++++++++++++ src/main.rs | 3 +- src/server_network.rs | 115 +++++++++++++++++++++++++++++++++++++----- 7 files changed, 206 insertions(+), 24 deletions(-) create mode 100644 src/consensus.rs diff --git a/Cargo.toml b/Cargo.toml index bad0ad1..bd00607 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ serde_json = "1.0.117" sha3 = "0.10.8" tokio = { version = "1.37.0", features = ["full"] } tokio-tungstenite = "0.21.0" +uuid = { version = "1.8.0", features = ["v4"] } diff --git a/src/blockchain.rs b/src/blockchain.rs index c1d0c66..249e4e1 100644 --- a/src/blockchain.rs +++ b/src/blockchain.rs @@ -51,8 +51,12 @@ impl BlockChain { self.chain.push(new_block); } - pub fn add_block(&mut self, mut block: Block) { - block.mine(self.difficulty); - self.chain.push(block); + pub fn add_block(&mut self, block: Block) -> Option { + if block.hash != String::new() { + self.chain.push(block.clone()); + Some(block) + } else { + None + } } } diff --git a/src/client_network.rs b/src/client_network.rs index 93af3c4..1639da2 100644 --- a/src/client_network.rs +++ b/src/client_network.rs @@ -19,14 +19,15 @@ pub async fn start_network(client_config: ClientConfig) { } async fn sync(ws_stream_receiver: SplitStream>>) { - let (mut ws_stream, mut blockchain) = match receive_blockchain(ws_stream_receiver).await { - Some((ws_stream, blockchain)) => (ws_stream, blockchain), - None => return, - }; + let (mut ws_stream_receiver, mut blockchain) = + match receive_blockchain(ws_stream_receiver).await { + Some((ws_stream_receiver, blockchain)) => (ws_stream_receiver, blockchain), + None => return, + }; loop { let block: Block; - (ws_stream, block) = match receive_block(ws_stream).await { - Some((ws_stream, block)) => (ws_stream, block), + (ws_stream_receiver, block) = match receive_block(ws_stream_receiver).await { + Some((ws_stream_receiver, block)) => (ws_stream_receiver, block), None => return, }; blockchain.add_block(block); diff --git a/src/consensus.rs b/src/consensus.rs new file mode 100644 index 0000000..c6eca84 --- /dev/null +++ b/src/consensus.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{blockchain::BlockChain, BlockReceiver}; + +pub async fn accept_agreement( + blockchain_thread_safe: Arc>, + consensus_data_channels: Arc>>, +) { + let mut received_blocks = vec![]; + loop { + //notify consensus + for channel in consensus_data_channels.lock().await.iter_mut() { + match channel.block_receiver.try_recv() { + Ok(block) => { + received_blocks.push(block); + } + Err(_) => {} + } + } + + if received_blocks.len() > consensus_data_channels.lock().await.len() / 2 { + let mut block_hashes: Vec<(String, u128)> = vec![]; + for block in received_blocks.iter() { + block_hashes.sort(); + match block_hashes.binary_search_by_key(&block.hash, |(hash, _)| hash.to_string()) { + Ok(index) => block_hashes[index].1 += 1, + Err(_) => block_hashes.push((block.hash.clone(), 1)), + } + } + + let mut max_pair = (String::new(), 0); + for element in block_hashes.iter() { + if element.1 > max_pair.1 { + max_pair.0 = element.0.clone(); + max_pair.1 = element.1; + } + } + + //it's a bit strange, since we add first one that we find. + //first of what ? + //you know we can binary search ? + //03.46 right now. + for block in received_blocks.iter() { + if max_pair.0 == block.hash { + match blockchain_thread_safe.lock().await.add_block(block.clone()) { + Some(_successfully_added_block) => { + todo!("Notify Whole Network, Reward First Founder or Else") + } + None => todo!(), + } + } + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 292c14d..3773e7c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,13 @@ use std::net::IpAddr; +use block::Block; +use tokio::sync::broadcast::Receiver; +use uuid::Uuid; + pub mod block; pub mod blockchain; pub mod client_network; +pub mod consensus; pub mod server_network; mod test; pub mod utils; @@ -20,3 +25,29 @@ pub struct ClientConfig { pub server_address: IpAddr, pub port: u16, } + +#[derive(Debug)] +pub struct BlockReceiver { + pub block_receiver: Receiver, + pub uuid: Uuid, +} + +impl Ord for BlockReceiver { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.uuid.cmp(&other.uuid) + } +} + +impl PartialOrd for BlockReceiver { + fn partial_cmp(&self, other: &Self) -> Option { + self.uuid.partial_cmp(&other.uuid) + } +} + +impl PartialEq for BlockReceiver { + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for BlockReceiver {} diff --git a/src/main.rs b/src/main.rs index 9f4eacd..8165ebd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,9 +29,10 @@ async fn server() { }; let blockchain = BlockChain::new(server_config.difficulty.into()); - let block_data_channel_sender = broadcast::channel(1).0; let blockhain_thread_safe = Arc::new(Mutex::new(blockchain)); + let block_data_channel_sender = broadcast::channel(1).0; + server_network::start_network( server_config, blockhain_thread_safe, diff --git a/src/server_network.rs b/src/server_network.rs index e5e2af6..1bac762 100644 --- a/src/server_network.rs +++ b/src/server_network.rs @@ -1,13 +1,20 @@ use std::sync::Arc; -use futures_util::{stream::SplitSink, SinkExt, StreamExt}; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; use tokio::{ - net::TcpListener, - sync::{broadcast::Receiver, Mutex}, + net::{TcpListener, TcpStream}, + sync::{ + broadcast::{self, Receiver, Sender}, + Mutex, + }, }; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; +use uuid::Uuid; -use crate::{block::Block, blockchain::BlockChain, ServerConfig}; +use crate::{block::Block, blockchain::BlockChain, consensus, BlockReceiver, ServerConfig}; pub async fn start_network( server_config: ServerConfig, @@ -23,24 +30,103 @@ pub async fn start_network( Ok(listener_socket) => listener_socket, Err(_) => return, }; + let consensus_data_channels = Arc::new(Mutex::new(vec![])); + tokio::spawn(consensus::accept_agreement( + blockchain_thread_safe.clone(), + consensus_data_channels.clone(), + )); + //todo!("Consensus should be notified for new block, should forget old blocks"); loop { if let Ok(connection) = listener_socket.accept().await { let ws_stream = match accept_async(connection.0).await { Ok(ws_stream) => ws_stream, Err(_) => return, }; - let (ws_stream_sender, _) = ws_stream.split(); - tokio::spawn(sync( - ws_stream_sender, - blockchain_thread_safe.clone(), - block_data_channel_receiver.resubscribe(), - )); + let (ws_stream_sender, ws_stream_receiver) = ws_stream.split(); + let blockchain_thread_safe = blockchain_thread_safe.clone(); + let block_data_channel_receiver = block_data_channel_receiver.resubscribe(); + let consensus_data_channel_sender = broadcast::channel(1).0; + let block_receiver = BlockReceiver { + block_receiver: consensus_data_channel_sender.subscribe(), + uuid: Uuid::new_v4(), + }; + + let uuid = block_receiver.uuid.clone(); + + consensus_data_channels.lock().await.push(block_receiver); + + let consensus_data_channels = consensus_data_channels.clone(); + tokio::spawn(async move { + tokio::select! { + _ = sync_client( + ws_stream_sender, + blockchain_thread_safe, + block_data_channel_receiver, + ) => { + let mut consensus_data_channels = consensus_data_channels.lock().await; + consensus_data_channels.sort(); + if let Ok(block_receiver_index) = consensus_data_channels.binary_search_by_key(&uuid, |block_receive| block_receive.uuid) { + consensus_data_channels.remove(block_receiver_index); + }; + drop(consensus_data_channels); + return ; + } + + _ = sync_server(ws_stream_receiver, consensus_data_channel_sender) => { + let mut consensus_data_channels = consensus_data_channels.lock().await; + consensus_data_channels.sort(); + if let Ok(block_receiver_index) = consensus_data_channels.binary_search_by_key(&uuid, |block_receive| block_receive.uuid) { + consensus_data_channels.remove(block_receiver_index); + }; + drop(consensus_data_channels); + return ; + } + } + }); } } } -async fn sync( - ws_stream_sender: SplitSink, Message>, +async fn sync_server( + mut ws_stream_receiver: SplitStream>, + consensus_data_channel_sender: Sender, +) { + loop { + let block; + (ws_stream_receiver, block) = match receive_block(ws_stream_receiver).await { + Some((ws_stream_receiver, block)) => (ws_stream_receiver, block), + None => return, + }; + if let Err(_) = consensus_data_channel_sender.send(block) { + return; + } + } +} + +async fn receive_block( + mut ws_stream_receiver: SplitStream>, +) -> Option<(SplitStream>, Block)> { + match ws_stream_receiver.next().await { + Some(message) => match message { + Ok(message) => { + if let tokio_tungstenite::tungstenite::Message::Text(message) = message { + let block: Block = match serde_json::from_str(&message[..]) { + Ok(block) => block, + Err(_) => return None, + }; + Some((ws_stream_receiver, block)) + } else { + return None; + } + } + Err(_) => return None, + }, + None => return None, + } +} + +async fn sync_client( + ws_stream_sender: SplitSink, Message>, blockchain_thread_safe: Arc>, mut block_data_channel_receiver: Receiver, ) { @@ -49,6 +135,7 @@ async fn sync( Some(ws_stream_sender) => ws_stream_sender, None => return, }; + loop { let block = match block_data_channel_receiver.recv().await { Ok(block) => block, @@ -62,7 +149,7 @@ async fn sync( } async fn send_blockchain( - mut ws_stream_sender: SplitSink, Message>, + mut ws_stream_sender: SplitSink, Message>, blockchain_thread_safe: Arc>, ) -> Option, Message>> { let blockchain = blockchain_thread_safe.lock().await; @@ -77,7 +164,7 @@ async fn send_blockchain( } async fn send_block( - mut ws_stream_sender: SplitSink, Message>, + mut ws_stream_sender: SplitSink, Message>, block: Block, ) -> Option, Message>> { let block_data = serde_json::json!(block).to_string();