diff --git a/src/client_network.rs b/src/client_network.rs index fe7e32a..a286b58 100644 --- a/src/client_network.rs +++ b/src/client_network.rs @@ -36,10 +36,14 @@ async fn sync( Some((ws_stream_receiver, block)) => (ws_stream_receiver, block), None => return, }; - let block = blockchain.add_block(block); - ws_stream_sender = match send_block(ws_stream_sender, block).await { - Some(ws_stream_sender) => ws_stream_sender, - None => return, + if block.hash == String::new() { + let block = blockchain.add_block(block); + ws_stream_sender = match send_block(ws_stream_sender, block).await { + Some(ws_stream_sender) => ws_stream_sender, + None => return, + } + } else { + blockchain.push_block(block); } } } diff --git a/src/consensus.rs b/src/consensus.rs index 5e3c14d..c5033c0 100644 --- a/src/consensus.rs +++ b/src/consensus.rs @@ -1,20 +1,23 @@ use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{broadcast::Sender, Mutex}; -use crate::{blockchain::BlockChain, BlockReceiver}; +use crate::{block::Block, blockchain::BlockChain, BlockReceiver}; pub async fn accept_agreement( blockchain_thread_safe: Arc>, consensus_data_channels: Arc>>, + block_data_channel_sender: Sender, + limbo_block: Arc>, ) { let mut received_blocks = vec![]; loop { - //notify consensus for channel in consensus_data_channels.lock().await.iter_mut() { match channel.block_receiver.try_recv() { Ok(block) => { - received_blocks.push(block); + if block.previous_hash == limbo_block.lock().await.previous_hash { + received_blocks.push(block); + } } Err(_) => {} } @@ -23,25 +26,16 @@ pub async fn accept_agreement( if received_blocks.len() > consensus_data_channels.lock().await.len() / 2 { let mut block_hashes: Vec<(String, u128)> = vec![]; for block in received_blocks.iter() { - block_hashes.sort(); + block_hashes.sort_by_key(|(hash, _counter)| hash.to_string()); match block_hashes.binary_search_by_key(&block.hash, |(hash, _)| hash.to_string()) { Ok(index) => block_hashes[index].1 += 1, Err(_) => block_hashes.push((block.hash.clone(), 1)), } } - let mut max_pair = (String::new(), 0); - for element in block_hashes.iter() { - if element.1 > max_pair.1 { - max_pair.0 = element.0.clone(); - max_pair.1 = element.1; - } - } + block_hashes.sort_by_key(|(_hash, counter)| *counter); + let max_pair = block_hashes[0].clone(); - //it's a bit strange, since we add first one that we find. - //first of what ? - //you know we can binary search ? - //03.46 right now. for block in received_blocks.iter() { if max_pair.0 == block.hash { match blockchain_thread_safe @@ -50,8 +44,13 @@ pub async fn accept_agreement( .push_block(block.clone()) { Some(successfully_added_block) => { - println!("{:#?}", successfully_added_block); - todo!("Notify Whole Network, Reward First Founder or Else") + match block_data_channel_sender.send(successfully_added_block) { + Ok(_) => { + *limbo_block.lock().await = + blockchain_thread_safe.lock().await.genesis_block.clone() + } + Err(_) => {} + } } None => todo!(), } diff --git a/src/main.rs b/src/main.rs index 1ddc91e..5a027ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,6 @@ async fn main() { }, None => return, }; - //todo!("Limbo Block: Not in chain, but processing by others or none. Sync it also"); - //todo!("Consensus should be notified for new block, should forget old blocks"); } async fn server() { @@ -30,6 +28,7 @@ async fn server() { }; let blockchain = BlockChain::new(server_config.difficulty.into()); + let limbo_block = Arc::new(Mutex::new(blockchain.genesis_block.clone())); let blockchain_thread_safe = Arc::new(Mutex::new(blockchain)); let block_data_channel_sender = broadcast::channel(1).0; @@ -37,7 +36,8 @@ async fn server() { server_network::start_network( server_config, blockchain_thread_safe, - block_data_channel_sender.subscribe(), + block_data_channel_sender, + limbo_block, ) .await; } diff --git a/src/server_network.rs b/src/server_network.rs index 78e8ca4..0920f5c 100644 --- a/src/server_network.rs +++ b/src/server_network.rs @@ -19,8 +19,10 @@ use crate::{block::Block, blockchain::BlockChain, consensus, BlockReceiver, Serv pub async fn start_network( server_config: ServerConfig, blockchain_thread_safe: Arc>, - block_data_channel_receiver: Receiver, + block_data_channel_sender: Sender, + limbo_block: Arc>, ) { + let block_data_channel_receiver = block_data_channel_sender.subscribe(); let listener_socket = match TcpListener::bind(format!( "{}:{}", server_config.server_address, server_config.port @@ -34,6 +36,8 @@ pub async fn start_network( tokio::spawn(consensus::accept_agreement( blockchain_thread_safe.clone(), consensus_data_channels.clone(), + block_data_channel_sender, + limbo_block.clone(), )); loop { if let Ok(connection) = listener_socket.accept().await { @@ -55,12 +59,14 @@ pub async fn start_network( consensus_data_channels.lock().await.push(block_receiver); let consensus_data_channels = consensus_data_channels.clone(); + let limbo_block = limbo_block.clone(); tokio::spawn(async move { tokio::select! { _ = sync_client( ws_stream_sender, blockchain_thread_safe, block_data_channel_receiver, + limbo_block, ) => { let mut consensus_data_channels = consensus_data_channels.lock().await; consensus_data_channels.sort(); @@ -128,13 +134,20 @@ async fn sync_client( ws_stream_sender: SplitSink, Message>, blockchain_thread_safe: Arc>, mut block_data_channel_receiver: Receiver, + limbo_block: Arc>, ) { - let mut ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain_thread_safe).await - { - Some(ws_stream_sender) => ws_stream_sender, - None => return, - }; - + let mut ws_stream_sender = + match send_blockchain(ws_stream_sender, blockchain_thread_safe.clone()).await { + Some(ws_stream_sender) => ws_stream_sender, + None => return, + }; + let limbo_block = limbo_block.lock().await; + if limbo_block.timestamp != blockchain_thread_safe.lock().await.genesis_block.timestamp { + ws_stream_sender = match send_block(ws_stream_sender, limbo_block.clone()).await { + Some(ws_stream_sender) => ws_stream_sender, + None => return, + } + } loop { let block = match block_data_channel_receiver.recv().await { Ok(block) => block,