feat: add blog propagation handlers (#205)

This commit is contained in:
Matthias Seitz
2022-11-15 23:33:03 +01:00
committed by GitHub
parent f8fddcdfa4
commit 6b336c62fb
7 changed files with 151 additions and 11 deletions

View File

@ -10,6 +10,20 @@ pub struct NewBlockHashes(
pub Vec<BlockHashNumber>, pub Vec<BlockHashNumber>,
); );
// === impl NewBlockHashes ===
impl NewBlockHashes {
/// Returns the latest block in the list of blocks.
pub fn latest(&self) -> Option<&BlockHashNumber> {
self.0.iter().fold(None, |latest, block| {
if let Some(latest) = latest {
return if latest.number > block.number { Some(latest) } else { Some(block) }
}
Some(block)
})
}
}
/// A block hash _and_ a block number. /// A block hash _and_ a block number.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] #[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct BlockHashNumber { pub struct BlockHashNumber {
@ -87,3 +101,20 @@ impl From<Vec<H256>> for NewPooledTransactionHashes {
NewPooledTransactionHashes(v) NewPooledTransactionHashes(v)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn can_return_latest_block() {
let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: H256::random(), number: 0 }]);
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 0);
blocks.0.push(BlockHashNumber { hash: H256::random(), number: 100 });
blocks.0.push(BlockHashNumber { hash: H256::random(), number: 2 });
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 100);
}
}

View File

@ -43,7 +43,7 @@ impl StateFetcher {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
best_hash: H256, best_hash: H256,
best_number: Option<u64>, best_number: u64,
) { ) {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
} }
@ -61,6 +61,20 @@ impl StateFetcher {
} }
} }
/// Updates the block information for the peer.
///
/// Returns `true` if this a newer block
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) -> bool {
if let Some(peer) = self.peers.get_mut(peer_id) {
if number > peer.best_number {
peer.best_hash = hash;
peer.best_number = number;
return true
}
}
false
}
/// Invoked when an active session is about to be disconnected. /// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) { pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) { if let Some(peer) = self.peers.get_mut(peer_id) {
@ -246,7 +260,7 @@ struct Peer {
/// Best known hash that the peer has /// Best known hash that the peer has
best_hash: H256, best_hash: H256,
/// Tracks the best number of the peer. /// Tracks the best number of the peer.
best_number: Option<u64>, best_number: u64,
} }
/// Tracks the state of an individual peer /// Tracks the state of an individual peer

View File

@ -22,11 +22,33 @@ pub struct BlockImportOutcome {
/// Sender of the `NewBlock` message. /// Sender of the `NewBlock` message.
pub peer: PeerId, pub peer: PeerId,
/// The result after validating the block /// The result after validating the block
pub result: Result<NewBlockMessage, BlockImportError>, pub result: Result<BlockValidation, BlockImportError>,
}
/// Represents the successful validation of a received `NewBlock` message.
#[derive(Debug)]
pub enum BlockValidation {
/// Basic Header validity check, after which the block should be relayed to peers via a
/// `NewBlock` message
ValidHeader {
/// received block
block: NewBlockMessage,
},
/// Successfully imported: state-root matches after execution. The block should be relayed via
/// `NewBlockHashes`
ValidBlock {
/// validated block.
block: NewBlockMessage,
},
} }
/// Represents the error case of a failed block import /// Represents the error case of a failed block import
pub enum BlockImportError {} #[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
/// Consensus error
#[error(transparent)]
Consensus(#[from] reth_interfaces::consensus::Error),
}
/// An implementation of `BlockImport` that does nothing /// An implementation of `BlockImport` that does nothing
#[derive(Debug, Default)] #[derive(Debug, Default)]

View File

@ -19,7 +19,7 @@ use crate::{
config::NetworkConfig, config::NetworkConfig,
discovery::Discovery, discovery::Discovery,
error::NetworkError, error::NetworkError,
import::{BlockImport, BlockImportOutcome}, import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener, listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
network::{NetworkHandle, NetworkHandleMessage}, network::{NetworkHandle, NetworkHandleMessage},
@ -192,10 +192,30 @@ where
} }
} }
/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, outcome: BlockImportOutcome) {
let BlockImportOutcome { peer, result } = outcome;
match result {
Ok(validated_block) => match validated_block {
BlockValidation::ValidHeader { block } => {
self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number());
self.swarm.state_mut().announce_new_block(block);
}
BlockValidation::ValidBlock { block } => {
self.swarm.state_mut().announce_new_block_hash(block);
}
},
Err(_err) => {
// TODO report peer for bad block
}
}
}
/// Handles a received Message from the peer. /// Handles a received Message from the peer.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) { fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
match msg { match msg {
PeerMessage::NewBlockHashes(hashes) => { PeerMessage::NewBlockHashes(hashes) => {
let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone());
// update peer's state, to track what blocks this peer has seen // update peer's state, to track what blocks this peer has seen
self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0) self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
} }
@ -240,9 +260,6 @@ where
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)), .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
} }
} }
/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {}
} }
impl<C> Future for NetworkManager<C> impl<C> Future for NetworkManager<C>

