feat(pruner): respect ExEx finished height (#7673)

This commit is contained in:
Alexey Shekhirin
2024-04-18 19:15:28 +02:00
committed by GitHub
parent 6863cdb42b
commit e401c4848a
8 changed files with 117 additions and 23 deletions

1
Cargo.lock generated
View File

@ -7170,6 +7170,7 @@ dependencies = [
"reth-tokio-util", "reth-tokio-util",
"reth-tracing", "reth-tracing",
"thiserror", "thiserror",
"tokio",
"tokio-stream", "tokio-stream",
"tracing", "tracing",
] ]

View File

@ -22,7 +22,7 @@ use reth_interfaces::{
}; };
use reth_node_ethereum::{EthEngineTypes, EthEvmConfig}; use reth_node_ethereum::{EthEngineTypes, EthEvmConfig};
use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, ChainSpec, PruneModes, B256}; use reth_primitives::{BlockNumber, ChainSpec, FinishedExExHeight, PruneModes, B256};
use reth_provider::{ use reth_provider::{
providers::BlockchainProvider, providers::BlockchainProvider,
test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory}, test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory},
@ -435,6 +435,7 @@ where
self.base_config.chain_spec.prune_delete_limit, self.base_config.chain_spec.prune_delete_limit,
config.max_reorg_depth() as usize, config.max_reorg_depth() as usize,
None, None,
watch::channel(FinishedExExHeight::NoExExs).1,
); );
let mut hooks = EngineHooks::new(); let mut hooks = EngineHooks::new();

View File

@ -396,8 +396,8 @@ impl ExExManagerHandle {
} }
/// The finished height of all ExEx's. /// The finished height of all ExEx's.
pub fn finished_height(&mut self) -> FinishedExExHeight { pub fn finished_height(&self) -> watch::Receiver<FinishedExExHeight> {
*self.finished_height.borrow_and_update() self.finished_height.clone()
} }
/// Wait until the manager is ready for new notifications. /// Wait until the manager is ready for new notifications.

View File

@ -641,20 +641,21 @@ where
future::join_all(exexs).await; future::join_all(exexs).await;
// spawn exex manager // spawn exex manager
if !exex_handles.is_empty() { let exex_manager_handle = if !exex_handles.is_empty() {
debug!(target: "reth::cli", "spawning exex manager"); debug!(target: "reth::cli", "spawning exex manager");
// todo(onbjerg): rm magic number // todo(onbjerg): rm magic number
let exex_manager = ExExManager::new(exex_handles, 1024); let exex_manager = ExExManager::new(exex_handles, 1024);
let mut exex_manager_handle = exex_manager.handle(); let exex_manager_handle = exex_manager.handle();
executor.spawn_critical("exex manager", async move { executor.spawn_critical("exex manager", async move {
exex_manager.await.expect("exex manager crashed"); exex_manager.await.expect("exex manager crashed");
}); });
// send notifications from the blockchain tree to exex manager // send notifications from the blockchain tree to exex manager
let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state(); let mut canon_state_notifications = blockchain_tree.subscribe_to_canonical_state();
let mut handle = exex_manager_handle.clone();
executor.spawn_critical("exex manager blockchain tree notifications", async move { executor.spawn_critical("exex manager blockchain tree notifications", async move {
while let Ok(notification) = canon_state_notifications.recv().await { while let Ok(notification) = canon_state_notifications.recv().await {
exex_manager_handle handle
.send_async(notification) .send_async(notification)
.await .await
.expect("blockchain tree notification could not be sent to exex manager"); .expect("blockchain tree notification could not be sent to exex manager");
@ -662,7 +663,11 @@ where
}); });
info!(target: "reth::cli", "ExEx Manager started"); info!(target: "reth::cli", "ExEx Manager started");
}
Some(exex_manager_handle)
} else {
None
};
// create pipeline // create pipeline
let network_client = network.fetch_client().await?; let network_client = network.fetch_client().await?;
@ -773,11 +778,16 @@ where
let initial_target = config.initial_pipeline_target(genesis_hash); let initial_target = config.initial_pipeline_target(genesis_hash);
let prune_config = prune_config.unwrap_or_default(); let prune_config = prune_config.unwrap_or_default();
let mut pruner = PrunerBuilder::new(prune_config.clone()) let mut pruner_builder = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize) .max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(config.chain.prune_delete_limit) .prune_delete_limit(config.chain.prune_delete_limit)
.timeout(PrunerBuilder::DEFAULT_TIMEOUT) .timeout(PrunerBuilder::DEFAULT_TIMEOUT);
.build(provider_factory.clone()); if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
}
let mut pruner = pruner_builder.build(provider_factory.clone());
let pruner_events = pruner.events(); let pruner_events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone()))); hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));

