feat(bin, engine, prune): spawn pruning task from the engine (#3566)

This commit is contained in:
Alexey Shekhirin
2023-07-12 13:03:08 +01:00
committed by GitHub
parent a7eae8cfc3
commit dbafe23cce
20 changed files with 494 additions and 50 deletions

11
Cargo.lock generated
View File

@ -4956,6 +4956,7 @@ dependencies = [
"reth-payload-builder",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-revm",
"reth-revm-inspectors",
"reth-rlp",
@ -5029,6 +5030,7 @@ dependencies = [
"reth-payload-builder",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-rpc-types",
"reth-stages",
"reth-tasks",
@ -5554,6 +5556,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "reth-prune"
version = "0.1.0-alpha.3"
dependencies = [
"reth-primitives",
"thiserror",
"tracing",
]
[[package]]
name = "reth-revm"
version = "0.1.0-alpha.3"

View File

@ -21,6 +21,7 @@ members = [
"crates/net/downloaders",
"crates/payload/basic",
"crates/primitives",
"crates/prune",
"crates/revm",
"crates/revm/revm-primitives",
"crates/revm/revm-inspectors",

View File

@ -36,6 +36,7 @@ reth-payload-builder = { workspace = true }
reth-basic-payload-builder = { path = "../../crates/payload/basic" }
reth-discv4 = { path = "../../crates/net/discv4" }
reth-metrics = { workspace = true }
reth-prune = { path = "../../crates/prune" }
jemallocator = { version = "0.5.0", optional = true }
jemalloc-ctl = { version = "0.5.0", optional = true }

View File

@ -360,6 +360,11 @@ impl Command {
None
};
let pruner = config.prune.map(|prune_config| {
info!(target: "reth::cli", "Pruner initialized");
reth_prune::Pruner::new(prune_config.block_interval, tree_config.max_reorg_depth())
});
// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
@ -374,6 +379,7 @@ impl Command {
MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx,
consensus_engine_rx,
pruner,
)?;
info!(target: "reth::cli", "Consensus engine initialized");

View File

@ -750,12 +750,21 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
///
/// This finalizes `last_finalized_block` prior to reading the canonical hashes (using
/// [`BlockchainTree::finalize_block`]).
pub fn restore_canonical_hashes(
pub fn restore_canonical_hashes_and_finalize(
&mut self,
last_finalized_block: BlockNumber,
) -> Result<(), Error> {
self.finalize_block(last_finalized_block);
self.restore_canonical_hashes()
}
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
/// tree.
///
/// `N` is the `max_reorg_depth` plus the number of block hashes needed to satisfy the
/// `BLOCKHASH` opcode in the EVM.
pub fn restore_canonical_hashes(&mut self) -> Result<(), Error> {
let num_of_canonical_hashes =
self.config.max_reorg_depth() + self.config.num_of_additional_canonical_block_hashes();
@ -1578,7 +1587,7 @@ mod tests {
.assert(&tree);
// update canonical block to b2, this would make b2a be removed
assert_eq!(tree.restore_canonical_hashes(12), Ok(()));
assert_eq!(tree.restore_canonical_hashes_and_finalize(12), Ok(()));
assert_eq!(tree.is_block_known(block2.num_hash()).unwrap(), Some(BlockStatus::Valid));

View File

@ -1,7 +1,7 @@
//! Blockchain tree configuration
/// The configuration for the blockchain tree.
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct BlockchainTreeConfig {
/// Number of blocks after the last finalized block that we are storing.
///

View File

@ -66,10 +66,21 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
tree.update_chains_metrics();
}
fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error> {
fn restore_canonical_hashes_and_finalize(
&self,
last_finalized_block: BlockNumber,
) -> Result<(), Error> {
trace!(target: "blockchain_tree", ?last_finalized_block, "Restoring canonical hashes for last finalized block");
let mut tree = self.tree.write();
let res = tree.restore_canonical_hashes(last_finalized_block);
let res = tree.restore_canonical_hashes_and_finalize(last_finalized_block);
tree.update_chains_metrics();
res
}
fn restore_canonical_hashes(&self) -> Result<(), Error> {
trace!(target: "blockchain_tree", "Restoring canonical hashes");
let mut tree = self.tree.write();
let res = tree.restore_canonical_hashes();
tree.update_chains_metrics();
res
}

View File

@ -19,6 +19,7 @@ reth-rpc-types = { workspace = true }
reth-tasks = { workspace = true }
reth-payload-builder = { workspace = true }
reth-metrics = { workspace = true }
reth-prune = { path = "../../prune" }
# async
tokio = { workspace = true, features = ["sync"] }

View File

@ -1,3 +1,4 @@
use reth_prune::PrunerError;
use reth_rpc_types::engine::ForkchoiceUpdateError;
use reth_stages::PipelineError;
@ -16,6 +17,12 @@ pub enum BeaconConsensusEngineError {
/// Pipeline error.
#[error(transparent)]
Pipeline(#[from] Box<PipelineError>),
/// Pruner channel closed.
#[error("Pruner channel closed")]
PrunerChannelClosed,
/// Pruner error.
#[error(transparent)]
Pruner(#[from] PrunerError),
/// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)]
Common(#[from] reth_interfaces::Error),

View File

@ -13,6 +13,8 @@ pub(crate) struct EngineMetrics {
pub(crate) forkchoice_updated_messages: Counter,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The number of times the pruner was run.
pub(crate) pruner_runs: Counter,
}
/// Metrics for the `EngineSyncController`.

View File

@ -3,6 +3,7 @@ use crate::{
forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker},
message::OnForkChoiceUpdated,
metrics::EngineMetrics,
prune::{EnginePruneController, EnginePruneEvent},
},
sync::{EngineSyncController, EngineSyncEvent},
};
@ -28,6 +29,7 @@ use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ProviderError,
StageCheckpointReader,
};
use reth_prune::Pruner;
use reth_rpc_types::engine::{
ExecutionPayload, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
@ -60,13 +62,15 @@ use invalid_headers::InvalidHeaderCache;
mod event;
pub use event::BeaconConsensusEngineEvent;
mod forkchoice;
mod metrics;
pub(crate) mod sync;
mod handle;
pub use handle::BeaconConsensusEngineHandle;
mod forkchoice;
mod metrics;
pub(crate) mod prune;
pub(crate) mod sync;
/// The maximum number of invalid headers that can be tracked by the engine.
const MAX_INVALID_HEADERS: u32 = 512u32;
@ -187,6 +191,8 @@ where
/// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
/// Controls pruning triggered by engine updates.
prune: Option<EnginePruneController>,
}
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
@ -213,7 +219,8 @@ where
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
pipeline_run_threshold: u64,
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
pruner: Option<Pruner>,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
client,
@ -228,6 +235,7 @@ where
pipeline_run_threshold,
to_engine,
rx,
pruner,
)
}
@ -257,15 +265,17 @@ where
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>,
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
pruner: Option<Pruner>,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let handle = BeaconConsensusEngineHandle { to_engine };
let sync = EngineSyncController::new(
pipeline,
client,
task_spawner,
task_spawner.clone(),
run_pipeline_continuously,
max_block,
);
let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner));
let mut this = Self {
sync,
blockchain,
@ -278,6 +288,7 @@ where
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
pipeline_run_threshold,
prune,
};
let maybe_pipeline_target = match target {
@ -304,7 +315,7 @@ where
/// # Returns
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
fn check_pipeline_consistency(&self) -> Result<Option<H256>, reth_interfaces::Error> {
fn check_pipeline_consistency(&self) -> Result<Option<H256>, Error> {
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = self
@ -532,7 +543,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<PayloadAttributes>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, Error>>,
) -> bool {
self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state);
@ -583,7 +594,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<PayloadAttributes>,
) -> Result<OnForkChoiceUpdated, reth_interfaces::Error> {
) -> Result<OnForkChoiceUpdated, Error> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
if state.head_block_hash.is_zero() {
return Ok(OnForkChoiceUpdated::invalid_state())
@ -602,6 +613,17 @@ where
return Ok(OnForkChoiceUpdated::syncing())
}
if self.is_prune_active() {
// We can only process new forkchoice updates if the pruner is idle, since it requires
// exclusive access to the database
warn!(
target: "consensus::engine",
"Pruning is in progress, skipping forkchoice update. \
This may affect the performance of your node as a validator."
);
return Ok(OnForkChoiceUpdated::syncing())
}
let status = match self.blockchain.make_canonical(&state.head_block_hash) {
Ok(outcome) => {
if !outcome.is_already_canonical() {
@ -654,7 +676,7 @@ where
&self,
head: SealedHeader,
update: &ForkchoiceState,
) -> Result<(), reth_interfaces::Error> {
) -> Result<(), Error> {
let mut head_block = Head {
number: head.number,
hash: head.hash,
@ -899,11 +921,14 @@ where
return Ok(status)
}
let res = if self.sync.is_pipeline_idle() {
// we can only insert new payloads if the pipeline is _not_ running, because it holds
// exclusive access to the database
let res = if self.sync.is_pipeline_idle() && self.is_prune_idle() {
// we can only insert new payloads if the pipeline and the pruner are _not_ running,
// because they hold exclusive access to the database
self.try_insert_new_payload(block)
} else {
if self.is_prune_active() {
warn!(target: "consensus::engine", "Pruning is in progress, buffering new payload.");
}
self.try_buffer_payload(block)
};
@ -964,12 +989,12 @@ where
Ok(block)
}
/// When the pipeline is actively syncing the tree is unable to commit any additional blocks
/// since the pipeline holds exclusive access to the database.
/// When the pipeline or the pruner is active, the tree is unable to commit any additional
/// blocks since the pipeline holds exclusive access to the database.
///
/// In this scenario we buffer the payload in the tree if the payload is valid, once the
/// pipeline finished syncing the tree is then able to also use the buffered payloads to commit
/// to a (newer) canonical chain.
/// pipeline or pruner is finished, the tree is then able to also use the buffered payloads to
/// commit to a (newer) canonical chain.
///
/// This will return `SYNCING` if the block was buffered successfully, and an error if an error
/// occurred while buffering the block.
@ -984,7 +1009,7 @@ where
/// Attempts to insert a new payload into the tree.
///
/// Caution: This expects that the pipeline is idle.
/// Caution: This expects that the pipeline and the pruner are idle.
#[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
fn try_insert_new_payload(
&mut self,
@ -1063,14 +1088,11 @@ where
///
/// If the given block is missing from the database, this will return `false`. Otherwise, `true`
/// is returned: the database contains the hash and the tree was updated.
fn update_tree_on_finished_pipeline(
&mut self,
block_hash: H256,
) -> Result<bool, reth_interfaces::Error> {
fn update_tree_on_finished_pipeline(&mut self, block_hash: H256) -> Result<bool, Error> {
let synced_to_finalized = match self.blockchain.block_number(block_hash)? {
Some(number) => {
// Attempt to restore the tree.
self.blockchain.restore_canonical_hashes(number)?;
self.blockchain.restore_canonical_hashes_and_finalize(number)?;
true
}
None => false,
@ -1078,6 +1100,14 @@ where
Ok(synced_to_finalized)
}
/// Attempt to restore the tree.
///
/// This is invoked after a pruner run to update the tree with the most recent canonical
/// hashes.
fn update_tree_on_finished_pruner(&mut self) -> Result<(), Error> {
self.blockchain.restore_canonical_hashes()
}
/// Invoked if we successfully downloaded a new block from the network.
///
/// This will attempt to insert the block into the tree.
@ -1226,9 +1256,7 @@ where
// it's part of the canonical chain: if it's the safe or the finalized block
if matches!(
err,
reth_interfaces::Error::Execution(
BlockExecutionError::BlockHashNotFoundInChain { .. }
)
Error::Execution(BlockExecutionError::BlockHashNotFoundInChain { .. })
) {
// if the inserted block is the currently targeted `finalized` or `safe`
// block, we will attempt to make them canonical,
@ -1250,9 +1278,9 @@ where
/// This returns a result to indicate whether the engine future should resolve (fatal error).
fn on_sync_event(
&mut self,
ev: EngineSyncEvent,
event: EngineSyncEvent,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match ev {
match event {
EngineSyncEvent::FetchedFullBlock(block) => {
self.on_downloaded_block(block);
}
@ -1416,6 +1444,55 @@ where
None
}
/// Event handler for events emitted by the [EnginePruneController].
///
/// This returns a result to indicate whether the engine future should resolve (fatal error).
fn on_prune_event(
&mut self,
event: EnginePruneEvent,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match event {
EnginePruneEvent::NotReady => {}
EnginePruneEvent::Started(tip_block_number) => {
trace!(target: "consensus::engine", %tip_block_number, "Pruner started");
self.metrics.pruner_runs.increment(1);
}
EnginePruneEvent::TaskDropped => {
error!(target: "consensus::engine", "Failed to receive spawned pruner");
return Some(Err(BeaconConsensusEngineError::PrunerChannelClosed))
}
EnginePruneEvent::Finished { result } => {
trace!(target: "consensus::engine", ?result, "Pruner finished");
match result {
Ok(_) => {
// Update the state and hashes of the blockchain tree if possible.
match self.update_tree_on_finished_pruner() {
Ok(()) => {}
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Some(Err(error.into()))
}
};
}
// Any pruner error at this point is fatal.
Err(error) => return Some(Err(error.into())),
};
}
};
None
}
/// Returns `true` if the prune controller's pruner is idle.
fn is_prune_idle(&self) -> bool {
self.prune.as_ref().map(|prune| prune.is_pruner_idle()).unwrap_or(true)
}
/// Returns `true` if the prune controller's pruner is active.
fn is_prune_active(&self) -> bool {
!self.is_prune_idle()
}
}
/// On initialization, the consensus engine will poll the message receiver and return
@ -1446,6 +1523,7 @@ where
// SyncController, hence they are polled first, and they're also time sensitive.
loop {
let mut engine_messages_pending = false;
let mut sync_pending = false;
// handle next engine message
match this.engine_message_rx.poll_next_unpin(cx) {
@ -1484,11 +1562,29 @@ where
}
}
Poll::Pending => {
if engine_messages_pending {
// both the sync and the engine message receiver are pending
return Poll::Pending
// no more sync events to process
sync_pending = true;
}
}
// check prune events if pipeline is idle AND (pruning is running and we need to
// prioritize checking its events OR no engine and sync messages are pending and we may
// start pruning)
if this.sync.is_pipeline_idle() &&
(this.is_prune_active() || engine_messages_pending & sync_pending)
{
if let Some(ref mut prune) = this.prune {
match prune.poll(cx, this.blockchain.canonical_tip().number) {
Poll::Ready(prune_event) => {
if let Some(res) = this.on_prune_event(prune_event) {
return Poll::Ready(res)
}
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Pending
}
}
}
}
@ -1680,6 +1776,9 @@ mod tests {
let shareable_db = ProviderFactory::new(db.clone(), self.chain_spec.clone());
let latest = self.chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
let pruner = Pruner::new(5, 0);
let (mut engine, handle) = BeaconConsensusEngine::new(
NoopFullBlockClient::default(),
pipeline,
@ -1691,6 +1790,7 @@ mod tests {
payload_builder,
None,
self.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
Some(pruner),
)
.expect("failed to create consensus engine");
@ -1767,21 +1867,41 @@ mod tests {
std::thread::sleep(Duration::from_millis(100));
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
// consensus engine is still idle
// consensus engine is still idle because no FCUs were received
let _ = env.send_new_payload(SealedBlock::default().into()).await;
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
// consensus engine receives a forkchoice state and triggers the pipeline
// consensus engine is still idle because pruning is running
let _ = env
.send_forkchoice_updated(ForkchoiceState {
head_block_hash: H256::random(),
..Default::default()
})
.await;
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
// consensus engine receives a forkchoice state and triggers the pipeline when pruning is
// finished
loop {
match rx.try_recv() {
Ok(result) => {
assert_matches!(
rx.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
result,
Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
);
break
}
Err(TryRecvError::Empty) => {
let _ = env
.send_forkchoice_updated(ForkchoiceState {
head_block_hash: H256::random(),
..Default::default()
})
.await;
}
Err(err) => panic!("receive error: {err}"),
}
}
}
// Test that the consensus engine runs the pipeline again if the tree cannot be restored.

View File

@ -0,0 +1,146 @@
//! Prune management for the engine implementation.
use futures::FutureExt;
use reth_primitives::BlockNumber;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
/// Manages pruning under the control of the engine.
///
/// This type controls the [Pruner].
pub(crate) struct EnginePruneController {
/// The current state of the pruner.
pruner_state: PrunerState,
/// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>,
}
impl EnginePruneController {
/// Create a new instance
pub(crate) fn new(pruner: Pruner, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self { pruner_state: PrunerState::Idle(Some(pruner)), pruner_task_spawner }
}
/// Returns `true` if the pruner is idle.
pub(crate) fn is_pruner_idle(&self) -> bool {
self.pruner_state.is_idle()
}
/// Advances the pruner state.
///
/// This checks for the result in the channel, or returns pending if the pruner is idle.
fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<EnginePruneEvent> {
let res = match self.pruner_state {
PrunerState::Idle(_) => return Poll::Pending,
PrunerState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let ev = match res {
Ok((pruner, result)) => {
self.pruner_state = PrunerState::Idle(Some(pruner));
EnginePruneEvent::Finished { result }
}
Err(_) => {
// failed to receive the pruner
EnginePruneEvent::TaskDropped
}
};
Poll::Ready(ev)
}
/// This will try to spawn the pruner if it is idle:
/// 1. Check if pruning is needed through [Pruner::is_pruning_needed].
/// 2a. If pruning is needed, pass tip block number to the [Pruner::run] and spawn it in a
/// separate task. Set pruner state to [PrunerState::Running].
/// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle].
///
/// If pruner is already running, do nothing.
fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EnginePruneEvent> {
match &mut self.pruner_state {
PrunerState::Idle(pruner) => {
let mut pruner = pruner.take()?;
// Check tip for pruning
if pruner.is_pruning_needed(tip_block_number) {
let (tx, rx) = oneshot::channel();
self.pruner_task_spawner.spawn_critical_blocking(
"pruner task",
Box::pin(async move {
let result = pruner.run(tip_block_number);
let _ = tx.send((pruner, result));
}),
);
self.pruner_state = PrunerState::Running(rx);
Some(EnginePruneEvent::Started(tip_block_number))
} else {
self.pruner_state = PrunerState::Idle(Some(pruner));
Some(EnginePruneEvent::NotReady)
}
}
PrunerState::Running(_) => None,
}
}
/// Advances the prune process with the tip block number.
pub(crate) fn poll(
&mut self,
cx: &mut Context<'_>,
tip_block_number: BlockNumber,
) -> Poll<EnginePruneEvent> {
// Try to spawn a pruner
match self.try_spawn_pruner(tip_block_number) {
Some(EnginePruneEvent::NotReady) => return Poll::Pending,
Some(event) => return Poll::Ready(event),
None => (),
}
// Poll pruner and check its status
self.poll_pruner(cx)
}
}
/// The event type emitted by the [EnginePruneController].
#[derive(Debug)]
pub(crate) enum EnginePruneEvent {
/// Pruner is not ready
NotReady,
/// Pruner started with tip block number
Started(BlockNumber),
/// Pruner finished
///
/// If this is returned, the pruner is idle.
Finished {
/// Final result of the pruner run.
result: Result<(), PrunerError>,
},
/// Pruner task was dropped after it was started, unable to receive it because channel
/// closed. This would indicate a panicked pruner task
TaskDropped,
}
/// The possible pruner states within the sync controller.
///
/// [PrunerState::Idle] means that the pruner is currently idle.
/// [PrunerState::Running] means that the pruner is currently running.
///
/// NOTE: The differentiation between these two states is important, because when the pruner is
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PrunerState {
/// Pruner is idle.
Idle(Option<Pruner>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult>),
}
impl PrunerState {
/// Returns `true` if the state matches idle.
fn is_idle(&self) -> bool {
matches!(self, PrunerState::Idle(_))
}
}

View File

@ -166,9 +166,9 @@ where
return false
}
trace!(
target: "consensus::engine",
target: "consensus::engine::sync",
?hash,
"start downloading full block."
"Start downloading full block"
);
let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);
@ -191,10 +191,10 @@ where
self.max_block.map(|target| progress >= target).unwrap_or_default();
if has_reached_max_block {
trace!(
target: "consensus::engine",
target: "consensus::engine::sync",
?progress,
max_block = ?self.max_block,
"Consensus engine reached max block."
"Consensus engine reached max block"
);
}
has_reached_max_block

View File

@ -62,7 +62,17 @@ pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync {
///
/// This finalizes `last_finalized_block` prior to reading the canonical hashes (using
/// [`BlockchainTreeEngine::finalize_block`]).
fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error>;
fn restore_canonical_hashes_and_finalize(
&self,
last_finalized_block: BlockNumber,
) -> Result<(), Error>;
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
/// tree.
///
/// `N` is the `max_reorg_depth` plus the number of block hashes needed to satisfy the
/// `BLOCKHASH` opcode in the EVM.
fn restore_canonical_hashes(&self) -> Result<(), Error>;
/// Make a block and its parent chain part of the canonical chain by committing it to the
/// database.

View File

@ -220,7 +220,7 @@ impl StageCheckpoint {
/// Get the underlying [`EntitiesCheckpoint`], if any, to determine the number of entities
/// processed, and the number of total entities to process.
pub fn entities(&self) -> Option<EntitiesCheckpoint> {
let Some(stage_checkpoint) = self.stage_checkpoint else { return None };
let stage_checkpoint = self.stage_checkpoint?;
match stage_checkpoint {
StageUnitCheckpoint::Account(AccountHashingCheckpoint {

20
crates/prune/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "reth-prune"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = """
Pruning implementation
"""
[dependencies]
# reth
reth-primitives = { workspace = true }
# misc
tracing = { workspace = true }
thiserror = { workspace = true }

View File

@ -0,0 +1,4 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PrunerError {}

5
crates/prune/src/lib.rs Normal file
View File

@ -0,0 +1,5 @@
mod error;
mod pruner;
pub use error::PrunerError;
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};

View File

@ -0,0 +1,83 @@
//! Support for pruning.
use crate::PrunerError;
use reth_primitives::BlockNumber;
use tracing::debug;
/// Result of [Pruner::run] execution
pub type PrunerResult = Result<(), PrunerError>;
/// The pipeline type itself with the result of [Pruner::run]
pub type PrunerWithResult = (Pruner, PrunerResult);
/// Pruning routine. Main pruning logic happens in [Pruner::run].
pub struct Pruner {
/// Minimum pruning interval measured in blocks. All prune parts are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks.
min_block_interval: u64,
/// Maximum prune depth. Used to determine the pruning target for parts that are needed during
/// the reorg, e.g. changesets.
#[allow(dead_code)]
max_prune_depth: u64,
/// Last pruned block number. Used in conjunction with `min_block_interval` to determine
/// when the pruning needs to be initiated.
last_pruned_block_number: Option<BlockNumber>,
}
impl Pruner {
/// Creates a new [Pruner].
pub fn new(min_block_interval: u64, max_prune_depth: u64) -> Self {
Self { min_block_interval, max_prune_depth, last_pruned_block_number: None }
}
/// Run the pruner
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
// Pruning logic
self.last_pruned_block_number = Some(tip_block_number);
Ok(())
}
/// Returns `true` if the pruning is needed at the provided tip block number.
/// This determined by the check against minimum pruning interval and last pruned block number.
pub fn is_pruning_needed(&self, tip_block_number: BlockNumber) -> bool {
if self.last_pruned_block_number.map_or(true, |last_pruned_block_number| {
// Saturating subtraction is needed for the case when the chain was reverted, meaning
// current block number might be less than the previously pruned block number. If
// that's the case, no pruning is needed as outdated data is also reverted.
tip_block_number.saturating_sub(last_pruned_block_number) >= self.min_block_interval
}) {
debug!(
target: "pruner",
last_pruned_block_number = ?self.last_pruned_block_number,
%tip_block_number,
"Minimum pruning interval reached"
);
true
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use crate::Pruner;
#[test]
fn pruner_is_pruning_needed() {
let pruner = Pruner::new(5, 0);
// No last pruned block number was set before
let first_block_number = 1;
assert!(pruner.is_pruning_needed(first_block_number));
// Delta is not less than min block interval
let second_block_number = first_block_number + pruner.min_block_interval;
assert!(pruner.is_pruning_needed(second_block_number));
// Delta is less than min block interval
let third_block_number = second_block_number;
assert!(pruner.is_pruning_needed(third_block_number));
}
}

View File

@ -594,8 +594,15 @@ where
self.tree.finalize_block(finalized_block)
}
fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<()> {
self.tree.restore_canonical_hashes(last_finalized_block)
fn restore_canonical_hashes_and_finalize(
&self,
last_finalized_block: BlockNumber,
) -> Result<()> {
self.tree.restore_canonical_hashes_and_finalize(last_finalized_block)
}
fn restore_canonical_hashes(&self) -> Result<()> {
self.tree.restore_canonical_hashes()
}
fn make_canonical(&self, block_hash: &BlockHash) -> Result<CanonicalOutcome> {