View File

@ -26,11 +26,20 @@ pub struct NewBlockMessage {
pub block: Arc<NewBlock>, pub block: Arc<NewBlock>,
} }
// === impl NewBlockMessage ===
impl NewBlockMessage {
/// Returns the block number of the block
pub fn number(&self) -> u64 {
self.block.block.header.number
}
}
/// Represents all messages that can be sent to a peer session /// Represents all messages that can be sent to a peer session
#[derive(Debug)] #[derive(Debug)]
pub enum PeerMessage { pub enum PeerMessage {
/// Announce new block hashes /// Announce new block hashes
NewBlockHashes(NewBlockHashes), NewBlockHashes(Arc<NewBlockHashes>),
/// Broadcast new block. /// Broadcast new block.
NewBlock(NewBlockMessage), NewBlock(NewBlockMessage),
/// Broadcast transactions. /// Broadcast transactions.

View File

@ -10,7 +10,7 @@ use crate::{
}, },
peers::{PeerAction, PeersManager}, peers::{PeerAction, PeersManager},
}; };
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status}; use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status};
use reth_interfaces::provider::BlockProvider; use reth_interfaces::provider::BlockProvider;
use reth_primitives::{PeerId, H256}; use reth_primitives::{PeerId, H256};
use std::{ use std::{
@ -93,7 +93,8 @@ where
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible"); debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");
// find the corresponding block number // find the corresponding block number
let block_number = self.client.block_number(status.blockhash).ok().flatten(); let block_number =
self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number); self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number);
self.connected_peers.insert( self.connected_peers.insert(
@ -129,6 +130,7 @@ where
// number of peers) // number of peers)
let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1; let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1;
let number = msg.block.block.header.number;
let mut count = 0; let mut count = 0;
for (peer_id, peer) in self.connected_peers.iter_mut() { for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) { if peer.blocks.contains(&msg.hash) {
@ -141,6 +143,11 @@ where
self.queued_messages self.queued_messages
.push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() }); .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
// update peer block info
if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}
// mark the block as seen by the peer // mark the block as seen by the peer
peer.blocks.insert(msg.hash); peer.blocks.insert(msg.hash);
@ -153,6 +160,36 @@ where
} }
} }
/// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) {
let number = msg.block.block.header.number;
let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]));
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
// skip peers which already reported the block
continue
}
if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}
self.queued_messages.push_back(StateAction::NewBlockHashes {
peer_id: *peer_id,
hashes: Arc::clone(&hashes),
});
}
}
/// Updates the block information for the peer.
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
peer.best_hash = hash;
}
self.state_fetcher.update_peer_block(peer_id, hash, number);
}
/// Invoked after a `NewBlock` message was received by the peer. /// Invoked after a `NewBlock` message was received by the peer.
/// ///
/// This will keep track of blocks we know a peer has /// This will keep track of blocks we know a peer has
@ -342,6 +379,12 @@ pub enum StateAction {
/// The `NewBlock` message /// The `NewBlock` message
block: NewBlockMessage, block: NewBlockMessage,
}, },
NewBlockHashes {
/// Target of the message
peer_id: PeerId,
/// `NewBlockHashes` message to send to the peer.
hashes: Arc<NewBlockHashes>,
},
/// Create a new connection to the given node. /// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId }, Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection /// Disconnect an existing connection

View File

@ -153,6 +153,10 @@ where
let msg = PeerMessage::NewBlock(msg); let msg = PeerMessage::NewBlock(msg);
self.sessions.send_message(&peer_id, msg); self.sessions.send_message(&peer_id, msg);
} }
StateAction::NewBlockHashes { peer_id, hashes } => {
let msg = PeerMessage::NewBlockHashes(hashes);
self.sessions.send_message(&peer_id, msg);
}
} }
None None
} }