View File

@ -5,7 +5,7 @@ use crate::BlockNumber;
pub enum FinishedExExHeight { pub enum FinishedExExHeight {
/// No ExEx's are installed, so there is no finished height. /// No ExEx's are installed, so there is no finished height.
NoExExs, NoExExs,
/// Not all ExExs emitted a `FinishedHeight` event yet. /// Not all ExExs have emitted a `FinishedHeight` event yet.
NotReady, NotReady,
/// The finished height of all ExEx's. /// The finished height of all ExEx's.
/// ///
@ -16,3 +16,10 @@ pub enum FinishedExExHeight {
/// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune.
Height(BlockNumber), Height(BlockNumber),
} }
impl FinishedExExHeight {
/// Returns `true` if not all ExExs have emitted a `FinishedHeight` event yet.
pub const fn is_not_ready(&self) -> bool {
matches!(self, Self::NotReady)
}
}

View File

@ -29,6 +29,7 @@ tracing.workspace = true
thiserror.workspace = true thiserror.workspace = true
itertools.workspace = true itertools.workspace = true
rayon.workspace = true rayon.workspace = true
tokio.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
[dev-dependencies] [dev-dependencies]

View File

@ -3,24 +3,27 @@ use std::time::Duration;
use crate::{segments::SegmentSet, Pruner}; use crate::{segments::SegmentSet, Pruner};
use reth_config::PruneConfig; use reth_config::PruneConfig;
use reth_db::database::Database; use reth_db::database::Database;
use reth_primitives::{PruneModes, MAINNET}; use reth_primitives::{FinishedExExHeight, PruneModes, MAINNET};
use reth_provider::ProviderFactory; use reth_provider::ProviderFactory;
use tokio::sync::watch;
/// Contains the information required to build a pruner /// Contains the information required to build a pruner
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone)]
pub struct PrunerBuilder { pub struct PrunerBuilder {
/// Minimum pruning interval measured in blocks. /// Minimum pruning interval measured in blocks.
pub block_interval: usize, block_interval: usize,
/// Pruning configuration for every part of the data that can be pruned. /// Pruning configuration for every part of the data that can be pruned.
pub segments: PruneModes, segments: PruneModes,
/// The number of blocks that can be re-orged. /// The number of blocks that can be re-orged.
pub max_reorg_depth: usize, max_reorg_depth: usize,
/// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by /// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by
/// the amount of blocks between pruner runs to account for the difference in amount of new /// the amount of blocks between pruner runs to account for the difference in amount of new
/// data coming in. /// data coming in.
pub prune_delete_limit: usize, prune_delete_limit: usize,
/// Time a pruner job can run before timing out. /// Time a pruner job can run before timing out.
pub timeout: Option<Duration>, timeout: Option<Duration>,
/// The finished height of all ExEx's.
finished_exex_height: watch::Receiver<FinishedExExHeight>,
} }
impl PrunerBuilder { impl PrunerBuilder {
@ -67,6 +70,15 @@ impl PrunerBuilder {
self self
} }
/// Sets the receiver for the finished height of all ExEx's.
pub fn finished_exex_height(
mut self,
finished_exex_height: watch::Receiver<FinishedExExHeight>,
) -> Self {
self.finished_exex_height = finished_exex_height;
self
}
/// Builds a [Pruner] from the current configuration. /// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> { pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments); let segments = SegmentSet::<DB>::from_prune_modes(self.segments);
@ -78,6 +90,7 @@ impl PrunerBuilder {
self.prune_delete_limit, self.prune_delete_limit,
self.max_reorg_depth, self.max_reorg_depth,
self.timeout, self.timeout,
self.finished_exex_height,
) )
} }
} }
@ -90,6 +103,7 @@ impl Default for PrunerBuilder {
max_reorg_depth: 64, max_reorg_depth: 64,
prune_delete_limit: MAINNET.prune_delete_limit, prune_delete_limit: MAINNET.prune_delete_limit,
timeout: Some(Self::DEFAULT_TIMEOUT), timeout: Some(Self::DEFAULT_TIMEOUT),
finished_exex_height: watch::channel(FinishedExExHeight::NoExExs).1,
} }
} }
} }

