refactor: move broadcast channel init into tree (#4894)

This commit is contained in:
Matthias Seitz
2023-10-03 19:26:41 +02:00
committed by GitHub
parent b1a7a871cd
commit d3cc4cc643
4 changed files with 33 additions and 33 deletions

View File

@ -281,20 +281,15 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let tree_config = BlockchainTreeConfig::default();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_tree = ShareableBlockchainTree::new(
BlockchainTree::new(
tree_externals,
canon_state_notification_sender.clone(),
tree_config,
prune_config.clone().map(|config| config.parts),
)?
.with_sync_metrics_tx(metrics_tx.clone()),
);
let _tree_config = BlockchainTreeConfig::default();
let tree = BlockchainTree::new(
tree_externals,
BlockchainTreeConfig::default(),
prune_config.clone().map(|config| config.parts),
)?
.with_sync_metrics_tx(metrics_tx.clone());
let canon_state_notification_sender = tree.canon_state_notification_sender();
let blockchain_tree = ShareableBlockchainTree::new(tree);
// fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;

View File

@ -25,6 +25,7 @@ reth-stages = { path = "../stages" }
parking_lot.workspace = true
lru = "0.11"
tracing.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
# metrics
reth-metrics = { workspace = true, features = ["common"] }
@ -41,7 +42,6 @@ reth-primitives = { workspace = true , features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
parking_lot.workspace = true
assert_matches.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
[features]
test-utils = []

View File

@ -96,25 +96,18 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
prune_modes: Option<PruneModes>,
}
/// A container that wraps chains and block indices to allow searching for block hashes across all
/// sidechains.
#[derive(Debug)]
pub struct BlockHashes<'a> {
/// The current tracked chains.
pub chains: &'a mut HashMap<BlockChainId, AppendableChain>,
/// The block indices for all chains.
pub indices: &'a BlockIndices,
}
impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF> {
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, C, EF>,
canon_state_notification_sender: CanonStateNotificationSender,
config: BlockchainTreeConfig,
prune_modes: Option<PruneModes>,
) -> RethResult<Self> {
let max_reorg_depth = config.max_reorg_depth();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(max_reorg_depth as usize * 2);
let last_canonical_hashes = externals
.db
@ -1069,11 +1062,16 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Subscribe to new blocks events.
///
/// Note: Only canonical blocks are send.
/// Note: Only canonical blocks are emitted by the tree.
pub fn subscribe_canon_state(&self) -> CanonStateNotifications {
self.canon_state_notification_sender.subscribe()
}
/// Returns a clone of the sender for the canonical state notifications.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.canon_state_notification_sender.clone()
}
/// Canonicalize the given chain and commit it to the database.
fn commit_canonical(&self, chain: Chain) -> RethResult<()> {
let provider = DatabaseProvider::new_rw(
@ -1169,6 +1167,16 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
}
/// A container that wraps chains and block indices to allow searching for block hashes across all
/// sidechains.
#[derive(Debug)]
pub struct BlockHashes<'a> {
/// The current tracked chains.
pub chains: &'a mut HashMap<BlockChainId, AppendableChain>,
/// The block indices for all chains.
pub indices: &'a BlockIndices,
}
#[cfg(test)]
mod tests {
use super::*;
@ -1316,10 +1324,9 @@ mod tests {
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10);
let mut tree =
BlockchainTree::new(externals, sender, config, None).expect("failed to create tree");
let mut tree = BlockchainTree::new(externals, config, None).expect("failed to create tree");
let mut canon_notif = tree.subscribe_canon_state();
// genesis block 10 is already canonical
tree.make_canonical(&B256::ZERO).unwrap();

View File

@ -506,10 +506,8 @@ where
self.base_config.chain_spec.clone(),
);
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, canon_state_notification_sender, config, None)
.expect("failed to create tree"),
BlockchainTree::new(externals, config, None).expect("failed to create tree"),
);
let shareable_db = ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone());
let latest = self.base_config.chain_spec.genesis_header().seal_slow();