From e401c4848a8c28754b1c5cb6025cfe756076329b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 18 Apr 2024 19:15:28 +0200 Subject: [PATCH] feat(pruner): respect ExEx finished height (#7673) --- Cargo.lock | 1 + .../consensus/beacon/src/engine/test_utils.rs | 3 +- crates/exex/src/manager.rs | 4 +- crates/node-builder/src/builder.rs | 24 +++++-- crates/primitives/src/exex/mod.rs | 9 ++- crates/prune/Cargo.toml | 1 + crates/prune/src/builder.rs | 28 ++++++-- crates/prune/src/pruner.rs | 70 +++++++++++++++++-- 8 files changed, 117 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 610064f81..5c48a9d74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7170,6 +7170,7 @@ dependencies = [ "reth-tokio-util", "reth-tracing", "thiserror", + "tokio", "tokio-stream", "tracing", ] diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 7aeb8d746..42f85282c 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -22,7 +22,7 @@ use reth_interfaces::{ }; use reth_node_ethereum::{EthEngineTypes, EthEvmConfig}; use reth_payload_builder::test_utils::spawn_test_payload_service; -use reth_primitives::{BlockNumber, ChainSpec, PruneModes, B256}; +use reth_primitives::{BlockNumber, ChainSpec, FinishedExExHeight, PruneModes, B256}; use reth_provider::{ providers::BlockchainProvider, test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory}, @@ -435,6 +435,7 @@ where self.base_config.chain_spec.prune_delete_limit, config.max_reorg_depth() as usize, None, + watch::channel(FinishedExExHeight::NoExExs).1, ); let mut hooks = EngineHooks::new(); diff --git a/crates/exex/src/manager.rs b/crates/exex/src/manager.rs index 32c4d8e26..332650607 100644 --- a/crates/exex/src/manager.rs +++ b/crates/exex/src/manager.rs @@ -396,8 +396,8 @@ impl ExExManagerHandle { } /// The finished height of all ExEx's. - pub fn finished_height(&mut self) -> FinishedExExHeight { - *self.finished_height.borrow_and_update() + pub fn finished_height(&self) -> watch::Receiver { + self.finished_height.clone() } /// Wait until the manager is ready for new notifications. diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index bff747345..76df0fc8f 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -641,20 +641,21 @@ where future::join_all(exexs).await; // spawn exex manager - if !exex_handles.is_empty() { + let exex_manager_handle = if !exex_handles.is_empty() { debug!(target: "reth::cli", "spawning exex manager"); // todo(onbjerg): rm magic number let exex_manager = ExExManager::new(exex_handles, 1024); - let mut exex_manager_handle = exex_manager.handle(); + let exex_manager_handle = exex_manager.handle(); executor.spawn_critical("exex manager", async move { exex_manager.await.expect("exex manager crashed"); }); // send notifications from the blockchain tree to exex manager let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state(); + let mut handle = exex_manager_handle.clone(); executor.spawn_critical("exex manager blockchain tree notifications", async move { while let Ok(notification) = canon_state_notifications.recv().await { - exex_manager_handle + handle .send_async(notification) .await .expect("blockchain tree notification could not be sent to exex manager"); @@ -662,7 +663,11 @@ where }); info!(target: "reth::cli", "ExEx Manager started"); - } + + Some(exex_manager_handle) + } else { + None + }; // create pipeline let network_client = network.fetch_client().await?; @@ -773,11 +778,16 @@ where let initial_target = config.initial_pipeline_target(genesis_hash); let prune_config = prune_config.unwrap_or_default(); - let mut pruner = PrunerBuilder::new(prune_config.clone()) + let mut pruner_builder = PrunerBuilder::new(prune_config.clone()) .max_reorg_depth(tree_config.max_reorg_depth() as usize) .prune_delete_limit(config.chain.prune_delete_limit) - .timeout(PrunerBuilder::DEFAULT_TIMEOUT) - .build(provider_factory.clone()); + .timeout(PrunerBuilder::DEFAULT_TIMEOUT); + if let Some(exex_manager_handle) = &exex_manager_handle { + pruner_builder = + pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); + } + + let mut pruner = pruner_builder.build(provider_factory.clone()); let pruner_events = pruner.events(); hooks.add(PruneHook::new(pruner, Box::new(executor.clone()))); diff --git a/crates/primitives/src/exex/mod.rs b/crates/primitives/src/exex/mod.rs index 9fc2ace66..82730f297 100644 --- a/crates/primitives/src/exex/mod.rs +++ b/crates/primitives/src/exex/mod.rs @@ -5,7 +5,7 @@ use crate::BlockNumber; pub enum FinishedExExHeight { /// No ExEx's are installed, so there is no finished height. NoExExs, - /// Not all ExExs emitted a `FinishedHeight` event yet. + /// Not all ExExs have emitted a `FinishedHeight` event yet. NotReady, /// The finished height of all ExEx's. /// @@ -16,3 +16,10 @@ pub enum FinishedExExHeight { /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. Height(BlockNumber), } + +impl FinishedExExHeight { + /// Returns `true` if not all ExExs have emitted a `FinishedHeight` event yet. + pub const fn is_not_ready(&self) -> bool { + matches!(self, Self::NotReady) + } +} diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml index 3a8971a66..cc24e68b8 100644 --- a/crates/prune/Cargo.toml +++ b/crates/prune/Cargo.toml @@ -29,6 +29,7 @@ tracing.workspace = true thiserror.workspace = true itertools.workspace = true rayon.workspace = true +tokio.workspace = true tokio-stream.workspace = true [dev-dependencies] diff --git a/crates/prune/src/builder.rs b/crates/prune/src/builder.rs index 377a98664..8a14ccf4a 100644 --- a/crates/prune/src/builder.rs +++ b/crates/prune/src/builder.rs @@ -3,24 +3,27 @@ use std::time::Duration; use crate::{segments::SegmentSet, Pruner}; use reth_config::PruneConfig; use reth_db::database::Database; -use reth_primitives::{PruneModes, MAINNET}; +use reth_primitives::{FinishedExExHeight, PruneModes, MAINNET}; use reth_provider::ProviderFactory; +use tokio::sync::watch; /// Contains the information required to build a pruner -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct PrunerBuilder { /// Minimum pruning interval measured in blocks. - pub block_interval: usize, + block_interval: usize, /// Pruning configuration for every part of the data that can be pruned. - pub segments: PruneModes, + segments: PruneModes, /// The number of blocks that can be re-orged. - pub max_reorg_depth: usize, + max_reorg_depth: usize, /// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by /// the amount of blocks between pruner runs to account for the difference in amount of new /// data coming in. - pub prune_delete_limit: usize, + prune_delete_limit: usize, /// Time a pruner job can run before timing out. - pub timeout: Option, + timeout: Option, + /// The finished height of all ExEx's. + finished_exex_height: watch::Receiver, } impl PrunerBuilder { @@ -67,6 +70,15 @@ impl PrunerBuilder { self } + /// Sets the receiver for the finished height of all ExEx's. + pub fn finished_exex_height( + mut self, + finished_exex_height: watch::Receiver, + ) -> Self { + self.finished_exex_height = finished_exex_height; + self + } + /// Builds a [Pruner] from the current configuration. pub fn build(self, provider_factory: ProviderFactory) -> Pruner { let segments = SegmentSet::::from_prune_modes(self.segments); @@ -78,6 +90,7 @@ impl PrunerBuilder { self.prune_delete_limit, self.max_reorg_depth, self.timeout, + self.finished_exex_height, ) } } @@ -90,6 +103,7 @@ impl Default for PrunerBuilder { max_reorg_depth: 64, prune_delete_limit: MAINNET.prune_delete_limit, timeout: Some(Self::DEFAULT_TIMEOUT), + finished_exex_height: watch::channel(FinishedExExHeight::NoExExs).1, } } } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 6bd3749c5..f3bf963e0 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -7,8 +7,8 @@ use crate::{ }; use reth_db::database::Database; use reth_primitives::{ - BlockNumber, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, - StaticFileSegment, + BlockNumber, FinishedExExHeight, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, + PruneSegment, StaticFileSegment, }; use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader}; use reth_tokio_util::EventListeners; @@ -16,6 +16,7 @@ use std::{ collections::BTreeMap, time::{Duration, Instant}, }; +use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::debug; @@ -46,6 +47,8 @@ pub struct Pruner { prune_max_blocks_per_run: usize, /// Maximum time for a one pruner run. timeout: Option, + /// The finished height of all ExEx's. + finished_exex_height: watch::Receiver, #[doc(hidden)] metrics: Metrics, listeners: EventListeners, @@ -60,6 +63,7 @@ impl Pruner { delete_limit: usize, prune_max_blocks_per_run: usize, timeout: Option, + finished_exex_height: watch::Receiver, ) -> Self { Self { provider_factory, @@ -69,6 +73,7 @@ impl Pruner { delete_limit_per_block: delete_limit, prune_max_blocks_per_run, timeout, + finished_exex_height, metrics: Metrics::default(), listeners: Default::default(), } @@ -81,6 +86,11 @@ impl Pruner { /// Run the pruner pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { + let Some(tip_block_number) = + self.adjust_tip_block_number_to_finished_exex_height(tip_block_number) + else { + return Ok(PruneProgress::Finished) + }; if tip_block_number == 0 { self.previous_tip_block_number = Some(tip_block_number); @@ -269,6 +279,12 @@ impl Pruner { /// Returns `true` if the pruning is needed at the provided tip block number. /// This determined by the check against minimum pruning interval and last pruned block number. pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool { + let Some(tip_block_number) = + self.adjust_tip_block_number_to_finished_exex_height(tip_block_number) + else { + return false + }; + // Saturating subtraction is needed for the case when the chain was reverted, meaning // current block number might be less than the previous tip block number. // If that's the case, no pruning is needed as outdated data is also reverted. @@ -286,6 +302,30 @@ impl Pruner { false } } + + /// Adjusts the tip block number to the finished ExEx height. This is needed to not prune more + /// data than ExExs have processed. Depending on the height: + /// - [FinishedExExHeight::NoExExs] returns the tip block number as is as no adjustment for + /// ExExs is needed. + /// - [FinishedExExHeight::NotReady] returns `None` as not all ExExs have emitted a + /// `FinishedHeight` event yet. + /// - [FinishedExExHeight::Height] returns the finished ExEx height. + fn adjust_tip_block_number_to_finished_exex_height( + &self, + tip_block_number: BlockNumber, + ) -> Option { + match *self.finished_exex_height.borrow() { + FinishedExExHeight::NoExExs => Some(tip_block_number), + FinishedExExHeight::NotReady => { + debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune"); + None + } + FinishedExExHeight::Height(finished_exex_height) => { + debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height"); + Some(finished_exex_height) + } + } + } } #[cfg(test)] @@ -293,7 +333,7 @@ mod tests { use crate::Pruner; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; - use reth_primitives::MAINNET; + use reth_primitives::{FinishedExExHeight, MAINNET}; use reth_provider::ProviderFactory; #[test] @@ -302,7 +342,12 @@ mod tests { let (_static_dir, static_dir_path) = create_test_static_files_dir(); let provider_factory = ProviderFactory::new(db, MAINNET.clone(), static_dir_path) .expect("create provide factory with static_files"); - let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5, None); + + let (finished_exex_height_tx, finished_exex_height_rx) = + tokio::sync::watch::channel(FinishedExExHeight::NoExExs); + + let mut pruner = + Pruner::new(provider_factory, vec![], 5, 0, 5, None, finished_exex_height_rx); // No last pruned block number was set before let first_block_number = 1; @@ -315,7 +360,22 @@ mod tests { pruner.previous_tip_block_number = Some(second_block_number); // Tip block number delta is < than min block interval - let third_block_number = second_block_number; + assert!(!pruner.is_pruning_needed(second_block_number)); + + // Tip block number delta is >= than min block interval + let third_block_number = second_block_number + pruner.min_block_interval as u64; + assert!(pruner.is_pruning_needed(third_block_number)); + + // Not all ExExs have emitted a `FinishedHeight` event yet + finished_exex_height_tx.send(FinishedExExHeight::NotReady).unwrap(); assert!(!pruner.is_pruning_needed(third_block_number)); + + // Adjust tip block number to the finished ExEx height that doesn't reach the threshold + finished_exex_height_tx.send(FinishedExExHeight::Height(second_block_number)).unwrap(); + assert!(!pruner.is_pruning_needed(third_block_number)); + + // Adjust tip block number to the finished ExEx height that reaches the threshold + finished_exex_height_tx.send(FinishedExExHeight::Height(third_block_number)).unwrap(); + assert!(pruner.is_pruning_needed(third_block_number)); } }