View File

@ -7,8 +7,8 @@ use crate::{
}; };
use reth_db::database::Database; use reth_db::database::Database;
use reth_primitives::{ use reth_primitives::{
BlockNumber, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, BlockNumber, FinishedExExHeight, PruneLimiter, PruneMode, PruneProgress, PrunePurpose,
StaticFileSegment, PruneSegment, StaticFileSegment,
}; };
use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader}; use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader};
use reth_tokio_util::EventListeners; use reth_tokio_util::EventListeners;
@ -16,6 +16,7 @@ use std::{
collections::BTreeMap, collections::BTreeMap,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::sync::watch;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug; use tracing::debug;
@ -46,6 +47,8 @@ pub struct Pruner<DB> {
prune_max_blocks_per_run: usize, prune_max_blocks_per_run: usize,
/// Maximum time for a one pruner run. /// Maximum time for a one pruner run.
timeout: Option<Duration>, timeout: Option<Duration>,
/// The finished height of all ExEx's.
finished_exex_height: watch::Receiver<FinishedExExHeight>,
#[doc(hidden)] #[doc(hidden)]
metrics: Metrics, metrics: Metrics,
listeners: EventListeners<PrunerEvent>, listeners: EventListeners<PrunerEvent>,
@ -60,6 +63,7 @@ impl<DB: Database> Pruner<DB> {
delete_limit: usize, delete_limit: usize,
prune_max_blocks_per_run: usize, prune_max_blocks_per_run: usize,
timeout: Option<Duration>, timeout: Option<Duration>,
finished_exex_height: watch::Receiver<FinishedExExHeight>,
) -> Self { ) -> Self {
Self { Self {
provider_factory, provider_factory,
@ -69,6 +73,7 @@ impl<DB: Database> Pruner<DB> {
delete_limit_per_block: delete_limit, delete_limit_per_block: delete_limit,
prune_max_blocks_per_run, prune_max_blocks_per_run,
timeout, timeout,
finished_exex_height,
metrics: Metrics::default(), metrics: Metrics::default(),
listeners: Default::default(), listeners: Default::default(),
} }
@ -81,6 +86,11 @@ impl<DB: Database> Pruner<DB> {
/// Run the pruner /// Run the pruner
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
let Some(tip_block_number) =
self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
else {
return Ok(PruneProgress::Finished)
};
if tip_block_number == 0 { if tip_block_number == 0 {
self.previous_tip_block_number = Some(tip_block_number); self.previous_tip_block_number = Some(tip_block_number);
@ -269,6 +279,12 @@ impl<DB: Database> Pruner<DB> {
/// Returns `true` if the pruning is needed at the provided tip block number. /// Returns `true` if the pruning is needed at the provided tip block number.
/// This determined by the check against minimum pruning interval and last pruned block number. /// This determined by the check against minimum pruning interval and last pruned block number.
pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool { pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
let Some(tip_block_number) =
self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
else {
return false
};
// Saturating subtraction is needed for the case when the chain was reverted, meaning // Saturating subtraction is needed for the case when the chain was reverted, meaning
// current block number might be less than the previous tip block number. // current block number might be less than the previous tip block number.
// If that's the case, no pruning is needed as outdated data is also reverted. // If that's the case, no pruning is needed as outdated data is also reverted.
@ -286,6 +302,30 @@ impl<DB: Database> Pruner<DB> {
false false
} }
} }
/// Adjusts the tip block number to the finished ExEx height. This is needed to not prune more
/// data than ExExs have processed. Depending on the height:
/// - [FinishedExExHeight::NoExExs] returns the tip block number as is as no adjustment for
/// ExExs is needed.
/// - [FinishedExExHeight::NotReady] returns `None` as not all ExExs have emitted a
/// `FinishedHeight` event yet.
/// - [FinishedExExHeight::Height] returns the finished ExEx height.
fn adjust_tip_block_number_to_finished_exex_height(
&self,
tip_block_number: BlockNumber,
) -> Option<BlockNumber> {
match *self.finished_exex_height.borrow() {
FinishedExExHeight::NoExExs => Some(tip_block_number),
FinishedExExHeight::NotReady => {
debug!(target: "pruner", %tip_block_number, "Not all ExExs have emitted a `FinishedHeight` event yet, can't prune");
None
}
FinishedExExHeight::Height(finished_exex_height) => {
debug!(target: "pruner", %tip_block_number, %finished_exex_height, "Adjusting tip block number to the finished ExEx height");
Some(finished_exex_height)
}
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -293,7 +333,7 @@ mod tests {
use crate::Pruner; use crate::Pruner;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_primitives::MAINNET; use reth_primitives::{FinishedExExHeight, MAINNET};
use reth_provider::ProviderFactory; use reth_provider::ProviderFactory;
#[test] #[test]
@ -302,7 +342,12 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir(); let (_static_dir, static_dir_path) = create_test_static_files_dir();
let provider_factory = ProviderFactory::new(db, MAINNET.clone(), static_dir_path) let provider_factory = ProviderFactory::new(db, MAINNET.clone(), static_dir_path)
.expect("create provide factory with static_files"); .expect("create provide factory with static_files");
let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5, None);
let (finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let mut pruner =
Pruner::new(provider_factory, vec![], 5, 0, 5, None, finished_exex_height_rx);
// No last pruned block number was set before // No last pruned block number was set before
let first_block_number = 1; let first_block_number = 1;
@ -315,7 +360,22 @@ mod tests {
pruner.previous_tip_block_number = Some(second_block_number); pruner.previous_tip_block_number = Some(second_block_number);
// Tip block number delta is < than min block interval // Tip block number delta is < than min block interval
let third_block_number = second_block_number; assert!(!pruner.is_pruning_needed(second_block_number));
// Tip block number delta is >= than min block interval
let third_block_number = second_block_number + pruner.min_block_interval as u64;
assert!(pruner.is_pruning_needed(third_block_number));
// Not all ExExs have emitted a `FinishedHeight` event yet
finished_exex_height_tx.send(FinishedExExHeight::NotReady).unwrap();
assert!(!pruner.is_pruning_needed(third_block_number)); assert!(!pruner.is_pruning_needed(third_block_number));
// Adjust tip block number to the finished ExEx height that doesn't reach the threshold
finished_exex_height_tx.send(FinishedExExHeight::Height(second_block_number)).unwrap();
assert!(!pruner.is_pruning_needed(third_block_number));
// Adjust tip block number to the finished ExEx height that reaches the threshold
finished_exex_height_tx.send(FinishedExExHeight::Height(third_block_number)).unwrap();
assert!(pruner.is_pruning_needed(third_block_number));
} }
} }