feat: ✨ receive calculated block for server
feat: ✨ somewhat consensus structure
This commit is contained in:
parent
e7d10c46f8
commit
92b748ab6a
7 changed files with 206 additions and 24 deletions
|
@ -13,3 +13,4 @@ serde_json = "1.0.117"
|
||||||
sha3 = "0.10.8"
|
sha3 = "0.10.8"
|
||||||
tokio = { version = "1.37.0", features = ["full"] }
|
tokio = { version = "1.37.0", features = ["full"] }
|
||||||
tokio-tungstenite = "0.21.0"
|
tokio-tungstenite = "0.21.0"
|
||||||
|
uuid = { version = "1.8.0", features = ["v4"] }
|
||||||
|
|
|
@ -51,8 +51,12 @@ impl BlockChain {
|
||||||
self.chain.push(new_block);
|
self.chain.push(new_block);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_block(&mut self, mut block: Block) {
|
pub fn add_block(&mut self, block: Block) -> Option<Block> {
|
||||||
block.mine(self.difficulty);
|
if block.hash != String::new() {
|
||||||
self.chain.push(block);
|
self.chain.push(block.clone());
|
||||||
|
Some(block)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,14 +19,15 @@ pub async fn start_network(client_config: ClientConfig) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync(ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
|
async fn sync(ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
|
||||||
let (mut ws_stream, mut blockchain) = match receive_blockchain(ws_stream_receiver).await {
|
let (mut ws_stream_receiver, mut blockchain) =
|
||||||
Some((ws_stream, blockchain)) => (ws_stream, blockchain),
|
match receive_blockchain(ws_stream_receiver).await {
|
||||||
None => return,
|
Some((ws_stream_receiver, blockchain)) => (ws_stream_receiver, blockchain),
|
||||||
};
|
None => return,
|
||||||
|
};
|
||||||
loop {
|
loop {
|
||||||
let block: Block;
|
let block: Block;
|
||||||
(ws_stream, block) = match receive_block(ws_stream).await {
|
(ws_stream_receiver, block) = match receive_block(ws_stream_receiver).await {
|
||||||
Some((ws_stream, block)) => (ws_stream, block),
|
Some((ws_stream_receiver, block)) => (ws_stream_receiver, block),
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
blockchain.add_block(block);
|
blockchain.add_block(block);
|
||||||
|
|
57
src/consensus.rs
Normal file
57
src/consensus.rs
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use crate::{blockchain::BlockChain, BlockReceiver};
|
||||||
|
|
||||||
|
pub async fn accept_agreement(
|
||||||
|
blockchain_thread_safe: Arc<Mutex<BlockChain>>,
|
||||||
|
consensus_data_channels: Arc<Mutex<Vec<BlockReceiver>>>,
|
||||||
|
) {
|
||||||
|
let mut received_blocks = vec![];
|
||||||
|
loop {
|
||||||
|
//notify consensus
|
||||||
|
for channel in consensus_data_channels.lock().await.iter_mut() {
|
||||||
|
match channel.block_receiver.try_recv() {
|
||||||
|
Ok(block) => {
|
||||||
|
received_blocks.push(block);
|
||||||
|
}
|
||||||
|
Err(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if received_blocks.len() > consensus_data_channels.lock().await.len() / 2 {
|
||||||
|
let mut block_hashes: Vec<(String, u128)> = vec![];
|
||||||
|
for block in received_blocks.iter() {
|
||||||
|
block_hashes.sort();
|
||||||
|
match block_hashes.binary_search_by_key(&block.hash, |(hash, _)| hash.to_string()) {
|
||||||
|
Ok(index) => block_hashes[index].1 += 1,
|
||||||
|
Err(_) => block_hashes.push((block.hash.clone(), 1)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut max_pair = (String::new(), 0);
|
||||||
|
for element in block_hashes.iter() {
|
||||||
|
if element.1 > max_pair.1 {
|
||||||
|
max_pair.0 = element.0.clone();
|
||||||
|
max_pair.1 = element.1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//it's a bit strange, since we add first one that we find.
|
||||||
|
//first of what ?
|
||||||
|
//you know we can binary search ?
|
||||||
|
//03.46 right now.
|
||||||
|
for block in received_blocks.iter() {
|
||||||
|
if max_pair.0 == block.hash {
|
||||||
|
match blockchain_thread_safe.lock().await.add_block(block.clone()) {
|
||||||
|
Some(_successfully_added_block) => {
|
||||||
|
todo!("Notify Whole Network, Reward First Founder or Else")
|
||||||
|
}
|
||||||
|
None => todo!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
31
src/lib.rs
31
src/lib.rs
|
@ -1,8 +1,13 @@
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
use block::Block;
|
||||||
|
use tokio::sync::broadcast::Receiver;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub mod block;
|
pub mod block;
|
||||||
pub mod blockchain;
|
pub mod blockchain;
|
||||||
pub mod client_network;
|
pub mod client_network;
|
||||||
|
pub mod consensus;
|
||||||
pub mod server_network;
|
pub mod server_network;
|
||||||
mod test;
|
mod test;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
@ -20,3 +25,29 @@ pub struct ClientConfig {
|
||||||
pub server_address: IpAddr,
|
pub server_address: IpAddr,
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct BlockReceiver {
|
||||||
|
pub block_receiver: Receiver<Block>,
|
||||||
|
pub uuid: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for BlockReceiver {
|
||||||
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||||
|
self.uuid.cmp(&other.uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for BlockReceiver {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||||
|
self.uuid.partial_cmp(&other.uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for BlockReceiver {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.uuid == other.uuid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for BlockReceiver {}
|
||||||
|
|
|
@ -29,9 +29,10 @@ async fn server() {
|
||||||
};
|
};
|
||||||
|
|
||||||
let blockchain = BlockChain::new(server_config.difficulty.into());
|
let blockchain = BlockChain::new(server_config.difficulty.into());
|
||||||
let block_data_channel_sender = broadcast::channel(1).0;
|
|
||||||
let blockhain_thread_safe = Arc::new(Mutex::new(blockchain));
|
let blockhain_thread_safe = Arc::new(Mutex::new(blockchain));
|
||||||
|
|
||||||
|
let block_data_channel_sender = broadcast::channel(1).0;
|
||||||
|
|
||||||
server_network::start_network(
|
server_network::start_network(
|
||||||
server_config,
|
server_config,
|
||||||
blockhain_thread_safe,
|
blockhain_thread_safe,
|
||||||
|
|
|
@ -1,13 +1,20 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
|
use futures_util::{
|
||||||
|
stream::{SplitSink, SplitStream},
|
||||||
|
SinkExt, StreamExt,
|
||||||
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::TcpListener,
|
net::{TcpListener, TcpStream},
|
||||||
sync::{broadcast::Receiver, Mutex},
|
sync::{
|
||||||
|
broadcast::{self, Receiver, Sender},
|
||||||
|
Mutex,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
|
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{block::Block, blockchain::BlockChain, ServerConfig};
|
use crate::{block::Block, blockchain::BlockChain, consensus, BlockReceiver, ServerConfig};
|
||||||
|
|
||||||
pub async fn start_network(
|
pub async fn start_network(
|
||||||
server_config: ServerConfig,
|
server_config: ServerConfig,
|
||||||
|
@ -23,24 +30,103 @@ pub async fn start_network(
|
||||||
Ok(listener_socket) => listener_socket,
|
Ok(listener_socket) => listener_socket,
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
|
let consensus_data_channels = Arc::new(Mutex::new(vec![]));
|
||||||
|
tokio::spawn(consensus::accept_agreement(
|
||||||
|
blockchain_thread_safe.clone(),
|
||||||
|
consensus_data_channels.clone(),
|
||||||
|
));
|
||||||
|
//todo!("Consensus should be notified for new block, should forget old blocks");
|
||||||
loop {
|
loop {
|
||||||
if let Ok(connection) = listener_socket.accept().await {
|
if let Ok(connection) = listener_socket.accept().await {
|
||||||
let ws_stream = match accept_async(connection.0).await {
|
let ws_stream = match accept_async(connection.0).await {
|
||||||
Ok(ws_stream) => ws_stream,
|
Ok(ws_stream) => ws_stream,
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
let (ws_stream_sender, _) = ws_stream.split();
|
let (ws_stream_sender, ws_stream_receiver) = ws_stream.split();
|
||||||
tokio::spawn(sync(
|
let blockchain_thread_safe = blockchain_thread_safe.clone();
|
||||||
ws_stream_sender,
|
let block_data_channel_receiver = block_data_channel_receiver.resubscribe();
|
||||||
blockchain_thread_safe.clone(),
|
let consensus_data_channel_sender = broadcast::channel(1).0;
|
||||||
block_data_channel_receiver.resubscribe(),
|
let block_receiver = BlockReceiver {
|
||||||
));
|
block_receiver: consensus_data_channel_sender.subscribe(),
|
||||||
|
uuid: Uuid::new_v4(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let uuid = block_receiver.uuid.clone();
|
||||||
|
|
||||||
|
consensus_data_channels.lock().await.push(block_receiver);
|
||||||
|
|
||||||
|
let consensus_data_channels = consensus_data_channels.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::select! {
|
||||||
|
_ = sync_client(
|
||||||
|
ws_stream_sender,
|
||||||
|
blockchain_thread_safe,
|
||||||
|
block_data_channel_receiver,
|
||||||
|
) => {
|
||||||
|
let mut consensus_data_channels = consensus_data_channels.lock().await;
|
||||||
|
consensus_data_channels.sort();
|
||||||
|
if let Ok(block_receiver_index) = consensus_data_channels.binary_search_by_key(&uuid, |block_receive| block_receive.uuid) {
|
||||||
|
consensus_data_channels.remove(block_receiver_index);
|
||||||
|
};
|
||||||
|
drop(consensus_data_channels);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = sync_server(ws_stream_receiver, consensus_data_channel_sender) => {
|
||||||
|
let mut consensus_data_channels = consensus_data_channels.lock().await;
|
||||||
|
consensus_data_channels.sort();
|
||||||
|
if let Ok(block_receiver_index) = consensus_data_channels.binary_search_by_key(&uuid, |block_receive| block_receive.uuid) {
|
||||||
|
consensus_data_channels.remove(block_receiver_index);
|
||||||
|
};
|
||||||
|
drop(consensus_data_channels);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync(
|
async fn sync_server(
|
||||||
ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
|
mut ws_stream_receiver: SplitStream<WebSocketStream<TcpStream>>,
|
||||||
|
consensus_data_channel_sender: Sender<Block>,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
let block;
|
||||||
|
(ws_stream_receiver, block) = match receive_block(ws_stream_receiver).await {
|
||||||
|
Some((ws_stream_receiver, block)) => (ws_stream_receiver, block),
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
if let Err(_) = consensus_data_channel_sender.send(block) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn receive_block(
|
||||||
|
mut ws_stream_receiver: SplitStream<WebSocketStream<TcpStream>>,
|
||||||
|
) -> Option<(SplitStream<WebSocketStream<TcpStream>>, Block)> {
|
||||||
|
match ws_stream_receiver.next().await {
|
||||||
|
Some(message) => match message {
|
||||||
|
Ok(message) => {
|
||||||
|
if let tokio_tungstenite::tungstenite::Message::Text(message) = message {
|
||||||
|
let block: Block = match serde_json::from_str(&message[..]) {
|
||||||
|
Ok(block) => block,
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
Some((ws_stream_receiver, block))
|
||||||
|
} else {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => return None,
|
||||||
|
},
|
||||||
|
None => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_client(
|
||||||
|
ws_stream_sender: SplitSink<WebSocketStream<TcpStream>, Message>,
|
||||||
blockchain_thread_safe: Arc<Mutex<BlockChain>>,
|
blockchain_thread_safe: Arc<Mutex<BlockChain>>,
|
||||||
mut block_data_channel_receiver: Receiver<Block>,
|
mut block_data_channel_receiver: Receiver<Block>,
|
||||||
) {
|
) {
|
||||||
|
@ -49,6 +135,7 @@ async fn sync(
|
||||||
Some(ws_stream_sender) => ws_stream_sender,
|
Some(ws_stream_sender) => ws_stream_sender,
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let block = match block_data_channel_receiver.recv().await {
|
let block = match block_data_channel_receiver.recv().await {
|
||||||
Ok(block) => block,
|
Ok(block) => block,
|
||||||
|
@ -62,7 +149,7 @@ async fn sync(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_blockchain(
|
async fn send_blockchain(
|
||||||
mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
|
mut ws_stream_sender: SplitSink<WebSocketStream<TcpStream>, Message>,
|
||||||
blockchain_thread_safe: Arc<Mutex<BlockChain>>,
|
blockchain_thread_safe: Arc<Mutex<BlockChain>>,
|
||||||
) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
|
) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
|
||||||
let blockchain = blockchain_thread_safe.lock().await;
|
let blockchain = blockchain_thread_safe.lock().await;
|
||||||
|
@ -77,7 +164,7 @@ async fn send_blockchain(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_block(
|
async fn send_block(
|
||||||
mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
|
mut ws_stream_sender: SplitSink<WebSocketStream<TcpStream>, Message>,
|
||||||
block: Block,
|
block: Block,
|
||||||
) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
|
) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
|
||||||
let block_data = serde_json::json!(block).to_string();
|
let block_data = serde_json::json!(block).to_string();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue