From a3800c52c09627c75b90d65b5e237908d7a9d632 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Wed, 1 Feb 2023 16:28:58 +0200 Subject: [PATCH] feat(consensus): beacon consensus builder (#1119) --- bin/reth/src/node/mod.rs | 33 ++++++++--------- bin/reth/src/stage/mod.rs | 3 +- .../consensus/src/beacon/beacon_consensus.rs | 36 +++++++++---------- crates/consensus/src/beacon/builder.rs | 22 ++++++++++++ crates/consensus/src/beacon/mod.rs | 2 ++ crates/interfaces/src/consensus.rs | 8 +---- crates/interfaces/src/test_utils/headers.rs | 19 +++++----- crates/stages/src/stages/headers.rs | 4 +-- 8 files changed, 68 insertions(+), 59 deletions(-) create mode 100644 crates/consensus/src/beacon/builder.rs diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 6b39373ce..b333d7e02 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -153,26 +153,21 @@ impl Command { } fn init_consensus(&self) -> eyre::Result> { - // TODO: This should be in a builder/factory in the consensus crate - let consensus: Arc = { - let beacon_consensus = BeaconConsensus::new(self.chain.clone()); + let (consensus, notifier) = BeaconConsensus::builder().build(self.chain.clone()); - if let Some(tip) = self.tip { - debug!(target: "reth::cli", %tip, "Tip manually set"); - beacon_consensus.notify_fork_choice_state(ForkchoiceState { - head_block_hash: tip, - safe_block_hash: tip, - finalized_block_hash: tip, - })?; - } else { - let warn_msg = "No tip specified. \ - reth cannot communicate with consensus clients, \ - so a tip must manually be provided for the online stages with --debug.tip ."; - warn!(target: "reth::cli", warn_msg); - } - - Arc::new(beacon_consensus) - }; + if let Some(tip) = self.tip { + debug!(target: "reth::cli", %tip, "Tip manually set"); + notifier.send(ForkchoiceState { + head_block_hash: tip, + safe_block_hash: tip, + finalized_block_hash: tip, + })?; + } else { + let warn_msg = "No tip specified. \ + reth cannot communicate with consensus clients, \ + so a tip must manually be provided for the online stages with --debug.tip ."; + warn!(target: "reth::cli", warn_msg); + } Ok(consensus) } diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index 001e66ca6..0ebd4cd08 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -126,8 +126,7 @@ impl Command { match self.stage { StageEnum::Bodies => { - let consensus: Arc = - Arc::new(BeaconConsensus::new(self.chain.clone())); + let (consensus, _) = BeaconConsensus::builder().build(self.chain.clone()); let mut config = config; config.peers.connect_trusted_nodes_only = self.network.trusted_only; diff --git a/crates/consensus/src/beacon/beacon_consensus.rs b/crates/consensus/src/beacon/beacon_consensus.rs index cfea20d45..cd6c51808 100644 --- a/crates/consensus/src/beacon/beacon_consensus.rs +++ b/crates/consensus/src/beacon/beacon_consensus.rs @@ -1,8 +1,10 @@ //! Consensus for ethereum network use crate::validation; use reth_interfaces::consensus::{Consensus, Error, ForkchoiceState}; -use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, SealedHeader, H256}; -use tokio::sync::{watch, watch::error::SendError}; +use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, SealedHeader}; +use tokio::sync::watch; + +use super::BeaconConsensusBuilder; /// Ethereum beacon consensus /// @@ -11,35 +13,29 @@ use tokio::sync::{watch, watch::error::SendError}; #[derive(Debug)] pub struct BeaconConsensus { /// Watcher over the forkchoice state - channel: (watch::Sender, watch::Receiver), + forkchoice_state_rx: watch::Receiver, /// Configuration chain_spec: ChainSpec, } impl BeaconConsensus { /// Create a new instance of [BeaconConsensus] - pub fn new(chain_spec: ChainSpec) -> Self { - Self { - channel: watch::channel(ForkchoiceState { - head_block_hash: H256::zero(), - finalized_block_hash: H256::zero(), - safe_block_hash: H256::zero(), - }), - chain_spec, - } + pub fn new( + chain_spec: ChainSpec, + forkchoice_state_rx: watch::Receiver, + ) -> Self { + Self { chain_spec, forkchoice_state_rx } + } + + /// Create new [BeaconConsensusBuilder]. + pub fn builder() -> BeaconConsensusBuilder { + BeaconConsensusBuilder::default() } } impl Consensus for BeaconConsensus { fn fork_choice_state(&self) -> watch::Receiver { - self.channel.1.clone() - } - - fn notify_fork_choice_state( - &self, - state: ForkchoiceState, - ) -> Result<(), SendError> { - self.channel.0.send(state) + self.forkchoice_state_rx.clone() } fn validate_header(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), Error> { diff --git a/crates/consensus/src/beacon/builder.rs b/crates/consensus/src/beacon/builder.rs new file mode 100644 index 000000000..d3f2bd97b --- /dev/null +++ b/crates/consensus/src/beacon/builder.rs @@ -0,0 +1,22 @@ +use super::BeaconConsensus; +use reth_interfaces::consensus::ForkchoiceState; +use reth_primitives::ChainSpec; +use std::sync::Arc; +use tokio::sync::watch; + +/// TODO: +#[derive(Debug, Default)] +pub struct BeaconConsensusBuilder; + +impl BeaconConsensusBuilder { + /// Create new instance of [BeaconConsensus] and forkchoice notifier. Internally, creates a + /// [watch::channel] for updating the forkchoice state. + pub fn build( + self, + chain_spec: ChainSpec, + ) -> (Arc, watch::Sender) { + let (forkchoice_state_tx, forkchoice_state_rx) = watch::channel(ForkchoiceState::default()); + let inner = Arc::new(BeaconConsensus::new(chain_spec, forkchoice_state_rx)); + (inner, forkchoice_state_tx) + } +} diff --git a/crates/consensus/src/beacon/mod.rs b/crates/consensus/src/beacon/mod.rs index dbec2ec96..ec2d4ab5c 100644 --- a/crates/consensus/src/beacon/mod.rs +++ b/crates/consensus/src/beacon/mod.rs @@ -1,5 +1,7 @@ //! Beacon consensus implementation. mod beacon_consensus; +mod builder; pub use beacon_consensus::BeaconConsensus; +pub use builder::BeaconConsensusBuilder; diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index ced617341..b7b9b3743 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use reth_primitives::{BlockHash, BlockNumber, SealedBlock, SealedHeader, H256}; use std::fmt::Debug; -use tokio::sync::watch::{error::SendError, Receiver}; +use tokio::sync::watch::Receiver; /// Re-export fork choice state pub use reth_rpc_types::engine::ForkchoiceState; @@ -13,12 +13,6 @@ pub trait Consensus: Debug + Send + Sync { /// Get a receiver for the fork choice state fn fork_choice_state(&self) -> Receiver; - /// Notifies all listeners of the latest [ForkchoiceState]. - fn notify_fork_choice_state( - &self, - state: ForkchoiceState, - ) -> Result<(), SendError>; - /// Validate if header is correct and follows consensus specification. /// /// **This should not be called for the genesis block**. diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index 413900a96..219fce000 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -279,15 +279,23 @@ impl Default for TestConsensus { } impl TestConsensus { - /// Get the failed validation flag + /// Get the failed validation flag. pub fn fail_validation(&self) -> bool { self.fail_validation.load(Ordering::SeqCst) } - /// Update the validation flag + /// Update the validation flag. pub fn set_fail_validation(&self, val: bool) { self.fail_validation.store(val, Ordering::SeqCst) } + + /// Update the forkchoice state. + pub fn notify_fork_choice_state( + &self, + state: ForkchoiceState, + ) -> Result<(), SendError> { + self.channel.0.send(state) + } } /// Nil status updater for testing @@ -304,13 +312,6 @@ impl Consensus for TestConsensus { self.channel.1.clone() } - fn notify_fork_choice_state( - &self, - state: ForkchoiceState, - ) -> Result<(), SendError> { - self.channel.0.send(state) - } - fn validate_header( &self, _header: &SealedHeader, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 6b6cb983d..168d2e3fb 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -287,7 +287,7 @@ mod tests { ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder, }; use reth_interfaces::{ - consensus::{Consensus, ForkchoiceState}, + consensus::ForkchoiceState, p2p::headers::downloader::HeaderDownloader, test_utils::{ generators::{random_header, random_header_range}, @@ -488,7 +488,7 @@ mod tests { let stage = runner.stage(); let consensus_tip = H256::random(); - stage + runner .consensus .notify_fork_choice_state(ForkchoiceState { head_block_hash: consensus_tip,