feat(engine): hooks (#4582)

This commit is contained in:
Alexey Shekhirin
2023-09-18 18:52:58 +01:00
committed by GitHub
parent 78edae4d4f
commit 11f5f3f8d7
9 changed files with 423 additions and 177 deletions

View File

@ -25,7 +25,10 @@ use eyre::Context;
use fdlimit::raise_fd_limit; use fdlimit::raise_fd_limit;
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN}; use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook},
BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN,
};
use reth_blockchain_tree::{ use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
}; };
@ -446,16 +449,19 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None None
}; };
let pruner = prune_config.map(|prune_config| { let mut hooks = EngineHooks::new();
if let Some(prune_config) = prune_config {
info!(target: "reth::cli", ?prune_config, "Pruner initialized"); info!(target: "reth::cli", ?prune_config, "Pruner initialized");
reth_prune::Pruner::new( let pruner = reth_prune::Pruner::new(
db.clone(), db.clone(),
self.chain.clone(), self.chain.clone(),
prune_config.block_interval, prune_config.block_interval,
prune_config.parts, prune_config.parts,
self.chain.prune_batch_sizes, self.chain.prune_batch_sizes,
) );
}); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
}
// Configure the consensus engine // Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
@ -471,7 +477,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
MIN_BLOCKS_FOR_PIPELINE_RUN, MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx, consensus_engine_tx,
consensus_engine_rx, consensus_engine_rx,
pruner, hooks,
)?; )?;
info!(target: "reth::cli", "Consensus engine initialized"); info!(target: "reth::cli", "Consensus engine initialized");

View File

@ -1,4 +1,4 @@
use reth_prune::PrunerError; use crate::engine::hooks::EngineHookError;
use reth_rpc_types::engine::ForkchoiceUpdateError; use reth_rpc_types::engine::ForkchoiceUpdateError;
use reth_stages::PipelineError; use reth_stages::PipelineError;
@ -20,9 +20,9 @@ pub enum BeaconConsensusEngineError {
/// Pruner channel closed. /// Pruner channel closed.
#[error("Pruner channel closed")] #[error("Pruner channel closed")]
PrunerChannelClosed, PrunerChannelClosed,
/// Pruner error. /// Hook error.
#[error(transparent)] #[error(transparent)]
Pruner(#[from] PrunerError), Hook(#[from] EngineHookError),
/// Common error. Wrapper around [reth_interfaces::Error]. /// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)] #[error(transparent)]
Common(#[from] reth_interfaces::Error), Common(#[from] reth_interfaces::Error),

View File

@ -0,0 +1,132 @@
use crate::hooks::{EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHooks};
use std::{
collections::VecDeque,
task::{Context, Poll},
};
use tracing::debug;
/// Manages hooks under the control of the engine.
///
/// This type polls the initialized hooks one by one, respecting the DB access level
/// (i.e. [crate::hooks::EngineHookDBAccessLevel::ReadWrite] that enforces running at most one such
/// hook).
pub(crate) struct EngineHooksController {
/// Collection of hooks.
///
/// Hooks might be removed from the collection, and returned upon completion.
/// In the current implementation, it only happens when moved to `running_hook_with_db_write`.
hooks: VecDeque<Box<dyn EngineHook>>,
/// Currently running hook with DB write access, if any.
running_hook_with_db_write: Option<Box<dyn EngineHook>>,
}
impl EngineHooksController {
/// Creates a new [`EngineHooksController`].
pub(crate) fn new(hooks: EngineHooks) -> Self {
Self { hooks: hooks.inner.into(), running_hook_with_db_write: None }
}
/// Polls currently running hook with DB write access, if any.
///
/// Returns [`Poll::Ready`] if currently running hook with DB write access returned
/// an [event][`crate::hooks::EngineHookEvent`] that resulted in [action][`EngineHookAction`] or
/// error.
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. No hook with DB write access is running.
/// 2. Currently running hook with DB write access returned [`Poll::Pending`] on polling.
/// 3. Currently running hook with DB write access returned [`Poll::Ready`] on polling, but no
/// action to act upon.
pub(crate) fn poll_running_hook_with_db_write(
&mut self,
cx: &mut Context<'_>,
args: EngineContext,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending };
match hook.poll(cx, args) {
Poll::Ready((event, action)) => {
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
"Polled running hook with db write access"
);
if !event.is_finished() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}
if let Some(action) = action {
return Poll::Ready(Ok(action))
}
}
Poll::Pending => {
self.running_hook_with_db_write = Some(hook);
}
}
Poll::Pending
}
/// Polls next engine from the collection.
///
/// Returns [`Poll::Ready`] if next hook returned an [event][`crate::hooks::EngineHookEvent`]
/// that resulted in [action][`EngineHookAction`].
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. Next hook is [`Option::None`], i.e. taken, meaning it's currently running and has a DB
/// write access.
/// 2. Next hook needs a DB write access, but either there's another hook with DB write access
/// running, or `db_write_active` passed into arguments is `true`.
/// 3. Next hook returned [`Poll::Pending`] on polling.
/// 4. Next hook returned [`Poll::Ready`] on polling, but no action to act upon.
pub(crate) fn poll_next_hook(
&mut self,
cx: &mut Context<'_>,
args: EngineContext,
db_write_active: bool,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };
// Hook with DB write access level is not allowed to run due to already running hook with DB
// write access level or active DB write according to passed argument
if hook.db_access_level().is_read_write() &&
(self.running_hook_with_db_write.is_some() || db_write_active)
{
return Poll::Pending
}
if let Poll::Ready((event, action)) = hook.poll(cx, args) {
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
"Polled next hook"
);
if event.is_started() && hook.db_access_level().is_read_write() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}
if let Some(action) = action {
return Poll::Ready(Ok(action))
}
} else {
self.hooks.push_back(hook);
}
Poll::Pending
}
/// Returns `true` if there's a hook with DB write access running.
pub(crate) fn is_hook_with_db_write_running(&self) -> bool {
self.running_hook_with_db_write.is_some()
}
}

