From 6b336c62fbdb264979709423e9f2ac42be838fe7 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 15 Nov 2022 23:33:03 +0100 Subject: [PATCH] feat: add blog propagation handlers (#205) --- crates/net/eth-wire/src/types/broadcast.rs | 31 ++++++++++++++ crates/net/network/src/fetch.rs | 18 ++++++++- crates/net/network/src/import.rs | 26 +++++++++++- crates/net/network/src/manager.rs | 25 ++++++++++-- crates/net/network/src/message.rs | 11 ++++- crates/net/network/src/state.rs | 47 +++++++++++++++++++++- crates/net/network/src/swarm.rs | 4 ++ 7 files changed, 151 insertions(+), 11 deletions(-) diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 601aaac38..a80dc2378 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -10,6 +10,20 @@ pub struct NewBlockHashes( pub Vec, ); +// === impl NewBlockHashes === + +impl NewBlockHashes { + /// Returns the latest block in the list of blocks. + pub fn latest(&self) -> Option<&BlockHashNumber> { + self.0.iter().fold(None, |latest, block| { + if let Some(latest) = latest { + return if latest.number > block.number { Some(latest) } else { Some(block) } + } + Some(block) + }) + } +} + /// A block hash _and_ a block number. #[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] pub struct BlockHashNumber { @@ -87,3 +101,20 @@ impl From> for NewPooledTransactionHashes { NewPooledTransactionHashes(v) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn can_return_latest_block() { + let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: H256::random(), number: 0 }]); + let latest = blocks.latest().unwrap(); + assert_eq!(latest.number, 0); + + blocks.0.push(BlockHashNumber { hash: H256::random(), number: 100 }); + blocks.0.push(BlockHashNumber { hash: H256::random(), number: 2 }); + let latest = blocks.latest().unwrap(); + assert_eq!(latest.number, 100); + } +} diff --git a/crates/net/network/src/fetch.rs b/crates/net/network/src/fetch.rs index 151051b76..f732fd2e0 100644 --- a/crates/net/network/src/fetch.rs +++ b/crates/net/network/src/fetch.rs @@ -43,7 +43,7 @@ impl StateFetcher { &mut self, peer_id: PeerId, best_hash: H256, - best_number: Option, + best_number: u64, ) { self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); } @@ -61,6 +61,20 @@ impl StateFetcher { } } + /// Updates the block information for the peer. + /// + /// Returns `true` if this a newer block + pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) -> bool { + if let Some(peer) = self.peers.get_mut(peer_id) { + if number > peer.best_number { + peer.best_hash = hash; + peer.best_number = number; + return true + } + } + false + } + /// Invoked when an active session is about to be disconnected. pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) { if let Some(peer) = self.peers.get_mut(peer_id) { @@ -246,7 +260,7 @@ struct Peer { /// Best known hash that the peer has best_hash: H256, /// Tracks the best number of the peer. - best_number: Option, + best_number: u64, } /// Tracks the state of an individual peer diff --git a/crates/net/network/src/import.rs b/crates/net/network/src/import.rs index 3a54d76dd..b5fb3342e 100644 --- a/crates/net/network/src/import.rs +++ b/crates/net/network/src/import.rs @@ -22,11 +22,33 @@ pub struct BlockImportOutcome { /// Sender of the `NewBlock` message. pub peer: PeerId, /// The result after validating the block - pub result: Result, + pub result: Result, +} + +/// Represents the successful validation of a received `NewBlock` message. +#[derive(Debug)] +pub enum BlockValidation { + /// Basic Header validity check, after which the block should be relayed to peers via a + /// `NewBlock` message + ValidHeader { + /// received block + block: NewBlockMessage, + }, + /// Successfully imported: state-root matches after execution. The block should be relayed via + /// `NewBlockHashes` + ValidBlock { + /// validated block. + block: NewBlockMessage, + }, } /// Represents the error case of a failed block import -pub enum BlockImportError {} +#[derive(Debug, thiserror::Error)] +pub enum BlockImportError { + /// Consensus error + #[error(transparent)] + Consensus(#[from] reth_interfaces::consensus::Error), +} /// An implementation of `BlockImport` that does nothing #[derive(Debug, Default)] diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 0c5bc208e..399c9eaf1 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -19,7 +19,7 @@ use crate::{ config::NetworkConfig, discovery::Discovery, error::NetworkError, - import::{BlockImport, BlockImportOutcome}, + import::{BlockImport, BlockImportOutcome, BlockValidation}, listener::ConnectionListener, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, network::{NetworkHandle, NetworkHandleMessage}, @@ -192,10 +192,30 @@ where } } + /// Invoked after a `NewBlock` message from the peer was validated + fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { + let BlockImportOutcome { peer, result } = outcome; + match result { + Ok(validated_block) => match validated_block { + BlockValidation::ValidHeader { block } => { + self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number()); + self.swarm.state_mut().announce_new_block(block); + } + BlockValidation::ValidBlock { block } => { + self.swarm.state_mut().announce_new_block_hash(block); + } + }, + Err(_err) => { + // TODO report peer for bad block + } + } + } + /// Handles a received Message from the peer. fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { match msg { PeerMessage::NewBlockHashes(hashes) => { + let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone()); // update peer's state, to track what blocks this peer has seen self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0) } @@ -240,9 +260,6 @@ where .send_message(&peer_id, PeerMessage::PooledTransactions(msg)), } } - - /// Invoked after a `NewBlock` message from the peer was validated - fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {} } impl Future for NetworkManager diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 20abe492e..7613cac77 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -26,11 +26,20 @@ pub struct NewBlockMessage { pub block: Arc, } +// === impl NewBlockMessage === + +impl NewBlockMessage { + /// Returns the block number of the block + pub fn number(&self) -> u64 { + self.block.block.header.number + } +} + /// Represents all messages that can be sent to a peer session #[derive(Debug)] pub enum PeerMessage { /// Announce new block hashes - NewBlockHashes(NewBlockHashes), + NewBlockHashes(Arc), /// Broadcast new block. NewBlock(NewBlockMessage), /// Broadcast transactions. diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 38645098a..a27ac007c 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -10,7 +10,7 @@ use crate::{ }, peers::{PeerAction, PeersManager}, }; -use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status}; +use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status}; use reth_interfaces::provider::BlockProvider; use reth_primitives::{PeerId, H256}; use std::{ @@ -93,7 +93,8 @@ where debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible"); // find the corresponding block number - let block_number = self.client.block_number(status.blockhash).ok().flatten(); + let block_number = + self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default(); self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number); self.connected_peers.insert( @@ -129,6 +130,7 @@ where // number of peers) let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1; + let number = msg.block.block.header.number; let mut count = 0; for (peer_id, peer) in self.connected_peers.iter_mut() { if peer.blocks.contains(&msg.hash) { @@ -141,6 +143,11 @@ where self.queued_messages .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() }); + // update peer block info + if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) { + peer.best_hash = msg.hash; + } + // mark the block as seen by the peer peer.blocks.insert(msg.hash); @@ -153,6 +160,36 @@ where } } + /// Completes the block propagation process started in [`NetworkState::announce_new_block()`] + /// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet. + pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) { + let number = msg.block.block.header.number; + let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }])); + for (peer_id, peer) in self.connected_peers.iter_mut() { + if peer.blocks.contains(&msg.hash) { + // skip peers which already reported the block + continue + } + + if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) { + peer.best_hash = msg.hash; + } + + self.queued_messages.push_back(StateAction::NewBlockHashes { + peer_id: *peer_id, + hashes: Arc::clone(&hashes), + }); + } + } + + /// Updates the block information for the peer. + pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) { + if let Some(peer) = self.connected_peers.get_mut(peer_id) { + peer.best_hash = hash; + } + self.state_fetcher.update_peer_block(peer_id, hash, number); + } + /// Invoked after a `NewBlock` message was received by the peer. /// /// This will keep track of blocks we know a peer has @@ -342,6 +379,12 @@ pub enum StateAction { /// The `NewBlock` message block: NewBlockMessage, }, + NewBlockHashes { + /// Target of the message + peer_id: PeerId, + /// `NewBlockHashes` message to send to the peer. + hashes: Arc, + }, /// Create a new connection to the given node. Connect { remote_addr: SocketAddr, node_id: PeerId }, /// Disconnect an existing connection diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 9acb9a86f..ceb40fa86 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -153,6 +153,10 @@ where let msg = PeerMessage::NewBlock(msg); self.sessions.send_message(&peer_id, msg); } + StateAction::NewBlockHashes { peer_id, hashes } => { + let msg = PeerMessage::NewBlockHashes(hashes); + self.sessions.send_message(&peer_id, msg); + } } None }