feat: blockchain and block transfer

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-25 20:07:22 +03:00
parent 35c1207cff
commit e7d10c46f8
6 changed files with 98 additions and 62 deletions

View file

@ -5,8 +5,6 @@ use serde::{Deserialize, Serialize};
use sha3::{Digest, Sha3_512}; use sha3::{Digest, Sha3_512};
use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::Sender;
use crate::blockchain::BlockChain;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Block { pub struct Block {
pub index: u64, pub index: u64,
@ -49,10 +47,10 @@ impl Block {
block block
} }
pub fn mine(&mut self, blockhain: BlockChain) -> Self { pub fn mine(&mut self, difficulty: usize) -> Self {
let mut hash = self.calculate_hash(); let mut hash = self.calculate_hash();
loop { loop {
if !hash.starts_with(&"0".repeat(blockhain.difficulty)) { if !hash.starts_with(&"0".repeat(difficulty)) {
self.proof_of_work += 1; self.proof_of_work += 1;
hash = self.calculate_hash(); hash = self.calculate_hash();
} else { } else {

View file

@ -34,20 +34,25 @@ impl BlockChain {
} }
} }
pub fn add_block( pub fn create_block(
&mut self, &mut self,
data: String, data: impl ToString,
instant: Instant, instant: Instant,
block_data_channel_sender: Sender<Block>, block_data_channel_sender: Sender<Block>,
) { ) {
let new_block = Block::new( let new_block = Block::new(
self.chain.len() as u64, self.chain.len() as u64,
data, data.to_string(),
self.chain[&self.chain.len() - 1].hash.clone(), self.chain[&self.chain.len() - 1].hash.clone(),
instant, instant,
block_data_channel_sender, block_data_channel_sender,
) )
.mine(self.clone()); .mine(self.difficulty);
self.chain.push(new_block); self.chain.push(new_block);
} }
pub fn add_block(&mut self, mut block: Block) {
block.mine(self.difficulty);
self.chain.push(block);
}
} }

View file

@ -2,7 +2,7 @@ use futures_util::{stream::SplitStream, StreamExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use crate::ClientConfig; use crate::{block::Block, blockchain::BlockChain, ClientConfig};
pub async fn start_network(client_config: ClientConfig) { pub async fn start_network(client_config: ClientConfig) {
let ws_stream = match connect_async(format!( let ws_stream = match connect_async(format!(
@ -19,38 +19,64 @@ pub async fn start_network(client_config: ClientConfig) {
} }
async fn sync(ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) { async fn sync(ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
let mut ws_stream = match receive_blockchain(ws_stream_receiver).await { let (mut ws_stream, mut blockchain) = match receive_blockchain(ws_stream_receiver).await {
Some(ws_stream) => ws_stream, Some((ws_stream, blockchain)) => (ws_stream, blockchain),
None => return, None => return,
}; };
loop { loop {
ws_stream = match receive_block(ws_stream).await { let block: Block;
Some(ws_stream) => ws_stream, (ws_stream, block) = match receive_block(ws_stream).await {
Some((ws_stream, block)) => (ws_stream, block),
None => return, None => return,
} };
blockchain.add_block(block);
} }
} }
async fn receive_blockchain(mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) -> Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>> { async fn receive_blockchain(
mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Option<(
SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
BlockChain,
)> {
match ws_stream_receiver.next().await { match ws_stream_receiver.next().await {
Some(message) => match message { Some(message) => match message {
Ok(message) => { Ok(message) => {
println!("{}", message); if let tokio_tungstenite::tungstenite::Message::Text(message) = message {
Some(ws_stream_receiver) let blockchain: BlockChain = match serde_json::from_str(&message[..]) {
}, Ok(blockchain) => blockchain,
Err(_) => return None,
};
Some((ws_stream_receiver, blockchain))
} else {
return None;
}
}
Err(_) => return None, Err(_) => return None,
}, },
None => return None, None => return None,
} }
} }
async fn receive_block(mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) -> Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>> { async fn receive_block(
mut ws_stream_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Option<(
SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
Block,
)> {
match ws_stream_receiver.next().await { match ws_stream_receiver.next().await {
Some(message) => match message { Some(message) => match message {
Ok(message) => { Ok(message) => {
println!("{}", message); if let tokio_tungstenite::tungstenite::Message::Text(message) = message {
Some(ws_stream_receiver) let block: Block = match serde_json::from_str(&message[..]) {
}, Ok(block) => block,
Err(_) => return None,
};
Some((ws_stream_receiver, block))
} else {
return None;
}
}
Err(_) => return None, Err(_) => return None,
}, },
None => return None, None => return None,

View file

@ -1,10 +1,12 @@
use std::sync::Arc;
use rust_blockchain::{ use rust_blockchain::{
blockchain::BlockChain, blockchain::BlockChain,
client_network, server_network, client_network, server_network,
utils::{read_client_config, read_server_config, take_args}, utils::{read_client_config, read_server_config, take_args},
Runner, Runner,
}; };
use tokio::sync::broadcast; use tokio::sync::{broadcast, Mutex};
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -28,9 +30,11 @@ async fn server() {
let blockchain = BlockChain::new(server_config.difficulty.into()); let blockchain = BlockChain::new(server_config.difficulty.into());
let block_data_channel_sender = broadcast::channel(1).0; let block_data_channel_sender = broadcast::channel(1).0;
let blockhain_thread_safe = Arc::new(Mutex::new(blockchain));
server_network::start_network( server_network::start_network(
server_config, server_config,
&blockchain, blockhain_thread_safe,
block_data_channel_sender.subscribe(), block_data_channel_sender.subscribe(),
) )
.await; .await;

View file

@ -1,7 +1,9 @@
use std::sync::Arc;
use futures_util::{stream::SplitSink, SinkExt, StreamExt}; use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use tokio::{ use tokio::{
net::TcpListener, net::TcpListener,
sync::broadcast::Receiver, sync::{broadcast::Receiver, Mutex},
}; };
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
@ -9,7 +11,7 @@ use crate::{block::Block, blockchain::BlockChain, ServerConfig};
pub async fn start_network( pub async fn start_network(
server_config: ServerConfig, server_config: ServerConfig,
blockchain: &BlockChain, blockchain_thread_safe: Arc<Mutex<BlockChain>>,
block_data_channel_receiver: Receiver<Block>, block_data_channel_receiver: Receiver<Block>,
) { ) {
let listener_socket = match TcpListener::bind(format!( let listener_socket = match TcpListener::bind(format!(
@ -30,7 +32,7 @@ pub async fn start_network(
let (ws_stream_sender, _) = ws_stream.split(); let (ws_stream_sender, _) = ws_stream.split();
tokio::spawn(sync( tokio::spawn(sync(
ws_stream_sender, ws_stream_sender,
blockchain.clone(), blockchain_thread_safe.clone(),
block_data_channel_receiver.resubscribe(), block_data_channel_receiver.resubscribe(),
)); ));
} }
@ -39,50 +41,51 @@ pub async fn start_network(
async fn sync( async fn sync(
ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
blockchain: BlockChain, blockchain_thread_safe: Arc<Mutex<BlockChain>>,
block_data_channel_receiver: Receiver<Block>, mut block_data_channel_receiver: Receiver<Block>,
) { ) {
let ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain).await { let mut ws_stream_sender = match send_blockchain(ws_stream_sender, blockchain_thread_safe).await
{
Some(ws_stream_sender) => ws_stream_sender, Some(ws_stream_sender) => ws_stream_sender,
None => return, None => return,
}; };
send_blocks(ws_stream_sender, block_data_channel_receiver).await; loop {
let block = match block_data_channel_receiver.recv().await {
Ok(block) => block,
Err(_) => return,
};
ws_stream_sender = match send_block(ws_stream_sender, block).await {
Some(ws_stream_sender) => ws_stream_sender,
None => return,
}
}
} }
async fn send_blockchain(mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, blockchain: BlockChain) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> { async fn send_blockchain(
let blockchain_data = serde_json::json!({ mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
"blockchain": blockchain blockchain_thread_safe: Arc<Mutex<BlockChain>>,
}) ) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
.to_string(); let blockchain = blockchain_thread_safe.lock().await;
let blockchain_data = serde_json::json!(*blockchain).to_string();
match ws_stream_sender.send(blockchain_data.into()).await { match ws_stream_sender.send(blockchain_data.into()).await {
Ok(_) => { Ok(_) => match ws_stream_sender.flush().await {
match ws_stream_sender.flush().await {
Ok(_) => Some(ws_stream_sender), Ok(_) => Some(ws_stream_sender),
Err(_) => None, Err(_) => None,
}
}, },
Err(_) => None, Err(_) => None,
} }
} }
async fn send_blocks(mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, mut block_data_channel_receiver: Receiver<Block>) { async fn send_block(
loop { mut ws_stream_sender: SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
match block_data_channel_receiver.recv().await { block: Block,
Ok(block) => { ) -> Option<SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>> {
let block_data = serde_json::json!({ let block_data = serde_json::json!(block).to_string();
"block": block
})
.to_string();
match ws_stream_sender.send(block_data.into()).await { match ws_stream_sender.send(block_data.into()).await {
Ok(_) => { Ok(_) => match ws_stream_sender.flush().await {
if ws_stream_sender.flush().await.is_err() { Ok(_) => Some(ws_stream_sender),
return; Err(_) => None,
} },
} Err(_) => None,
Err(_) => return,
}
}
Err(_) => return,
}
} }
} }

View file

@ -33,7 +33,7 @@ async fn create_block() {
let instant = Instant::now(); let instant = Instant::now();
let mut blockchain = BlockChain::new(1); let mut blockchain = BlockChain::new(1);
let block_data_channel_sender = channel(1).0; let block_data_channel_sender = channel(1).0;
BlockChain::add_block( BlockChain::create_block(
&mut blockchain, &mut blockchain,
"Ahmet Kaan Gümüş".to_string(), "Ahmet Kaan Gümüş".to_string(),
instant, instant,