View File

@ -0,0 +1,128 @@
use reth_interfaces::sync::SyncState;
use reth_primitives::BlockNumber;
use std::{
fmt::Debug,
task::{Context, Poll},
};
mod controller;
pub(crate) use controller::EngineHooksController;
mod prune;
pub use prune::PruneHook;
/// Collection of [engine hooks][`EngineHook`].
#[derive(Default)]
pub struct EngineHooks {
inner: Vec<Box<dyn EngineHook>>,
}
impl EngineHooks {
/// Creates a new empty collection of [engine hooks][`EngineHook`].
pub fn new() -> Self {
Self { inner: Vec::new() }
}
/// Adds a new [engine hook][`EngineHook`] to the collection.
pub fn add<H: EngineHook>(&mut self, hook: H) {
self.inner.push(Box::new(hook))
}
}
/// Hook that will be run during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub trait EngineHook: Send + Sync + 'static {
/// Returns a human-readable name for the hook.
fn name(&self) -> &'static str;
/// Advances the hook execution, emitting an [event][`EngineHookEvent`] and an optional
/// [action][`EngineHookAction`].
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)>;
/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
fn db_access_level(&self) -> EngineHookDBAccessLevel;
}
/// Engine context passed to the [hook polling function][`EngineHook::poll`].
#[derive(Copy, Clone, Debug)]
pub struct EngineContext {
/// Tip block number.
pub tip_block_number: BlockNumber,
}
/// An event emitted when [hook][`EngineHook`] is polled.
#[derive(Debug)]
pub enum EngineHookEvent {
/// Hook is not ready.
///
/// If this is returned, the hook is idle.
NotReady,
/// Hook started.
///
/// If this is returned, the hook is running.
Started,
/// Hook finished.
///
/// If this is returned, the hook is idle.
Finished(Result<(), EngineHookError>),
}
impl EngineHookEvent {
/// Returns `true` if the event is [`EngineHookEvent::Started`].
pub fn is_started(&self) -> bool {
matches!(self, Self::Started)
}
/// Returns `true` if the event is [`EngineHookEvent::Finished`].
pub fn is_finished(&self) -> bool {
matches!(self, Self::Finished(_))
}
}
/// An action that the caller of [hook][`EngineHook`] should act upon.
#[derive(Debug, Copy, Clone)]
pub enum EngineHookAction {
/// Notify about a [SyncState] update.
UpdateSyncState(SyncState),
/// Read the last relevant canonical hashes from the database and update the block indices of
/// the blockchain tree.
RestoreCanonicalHashes,
}
/// An error returned by [hook][`EngineHook`].
#[derive(Debug, thiserror::Error)]
pub enum EngineHookError {
/// Hook channel closed.
#[error("Hook channel closed")]
ChannelClosed,
/// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)]
Common(#[from] reth_interfaces::Error),
/// An internal error occurred.
#[error("Internal hook error occurred.")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
}
/// Level of database access the hook needs for execution.
pub enum EngineHookDBAccessLevel {
/// Read-only database access.
ReadOnly,
/// Read-write database access.
ReadWrite,
}
impl EngineHookDBAccessLevel {
/// Returns `true` if the hook needs read-only access to the database.
pub fn is_read_only(&self) -> bool {
matches!(self, Self::ReadOnly)
}
/// Returns `true` if the hook needs read-write access to the database.
pub fn is_read_write(&self) -> bool {
matches!(self, Self::ReadWrite)
}
}

View File

@ -1,9 +1,17 @@
//! Prune management for the engine implementation. //! Prune hook for the engine implementation.
use crate::{
engine::hooks::{
EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHookEvent,
},
hooks::EngineHookDBAccessLevel,
};
use futures::FutureExt; use futures::FutureExt;
use metrics::Counter;
use reth_db::database::Database; use reth_db::database::Database;
use reth_interfaces::sync::SyncState;
use reth_primitives::BlockNumber; use reth_primitives::BlockNumber;
use reth_prune::{Pruner, PrunerResult, PrunerWithResult}; use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll}; use std::task::{ready, Context, Poll};
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -11,45 +19,67 @@ use tokio::sync::oneshot;
/// Manages pruning under the control of the engine. /// Manages pruning under the control of the engine.
/// ///
/// This type controls the [Pruner]. /// This type controls the [Pruner].
pub(crate) struct EnginePruneController<DB> { pub struct PruneHook<DB> {
/// The current state of the pruner. /// The current state of the pruner.
pruner_state: PrunerState<DB>, pruner_state: PrunerState<DB>,
/// The type that can spawn the pruner task. /// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>, pruner_task_spawner: Box<dyn TaskSpawner>,
metrics: Metrics,
} }
impl<DB: Database + 'static> EnginePruneController<DB> { impl<DB: Database + 'static> PruneHook<DB> {
/// Create a new instance /// Create a new instance
pub(crate) fn new(pruner: Pruner<DB>, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self { pub fn new(pruner: Pruner<DB>, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self { pruner_state: PrunerState::Idle(Some(pruner)), pruner_task_spawner } Self {
} pruner_state: PrunerState::Idle(Some(pruner)),
pruner_task_spawner,
/// Returns `true` if the pruner is idle. metrics: Metrics::default(),
pub(crate) fn is_pruner_idle(&self) -> bool { }
self.pruner_state.is_idle()
} }
/// Advances the pruner state. /// Advances the pruner state.
/// ///
/// This checks for the result in the channel, or returns pending if the pruner is idle. /// This checks for the result in the channel, or returns pending if the pruner is idle.
fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<EnginePruneEvent> { fn poll_pruner(
let res = match self.pruner_state { &mut self,
cx: &mut Context<'_>,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
let result = match self.pruner_state {
PrunerState::Idle(_) => return Poll::Pending, PrunerState::Idle(_) => return Poll::Pending,
PrunerState::Running(ref mut fut) => { PrunerState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx)) ready!(fut.poll_unpin(cx))
} }
}; };
let ev = match res {
let event = match result {
Ok((pruner, result)) => { Ok((pruner, result)) => {
self.pruner_state = PrunerState::Idle(Some(pruner)); self.pruner_state = PrunerState::Idle(Some(pruner));
EnginePruneEvent::Finished { result }
match result {
Ok(_) => EngineHookEvent::Finished(Ok(())),
Err(err) => EngineHookEvent::Finished(Err(match err {
PrunerError::PrunePart(_) | PrunerError::InconsistentData(_) => {
EngineHookError::Internal(Box::new(err))
}
PrunerError::Interface(err) => err.into(),
PrunerError::Database(err) => reth_interfaces::Error::Database(err).into(),
PrunerError::Provider(err) => reth_interfaces::Error::Provider(err).into(),
})),
}
} }
Err(_) => { Err(_) => {
// failed to receive the pruner // failed to receive the pruner
EnginePruneEvent::TaskDropped EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
} }
}; };
Poll::Ready(ev)
let action = if matches!(event, EngineHookEvent::Finished(Ok(_))) {
Some(EngineHookAction::RestoreCanonicalHashes)
} else {
None
};
Poll::Ready((event, action))
} }
/// This will try to spawn the pruner if it is idle: /// This will try to spawn the pruner if it is idle:
@ -59,7 +89,10 @@ impl<DB: Database + 'static> EnginePruneController<DB> {
/// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle]. /// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle].
/// ///
/// If pruner is already running, do nothing. /// If pruner is already running, do nothing.
fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EnginePruneEvent> { fn try_spawn_pruner(
&mut self,
tip_block_number: BlockNumber,
) -> Option<(EngineHookEvent, Option<EngineHookAction>)> {
match &mut self.pruner_state { match &mut self.pruner_state {
PrunerState::Idle(pruner) => { PrunerState::Idle(pruner) => {
let mut pruner = pruner.take()?; let mut pruner = pruner.take()?;
@ -74,53 +107,51 @@ impl<DB: Database + 'static> EnginePruneController<DB> {
let _ = tx.send((pruner, result)); let _ = tx.send((pruner, result));
}), }),
); );
self.metrics.runs.increment(1);
self.pruner_state = PrunerState::Running(rx); self.pruner_state = PrunerState::Running(rx);
Some(EnginePruneEvent::Started(tip_block_number)) Some((
EngineHookEvent::Started,
// Engine can't process any FCU/payload messages from CL while we're
// pruning, as pruner needs an exclusive write access to the database. To
// prevent CL from sending us unneeded updates, we need to respond `true`
// on `eth_syncing` request.
Some(EngineHookAction::UpdateSyncState(SyncState::Syncing)),
))
} else { } else {
self.pruner_state = PrunerState::Idle(Some(pruner)); self.pruner_state = PrunerState::Idle(Some(pruner));
Some(EnginePruneEvent::NotReady) Some((EngineHookEvent::NotReady, None))
} }
} }
PrunerState::Running(_) => None, PrunerState::Running(_) => None,
} }
} }
}
/// Advances the prune process with the tip block number. impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
pub(crate) fn poll( fn name(&self) -> &'static str {
"Prune"
}
fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
tip_block_number: BlockNumber, ctx: EngineContext,
) -> Poll<EnginePruneEvent> { ) -> Poll<(EngineHookEvent, Option<EngineHookAction>)> {
// Try to spawn a pruner // Try to spawn a pruner
match self.try_spawn_pruner(tip_block_number) { match self.try_spawn_pruner(ctx.tip_block_number) {
Some(EnginePruneEvent::NotReady) => return Poll::Pending, Some((EngineHookEvent::NotReady, _)) => return Poll::Pending,
Some(event) => return Poll::Ready(event), Some((event, action)) => return Poll::Ready((event, action)),
None => (), None => (),
} }
// Poll pruner and check its status // Poll pruner and check its status
self.poll_pruner(cx) self.poll_pruner(cx)
} }
}
/// The event type emitted by the [EnginePruneController]. fn db_access_level(&self) -> EngineHookDBAccessLevel {
#[derive(Debug)] EngineHookDBAccessLevel::ReadWrite
pub(crate) enum EnginePruneEvent { }
/// Pruner is not ready
NotReady,
/// Pruner started with tip block number
Started(BlockNumber),
/// Pruner finished
///
/// If this is returned, the pruner is idle.
Finished {
/// Final result of the pruner run.
result: PrunerResult,
},
/// Pruner task was dropped after it was started, unable to receive it because channel
/// closed. This would indicate a panicked pruner task
TaskDropped,
} }
/// The possible pruner states within the sync controller. /// The possible pruner states within the sync controller.
@ -139,9 +170,9 @@ enum PrunerState<DB> {
Running(oneshot::Receiver<PrunerWithResult<DB>>), Running(oneshot::Receiver<PrunerWithResult<DB>>),
} }
impl<DB> PrunerState<DB> { #[derive(reth_metrics::Metrics)]
/// Returns `true` if the state matches idle. #[metrics(scope = "consensus.engine.prune")]
fn is_idle(&self) -> bool { struct Metrics {
matches!(self, PrunerState::Idle(_)) /// The number of times the pruner was run.
} runs: Counter,
} }

