diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 5455f0206..31234651e 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -281,20 +281,15 @@ impl NodeCommand { Factory::new(self.chain.clone()), Arc::clone(&self.chain), ); - let tree_config = BlockchainTreeConfig::default(); - // The size of the broadcast is twice the maximum reorg depth, because at maximum reorg - // depth at least N blocks must be sent at once. - let (canon_state_notification_sender, _receiver) = - tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2); - let blockchain_tree = ShareableBlockchainTree::new( - BlockchainTree::new( - tree_externals, - canon_state_notification_sender.clone(), - tree_config, - prune_config.clone().map(|config| config.parts), - )? - .with_sync_metrics_tx(metrics_tx.clone()), - ); + let _tree_config = BlockchainTreeConfig::default(); + let tree = BlockchainTree::new( + tree_externals, + BlockchainTreeConfig::default(), + prune_config.clone().map(|config| config.parts), + )? + .with_sync_metrics_tx(metrics_tx.clone()); + let canon_state_notification_sender = tree.canon_state_notification_sender(); + let blockchain_tree = ShareableBlockchainTree::new(tree); // fetch the head block from the database let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?; diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 1c8651dfe..b8c2c75c1 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -25,6 +25,7 @@ reth-stages = { path = "../stages" } parking_lot.workspace = true lru = "0.11" tracing.workspace = true +tokio = { workspace = true, features = ["macros", "sync"] } # metrics reth-metrics = { workspace = true, features = ["common"] } @@ -41,7 +42,6 @@ reth-primitives = { workspace = true , features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } parking_lot.workspace = true assert_matches.workspace = true -tokio = { workspace = true, features = ["macros", "sync"] } [features] test-utils = [] diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index a5bcb03c8..401d9c7ba 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -96,25 +96,18 @@ pub struct BlockchainTree { prune_modes: Option, } -/// A container that wraps chains and block indices to allow searching for block hashes across all -/// sidechains. -#[derive(Debug)] -pub struct BlockHashes<'a> { - /// The current tracked chains. - pub chains: &'a mut HashMap, - /// The block indices for all chains. - pub indices: &'a BlockIndices, -} - impl BlockchainTree { /// Create a new blockchain tree. pub fn new( externals: TreeExternals, - canon_state_notification_sender: CanonStateNotificationSender, config: BlockchainTreeConfig, prune_modes: Option, ) -> RethResult { let max_reorg_depth = config.max_reorg_depth(); + // The size of the broadcast is twice the maximum reorg depth, because at maximum reorg + // depth at least N blocks must be sent at once. + let (canon_state_notification_sender, _receiver) = + tokio::sync::broadcast::channel(max_reorg_depth as usize * 2); let last_canonical_hashes = externals .db @@ -1069,11 +1062,16 @@ impl BlockchainTree /// Subscribe to new blocks events. /// - /// Note: Only canonical blocks are send. + /// Note: Only canonical blocks are emitted by the tree. pub fn subscribe_canon_state(&self) -> CanonStateNotifications { self.canon_state_notification_sender.subscribe() } + /// Returns a clone of the sender for the canonical state notifications. + pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender { + self.canon_state_notification_sender.clone() + } + /// Canonicalize the given chain and commit it to the database. fn commit_canonical(&self, chain: Chain) -> RethResult<()> { let provider = DatabaseProvider::new_rw( @@ -1169,6 +1167,16 @@ impl BlockchainTree } } +/// A container that wraps chains and block indices to allow searching for block hashes across all +/// sidechains. +#[derive(Debug)] +pub struct BlockHashes<'a> { + /// The current tracked chains. + pub chains: &'a mut HashMap, + /// The block indices for all chains. + pub indices: &'a BlockIndices, +} + #[cfg(test)] mod tests { use super::*; @@ -1316,10 +1324,9 @@ mod tests { // make tree let config = BlockchainTreeConfig::new(1, 2, 3, 2); - let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10); - let mut tree = - BlockchainTree::new(externals, sender, config, None).expect("failed to create tree"); + let mut tree = BlockchainTree::new(externals, config, None).expect("failed to create tree"); + let mut canon_notif = tree.subscribe_canon_state(); // genesis block 10 is already canonical tree.make_canonical(&B256::ZERO).unwrap(); diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 4765c3ab9..8ced83319 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -506,10 +506,8 @@ where self.base_config.chain_spec.clone(), ); let config = BlockchainTreeConfig::new(1, 2, 3, 2); - let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); let tree = ShareableBlockchainTree::new( - BlockchainTree::new(externals, canon_state_notification_sender, config, None) - .expect("failed to create tree"), + BlockchainTree::new(externals, config, None).expect("failed to create tree"), ); let shareable_db = ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()); let latest = self.base_config.chain_spec.genesis_header().seal_slow();