View File

@ -13,8 +13,6 @@ pub(crate) struct EngineMetrics {
pub(crate) forkchoice_updated_messages: Counter, pub(crate) forkchoice_updated_messages: Counter,
/// The total count of new payload messages received. /// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter, pub(crate) new_payload_messages: Counter,
/// The number of times the pruner was run.
pub(crate) pruner_runs: Counter,
/// Latency for making canonical already canonical block /// Latency for making canonical already canonical block
pub(crate) make_canonical_already_canonical_latency: Histogram, pub(crate) make_canonical_already_canonical_latency: Histogram,
/// Latency for making canonical committed block /// Latency for making canonical committed block

View File

@ -3,8 +3,8 @@ use crate::{
forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}, forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker},
message::OnForkChoiceUpdated, message::OnForkChoiceUpdated,
metrics::EngineMetrics, metrics::EngineMetrics,
prune::{EnginePruneController, EnginePruneEvent},
}, },
hooks::{EngineContext, EngineHookAction, EngineHooksController},
sync::{EngineSyncController, EngineSyncEvent}, sync::{EngineSyncController, EngineSyncEvent},
}; };
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
@ -29,7 +29,6 @@ use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError, BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
StageCheckpointReader, StageCheckpointReader,
}; };
use reth_prune::Pruner;
use reth_rpc_types::engine::{ use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, PayloadAttributes, PayloadError, PayloadStatus, CancunPayloadFields, ExecutionPayload, PayloadAttributes, PayloadError, PayloadStatus,
PayloadStatusEnum, PayloadValidationError, PayloadStatusEnum, PayloadValidationError,
@ -69,11 +68,17 @@ mod handle;
pub use handle::BeaconConsensusEngineHandle; pub use handle::BeaconConsensusEngineHandle;
mod forkchoice; mod forkchoice;
use crate::hooks::EngineHooks;
pub use forkchoice::ForkchoiceStatus; pub use forkchoice::ForkchoiceStatus;
mod metrics; mod metrics;
pub(crate) mod prune;
pub(crate) mod sync; pub(crate) mod sync;
/// Hooks for running during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub mod hooks;
#[cfg(any(test, feature = "test-utils"))] #[cfg(any(test, feature = "test-utils"))]
pub mod test_utils; pub mod test_utils;
@ -197,8 +202,7 @@ where
/// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will /// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
/// be used to download and execute the missing blocks. /// be used to download and execute the missing blocks.
pipeline_run_threshold: u64, pipeline_run_threshold: u64,
/// Controls pruning triggered by engine updates. hooks: EngineHooksController,
prune: Option<EnginePruneController<DB>>,
} }
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client> impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
@ -226,7 +230,7 @@ where
payload_builder: PayloadBuilderHandle, payload_builder: PayloadBuilderHandle,
target: Option<H256>, target: Option<H256>,
pipeline_run_threshold: u64, pipeline_run_threshold: u64,
pruner: Option<Pruner<DB>>, hooks: EngineHooks,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> { ) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let (to_engine, rx) = mpsc::unbounded_channel(); let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel( Self::with_channel(
@ -242,7 +246,7 @@ where
pipeline_run_threshold, pipeline_run_threshold,
to_engine, to_engine,
rx, rx,
pruner, hooks,
) )
} }
@ -272,7 +276,7 @@ where
pipeline_run_threshold: u64, pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage>, to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>, rx: UnboundedReceiver<BeaconEngineMessage>,
pruner: Option<Pruner<DB>>, hooks: EngineHooks,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> { ) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let handle = BeaconConsensusEngineHandle { to_engine }; let handle = BeaconConsensusEngineHandle { to_engine };
let sync = EngineSyncController::new( let sync = EngineSyncController::new(
@ -283,7 +287,6 @@ where
max_block, max_block,
blockchain.chain_spec(), blockchain.chain_spec(),
); );
let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner));
let mut this = Self { let mut this = Self {
sync, sync,
blockchain, blockchain,
@ -296,7 +299,7 @@ where
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(), metrics: EngineMetrics::default(),
pipeline_run_threshold, pipeline_run_threshold,
prune, hooks: EngineHooksController::new(hooks),
}; };
let maybe_pipeline_target = match target { let maybe_pipeline_target = match target {
@ -638,12 +641,12 @@ where
return Ok(OnForkChoiceUpdated::syncing()) return Ok(OnForkChoiceUpdated::syncing())
} }
if self.is_prune_active() { if self.hooks.is_hook_with_db_write_running() {
// We can only process new forkchoice updates if the pruner is idle, since it requires // We can only process new forkchoice updates if no hook with db write is running,
// exclusive access to the database // since it requires exclusive access to the database
warn!( warn!(
target: "consensus::engine", target: "consensus::engine",
"Pruning is in progress, skipping forkchoice update. \ "Hook is in progress, skipping forkchoice update. \
This may affect the performance of your node as a validator." This may affect the performance of your node as a validator."
); );
return Ok(OnForkChoiceUpdated::syncing()) return Ok(OnForkChoiceUpdated::syncing())
@ -1083,13 +1086,13 @@ where
return Ok(status) return Ok(status)
} }
let res = if self.sync.is_pipeline_idle() && self.is_prune_idle() { let res = if self.sync.is_pipeline_idle() && !self.hooks.is_hook_with_db_write_running() {
// we can only insert new payloads if the pipeline and the pruner are _not_ running, // we can only insert new payloads if the pipeline and any hook with db write
// because they hold exclusive access to the database // are _not_ running, because they hold exclusive access to the database
self.try_insert_new_payload(block) self.try_insert_new_payload(block)
} else { } else {
if self.is_prune_active() { if self.hooks.is_hook_with_db_write_running() {
debug!(target: "consensus::engine", "Pruning is in progress, buffering new payload."); debug!(target: "consensus::engine", "Hook is in progress, buffering new payload.");
} }
self.try_buffer_payload(block) self.try_buffer_payload(block)
}; };
@ -1226,12 +1229,12 @@ where
Ok(()) Ok(())
} }
/// When the pipeline or the pruner is active, the tree is unable to commit any additional /// When the pipeline or a hook with DB write access is active, the tree is unable to commit
/// blocks since the pipeline holds exclusive access to the database. /// any additional blocks since the pipeline holds exclusive access to the database.
/// ///
/// In this scenario we buffer the payload in the tree if the payload is valid, once the /// In this scenario we buffer the payload in the tree if the payload is valid, once the
/// pipeline or pruner is finished, the tree is then able to also use the buffered payloads to /// pipeline or a hook with DB write access is finished, the tree is then able to also use the
/// commit to a (newer) canonical chain. /// buffered payloads to commit to a (newer) canonical chain.
/// ///
/// This will return `SYNCING` if the block was buffered successfully, and an error if an error /// This will return `SYNCING` if the block was buffered successfully, and an error if an error
/// occurred while buffering the block. /// occurred while buffering the block.
@ -1246,7 +1249,7 @@ where
/// Attempts to insert a new payload into the tree. /// Attempts to insert a new payload into the tree.
/// ///
/// Caution: This expects that the pipeline and the pruner are idle. /// Caution: This expects that the pipeline and a hook with DB write access are idle.
#[instrument(level = "trace", skip_all, target = "consensus::engine", ret)] #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
fn try_insert_new_payload( fn try_insert_new_payload(
&mut self, &mut self,
@ -1339,14 +1342,6 @@ where
Ok(synced_to_finalized) Ok(synced_to_finalized)
} }
/// Attempt to restore the tree.
///
/// This is invoked after a pruner run to update the tree with the most recent canonical
/// hashes.
fn update_tree_on_finished_pruner(&mut self) -> Result<(), Error> {
self.blockchain.restore_canonical_hashes()
}
/// Invoked if we successfully downloaded a new block from the network. /// Invoked if we successfully downloaded a new block from the network.
/// ///
/// This will attempt to insert the block into the tree. /// This will attempt to insert the block into the tree.
@ -1686,72 +1681,20 @@ where
None None
} }
/// Event handler for events emitted by the [EnginePruneController]. fn on_hook_action(&self, action: EngineHookAction) -> Result<(), BeaconConsensusEngineError> {
/// match action {
/// This returns a result to indicate whether the engine future should resolve (fatal error). EngineHookAction::UpdateSyncState(state) => {
fn on_prune_event( self.sync_state_updater.update_sync_state(state)
&mut self,
event: EnginePruneEvent,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match event {
EnginePruneEvent::NotReady => {}
EnginePruneEvent::Started(tip_block_number) => {
trace!(target: "consensus::engine", %tip_block_number, "Pruner started");
self.metrics.pruner_runs.increment(1);
// Engine can't process any FCU/payload messages from CL while we're pruning, as
// pruner needs an exclusive write access to the database. To prevent CL from
// sending us unneeded updates, we need to respond `true` on `eth_syncing` request.
self.sync_state_updater.update_sync_state(SyncState::Syncing);
} }
EnginePruneEvent::TaskDropped => { EngineHookAction::RestoreCanonicalHashes => {
error!(target: "consensus::engine", "Failed to receive spawned pruner"); if let Err(error) = self.blockchain.restore_canonical_hashes() {
return Some(Err(BeaconConsensusEngineError::PrunerChannelClosed)) error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Err(error.into())
}
} }
EnginePruneEvent::Finished { result } => {
trace!(target: "consensus::engine", ?result, "Pruner finished");
match result {
Ok(_) => {
// Update the state and hashes of the blockchain tree if possible.
match self.update_tree_on_finished_pruner() {
Ok(()) => {}
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Some(Err(error.into()))
}
};
}
// Any pruner error at this point is fatal.
Err(error) => return Some(Err(error.into())),
};
}
};
None
}
/// Returns `true` if the prune controller's pruner is idle.
fn is_prune_idle(&self) -> bool {
self.prune.as_ref().map(|prune| prune.is_pruner_idle()).unwrap_or(true)
}
/// Returns `true` if the prune controller's pruner is active.
fn is_prune_active(&self) -> bool {
!self.is_prune_idle()
}
/// Polls the prune controller, if it exists, and processes the event [`EnginePruneEvent`]
/// emitted by it.
///
/// Returns [`Option::Some`] if prune controller emitted an event which resulted in the error
/// (see [`Self::on_prune_event`] for error handling)
fn poll_prune(
&mut self,
cx: &mut Context<'_>,
) -> Option<Result<(), BeaconConsensusEngineError>> {
match self.prune.as_mut()?.poll(cx, self.blockchain.canonical_tip().number) {
Poll::Ready(prune_event) => self.on_prune_event(prune_event),
Poll::Pending => None,
} }
Ok(())
} }
} }
@ -1783,11 +1726,14 @@ where
// Process all incoming messages from the CL, these can affect the state of the // Process all incoming messages from the CL, these can affect the state of the
// SyncController, hence they are polled first, and they're also time sensitive. // SyncController, hence they are polled first, and they're also time sensitive.
loop { loop {
// Poll prune controller first if it's active, as we will not be able to process any // Poll a running hook with db write access first, as we will not be able to process
// engine messages until it's finished. // any engine messages until it's finished.
if this.is_prune_active() { if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
if let Some(res) = this.poll_prune(cx) { cx,
return Poll::Ready(res) EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
) {
if let Err(err) = this.on_hook_action(result?) {
return Poll::Ready(Err(err))
} }
} }
@ -1847,16 +1793,18 @@ where
// we're pending if both engine messages and sync events are pending (fully drained) // we're pending if both engine messages and sync events are pending (fully drained)
let is_pending = engine_messages_pending && sync_pending; let is_pending = engine_messages_pending && sync_pending;
// Poll prune controller if all conditions are met: // Poll next hook if all conditions are met:
// 1. Pipeline is idle // 1. No engine and sync messages are pending
// 2. No engine and sync messages are pending // 2. Latest FCU status is not INVALID
// 3. Latest FCU status is not INVALID if is_pending && !this.forkchoice_state_tracker.is_latest_invalid() {
if this.sync.is_pipeline_idle() && if let Poll::Ready(result) = this.hooks.poll_next_hook(
is_pending && cx,
!this.forkchoice_state_tracker.is_latest_invalid() EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
{ this.sync.is_pipeline_active(),
if let Some(res) = this.poll_prune(cx) { ) {
return Poll::Ready(res) if let Err(err) = this.on_hook_action(result?) {
return Poll::Ready(Err(err))
}
} }
} }

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError, engine::hooks::PruneHook, hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine,
BeaconConsensusEngineHandle, BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, BeaconConsensusEngineError, BeaconConsensusEngineHandle, BeaconForkChoiceUpdateError,
MIN_BLOCKS_FOR_PIPELINE_RUN, BeaconOnNewPayloadError, MIN_BLOCKS_FOR_PIPELINE_RUN,
}; };
use reth_blockchain_tree::{ use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
@ -518,6 +518,9 @@ where
PruneBatchSizes::default(), PruneBatchSizes::default(),
); );
let mut hooks = EngineHooks::new();
hooks.add(PruneHook::new(pruner, Box::<TokioTaskExecutor>::default()));
let (mut engine, handle) = BeaconConsensusEngine::new( let (mut engine, handle) = BeaconConsensusEngine::new(
client, client,
pipeline, pipeline,
@ -529,7 +532,7 @@ where
payload_builder, payload_builder,
None, None,
self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN), self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),
Some(pruner), hooks,
) )
.expect("failed to create consensus engine"); .expect("failed to create consensus engine");

View File

@ -21,7 +21,7 @@ pub trait SyncStateProvider: Send + Sync {
/// which point the node is considered fully synced. /// which point the node is considered fully synced.
#[auto_impl::auto_impl(&, Arc, Box)] #[auto_impl::auto_impl(&, Arc, Box)]
pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static { pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static {
/// Notifies about an [SyncState] update. /// Notifies about a [SyncState] update.
fn update_sync_state(&self, state: SyncState); fn update_sync_state(&self, state: SyncState);
/// Updates the status of the p2p node /// Updates the status of the p2p node
@ -29,7 +29,7 @@ pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static {
} }
/// The state the network is currently in when it comes to synchronization. /// The state the network is currently in when it comes to synchronization.
#[derive(Clone, Eq, PartialEq, Debug)] #[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SyncState { pub enum SyncState {
/// Node sync is complete. /// Node sync is complete.
/// ///