feat: send CanonStateNotifications from execution stage (#7578)

Author: Oliver Nordbjerg
Date: 2024-04-19 16:39:52 +02:00
Committed by: GitHub
parent 6646a329ad
commit 49c02c3b8e
21 changed files with 246 additions and 194 deletions
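
In short, the execution stage now holds an ExExManagerHandle and emits CanonStateNotifications to any installed execution extensions (ExExs): a Commit notification after each executed batch and a Reorg notification on unwind. The sketch below condenses the wiring from the hunks that follow; the names are taken from the diff, but the snippet itself is illustrative rather than verbatim.

// Condensed wiring sketch (illustrative, assembled from the hunks below).
// The node builder falls back to an empty handle when no ExEx manager is
// configured, and the pipeline threads the handle into the execution stage.
let pipeline_exex_handle =
    exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);

let execution_stage = ExecutionStage::new(
    executor_factory,
    thresholds,
    MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
    prune_modes,
    pipeline_exex_handle,
);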

View File

@ -15,7 +15,7 @@ workspace = true
reth-beacon-consensus-core.workspace = true
reth-primitives.workspace = true
reth-interfaces.workspace = true
reth-stages.workspace = true
reth-stages-api.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-rpc-types.workspace = true
@ -65,5 +65,5 @@ optimism = [
"reth-interfaces/optimism",
"reth-provider/optimism",
"reth-blockchain-tree/optimism",
"reth-beacon-consensus-core/optimism"
"reth-beacon-consensus-core/optimism",
]

View File

@ -1,7 +1,7 @@
use crate::engine::hooks::EngineHookError;
use reth_interfaces::RethError;
use reth_rpc_types::engine::ForkchoiceUpdateError;
use reth_stages::PipelineError;
use reth_stages_api::PipelineError;
/// Beacon engine result.
pub type BeaconEngineResult<Ok> = Result<Ok, BeaconConsensusEngineError>;

View File

@ -33,7 +33,7 @@ use reth_provider::{
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use reth_stages::{ControlFlow, Pipeline};
use reth_stages_api::{ControlFlow, Pipeline};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{

View File

@ -12,7 +12,7 @@ use reth_interfaces::p2p::{
headers::client::HeadersClient,
};
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{

View File

@ -355,6 +355,26 @@ pub struct ExExManagerHandle {
}
impl ExExManagerHandle {
/// Creates an empty manager handle.
///
/// Use this if there is no manager present.
///
/// The handle will always be ready, and have a capacity of 0.
pub fn empty() -> Self {
let (exex_tx, _) = mpsc::unbounded_channel();
let (_, is_ready_rx) = watch::channel(true);
let (_, finished_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
Self {
exex_tx,
num_exexs: 0,
is_ready_receiver: is_ready_rx.clone(),
is_ready: WatchStream::new(is_ready_rx),
current_capacity: Arc::new(AtomicUsize::new(0)),
finished_height: finished_height_rx,
}
}
/// Synchronously send a notification over the channel to all execution extensions.
///
/// Senders should call [`Self::has_capacity`] first.
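
For illustration, a minimal sketch of the intended sender pattern, assuming a handle and an already-built notification; it combines the doc comments above with the fire-and-forget usage in the execution-stage hunks later in this commit. In the execution stage itself, readiness is awaited through poll_ready in poll_execute_ready instead, as the later hunks show.

// Sketch only: `handle` is an ExExManagerHandle and `notification` a
// CanonStateNotification. Skip the work entirely if no ExExs are installed,
// check capacity before sending, and ignore send errors (they only occur when
// the manager, and therefore the node, is shutting down).
if handle.has_exexs() && handle.has_capacity() {
    let _ = handle.send(notification);
}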

View File

@ -27,7 +27,7 @@ use reth_db::{
test_utils::{create_test_rw_db, TempDatabase},
DatabaseEnv,
};
use reth_exex::{ExExContext, ExExHandle, ExExManager};
use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle};
use reth_interfaces::p2p::either::EitherDownloader;
use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle};
use reth_node_api::{
@ -711,6 +711,8 @@ where
}
// Configure the pipeline
let pipeline_exex_handle =
exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
let (mut pipeline, client) = if config.dev.dev {
info!(target: "reth::cli", "Starting Reth in dev mode");
@ -743,6 +745,7 @@ where
max_block,
static_file_producer,
evm_config,
pipeline_exex_handle,
)
.await?;
@ -765,6 +768,7 @@ where
max_block,
static_file_producer,
evm_config,
pipeline_exex_handle,
)
.await?;

View File

@ -7,6 +7,7 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_exex::ExExManagerHandle;
use reth_interfaces::{
consensus::Consensus,
p2p::{
@ -49,6 +50,7 @@ pub async fn build_networked_pipeline<DB, Client, EvmConfig>(
max_block: Option<BlockNumber>,
static_file_producer: StaticFileProducer<DB>,
evm_config: EvmConfig,
exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
@ -76,6 +78,7 @@ where
prune_config,
static_file_producer,
evm_config,
exex_manager_handle,
)
.await?;
@ -96,6 +99,7 @@ pub async fn build_pipeline<DB, H, B, EvmConfig>(
prune_config: Option<PruneConfig>,
static_file_producer: StaticFileProducer<DB>,
evm_config: EvmConfig,
exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Clone + 'static,
@ -166,6 +170,7 @@ where
.max(stage_config.account_hashing.clean_threshold)
.max(stage_config.storage_hashing.clean_threshold),
prune_modes.clone(),
exex_manager_handle,
)
.with_metrics_tx(metrics_tx),
)

View File

@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-exex.workspace = true
reth-primitives.workspace = true
reth-interfaces.workspace = true
reth-db.workspace = true
@ -21,7 +22,7 @@ reth-provider.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-etl.workspace = true
reth-config.workspace = true
reth-stages-api = {workspace = true , features = ["test-utils"]}
reth-stages-api = { workspace = true, features = ["test-utils"] }
# async
tokio = { workspace = true, features = ["sync"] }
@ -35,7 +36,7 @@ thiserror.workspace = true
itertools.workspace = true
rayon.workspace = true
num-traits = "0.2.15"
tempfile = { workspace = true, optional = true}
tempfile = { workspace = true, optional = true }
[dev-dependencies]
# reth
@ -64,10 +65,20 @@ criterion = { workspace = true, features = ["async_futures"] }
serde_json.workspace = true
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }
pprof = { workspace = true, features = [
"flamegraph",
"frame-pointer",
"criterion",
] }
[features]
test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils", "reth-stages-api/test-utils", "dep:tempfile"]
test-utils = [
"reth-interfaces/test-utils",
"reth-db/test-utils",
"reth-provider/test-utils",
"reth-stages-api/test-utils",
"dep:tempfile",
]
[[bench]]
name = "criterion"

View File

@ -1,23 +1,20 @@
use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD;
use num_traits::Zero;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::BlockNumberAddress,
static_file::HeaderMask,
tables,
transaction::{DbTx, DbTxMut},
cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx,
};
use reth_exex::ExExManagerHandle;
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId,
},
BlockNumber, Header, PruneModes, StaticFileSegment, U256,
BlockNumber, Header, PruneModes, StaticFileSegment,
};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant,
BlockReader, CanonStateNotification, Chain, DatabaseProviderRW, ExecutorFactory,
HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader,
TransactionVariant,
};
use reth_stages_api::{
BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError,
@ -26,6 +23,8 @@ use reth_stages_api::{
use std::{
cmp::Ordering,
ops::RangeInclusive,
sync::Arc,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use tracing::*;
@ -74,6 +73,8 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
external_clean_threshold: u64,
/// Pruning configuration.
prune_modes: PruneModes,
/// Handle used to communicate with the ExEx manager.
exex_manager_handle: ExExManagerHandle,
}
impl<EF: ExecutorFactory> ExecutionStage<EF> {
@ -83,6 +84,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
thresholds: ExecutionStageThresholds,
external_clean_threshold: u64,
prune_modes: PruneModes,
exex_manager_handle: ExExManagerHandle,
) -> Self {
Self {
metrics_tx: None,
@ -90,6 +92,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
executor_factory,
thresholds,
prune_modes,
exex_manager_handle,
}
}
@ -102,6 +105,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
ExecutionStageThresholds::default(),
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::none(),
ExExManagerHandle::empty(),
)
}
@ -156,6 +160,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let mut cumulative_gas = 0;
let batch_start = Instant::now();
let mut blocks = Vec::new();
for block_number in start_block..=max_block {
// Fetch the block
let fetch_block_start = Instant::now();
@ -191,9 +196,13 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
}
stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used;
// If we have ExExs, we need to save the block in memory for later
if self.exex_manager_handle.has_exexs() {
blocks.push(block);
}
// Check if we should commit now
let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
if self.thresholds.is_end_of_batch(
@ -209,6 +218,25 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let state = executor.take_output_state();
let write_preparation_duration = time.elapsed();
// Check if we should send a [`CanonStateNotification`] to execution extensions.
//
// Note: Since we only write to `blocks` if there are any ExExs, we don't need to perform
// the `has_exexs` check here as well.
if !blocks.is_empty() {
let chain = Arc::new(Chain::new(
blocks.into_iter().map(|block| {
let hash = block.header.hash_slow();
block.seal(hash)
}),
state.clone(),
None,
));
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which in turn means the node is shutting down.
let _ = self.exex_manager_handle.send(CanonStateNotification::Commit { new: chain });
}
let time = Instant::now();
// write output
state.write_to_storage(
@ -360,6 +388,16 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
StageId::Execution
}
fn poll_execute_ready(
&mut self,
cx: &mut Context<'_>,
_: ExecInput,
) -> Poll<Result<(), StageError>> {
ready!(self.exex_manager_handle.poll_ready(cx));
Poll::Ready(Ok(()))
}
/// Execute the stage
fn execute(
&mut self,
@ -375,74 +413,33 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
// Acquire changeset cursors
let mut account_changeset = tx.cursor_dup_write::<tables::AccountChangeSets>()?;
let mut storage_changeset = tx.cursor_dup_write::<tables::StorageChangeSets>()?;
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
}
// get all batches for account change
// Check if walk and walk_dup would do the same thing
let account_changeset_batch =
account_changeset.walk_range(range.clone())?.collect::<Result<Vec<_>, _>>()?;
// Unwind account and storage changesets, as well as receipts.
//
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.unwind_or_peek_state::<true>(range.clone())?;
// revert all changes to PlainState
for (_, changeset) in account_changeset_batch.into_iter().rev() {
if let Some(account_info) = changeset.info {
tx.put::<tables::PlainAccountState>(changeset.address, account_info)?;
} else {
tx.delete::<tables::PlainAccountState>(changeset.address, None)?;
}
// Construct a `CanonStateNotification` if we have ExExs installed.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range. This is needed for `CanonStateNotification`.
let blocks = provider.get_take_block_range::<false>(range.clone())?;
let chain = Chain::new(blocks, bundle_state_with_receipts, None);
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which in turn means the node is shutting down.
let _ = self.exex_manager_handle.send(CanonStateNotification::Reorg {
old: Arc::new(chain),
new: Arc::new(Chain::default()),
});
}
// get all batches for storage change
let storage_changeset_batch = storage_changeset
.walk_range(BlockNumberAddress::range(range.clone()))?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainStorage
let mut plain_storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
for (key, storage) in storage_changeset_batch.into_iter().rev() {
let address = key.address();
if let Some(v) = plain_storage_cursor.seek_by_key_subkey(address, storage.key)? {
if v.key == storage.key {
plain_storage_cursor.delete_current()?;
}
}
if storage.value != U256::ZERO {
plain_storage_cursor.upsert(address, storage)?;
}
}
// Discard unwinded changesets
provider.unwind_table_by_num::<tables::AccountChangeSets>(unwind_to)?;
let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?;
while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? {
if key.block_number() < *range.start() {
break
}
// delete all changesets
rev_storage_changeset_walker.delete_current()?;
}
// Look up the start index for the transaction range
let first_tx_num = provider
.block_body_indices(*range.start())?
.ok_or(ProviderError::BlockBodyIndicesNotFound(*range.start()))?
.first_tx_num();
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
// Unwind all receipts for transactions in the block range
if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() {
// We only use static files for Receipts, if there is no receipt pruning of any kind.
@ -451,34 +448,24 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// if the expected highest receipt in the files is higher than the database.
// Which is essentially what happens here when we unwind this stage.
let _static_file_producer = prepare_static_file_producer(provider, *range.start())?;
// Update the checkpoint.
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
} else {
// We use database for Receipts, if there is any kind of receipt pruning/filtering,
// since it is not supported by static files.
let mut cursor = tx.cursor_write::<tables::Receipts>()?;
let mut reverse_walker = cursor.walk_back(None)?;
while let Some(Ok((tx_number, receipt))) = reverse_walker.next() {
if tx_number < first_tx_num {
break
}
reverse_walker.delete_current()?;
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
stage_checkpoint.progress.processed -= receipt.cumulative_gas_used;
}
}
// If there is any kind of receipt pruning/filtering we use the database, since static
// files do not support filters.
//
// If we hit this case, the receipts have already been unwound by the call to
// `unwind_or_peek_state`.
}
// Update the checkpoint.
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
for block_number in range {
stage_checkpoint.progress.processed -= provider
.block_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?
.gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
@ -621,17 +608,17 @@ mod tests {
use crate::test_utils::TestStageDB;
use alloy_rlp::Decodable;
use assert_matches::assert_matches;
use reth_db::models::AccountBeforeTx;
use reth_db::{models::AccountBeforeTx, transaction::DbTxMut};
use reth_evm_ethereum::EthEvmConfig;
use reth_interfaces::executor::BlockValidationError;
use reth_primitives::{
address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Address,
Bytecode, ChainSpecBuilder, PruneMode, ReceiptsLogPruneConfig, SealedBlock, StorageEntry,
B256,
B256, U256,
};
use reth_provider::{test_utils::create_test_provider_factory, AccountReader, ReceiptProvider};
use reth_revm::EvmProcessorFactory;
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeMap;
fn stage() -> ExecutionStage<EvmProcessorFactory<EthEvmConfig>> {
let executor_factory = EvmProcessorFactory::new(
@ -648,6 +635,7 @@ mod tests {
},
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::none(),
ExExManagerHandle::empty(),
)
}

View File

@ -51,6 +51,7 @@ mod tests {
AccountsHistory, DatabaseEnv,
};
use reth_evm_ethereum::EthEvmConfig;
use reth_exex::ExExManagerHandle;
use reth_interfaces::test_utils::generators::{self, random_block};
use reth_primitives::{
address, hex_literal::hex, keccak256, Account, Bytecode, ChainSpecBuilder, PruneMode,
@ -151,6 +152,7 @@ mod tests {
},
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
prune_modes.clone(),
ExExManagerHandle::empty(),
);
execution_stage.execute(&provider, input).unwrap();

View File

@ -218,11 +218,13 @@ impl BundleStateWithReceipts {
self.first_block
}
/// Revert to given block number.
/// Revert the state to the given block number.
///
/// If number is in future, or in the past return false
/// Returns false if the block number is not in the bundle state.
///
/// NOTE: Provided block number will stay inside the bundle state.
/// # Note
///
/// The provided block number will stay inside the bundle state.
pub fn revert_to(&mut self, block_number: BlockNumber) -> bool {
let Some(index) = self.block_number_to_index(block_number) else { return false };
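
A hypothetical usage sketch based only on the signature and the reworded docs above (`bundle_state` and `target_block` are assumed names):

// Roll the in-memory bundle state back so that `target_block` becomes its tip.
// `false` means the block number is not covered by this bundle state and
// nothing was reverted.
if !bundle_state.revert_to(target_block) {
    // Out of range: leave the bundle state untouched.
}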

View File

@ -363,7 +363,6 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
}
// TODO(joshie) TEMPORARY should be moved to trait providers
/// Unwind or peek at the last N blocks of state, recreating the [`BundleStateWithReceipts`].
///
/// If UNWIND is set to true, the tip and latest state will be unwound.
@ -388,7 +387,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
/// 1. Take the old value from the changeset
/// 2. Take the new value from the local state
/// 3. Set the local state to the value in the changeset
fn unwind_or_peek_state<const UNWIND: bool>(
pub fn unwind_or_peek_state<const UNWIND: bool>(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BundleStateWithReceipts> {
@ -706,8 +705,8 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
Ok(block_tx)
}
/// Return range of blocks and its execution result
fn get_take_block_range<const TAKE: bool>(
/// Get or unwind the given range of blocks.
pub fn get_take_block_range<const TAKE: bool>(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
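
Both methods are made pub so the execution stage can assemble the Chain for its Reorg notification. A sketch of that unwind-time usage, mirroring the execution-stage hunk above (the const generics are the UNWIND/TAKE flags: true unwinds/removes, false only reads, as far as the docs indicate):

// Unwind the state changes for `range`, then read (without removing) the
// matching blocks so they can be handed to the ExExs as a Chain.
let bundle_state_with_receipts = provider.unwind_or_peek_state::<true>(range.clone())?;
let blocks = provider.get_take_block_range::<false>(range.clone())?;
let chain = Chain::new(blocks, bundle_state_with_receipts, None);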

View File

@ -4,14 +4,12 @@ use crate::{bundle_state::BundleStateWithReceipts, StateProvider};
use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{BlockNumber, BlockWithSenders, PruneModes, Receipt, U256};
/// Executor factory that would create the EVM with particular state provider.
///
/// It can be used to mock executor.
/// A factory capable of creating an executor with the given state provider.
pub trait ExecutorFactory: Send + Sync + 'static {
/// Executor with [`StateProvider`]
fn with_state<'a, SP: StateProvider + 'a>(
&'a self,
_sp: SP,
sp: SP,
) -> Box<dyn PrunableBlockExecutor<Error = BlockExecutionError> + 'a>;
}
@ -19,7 +17,7 @@ pub trait ExecutorFactory: Send + Sync + 'static {
///
/// This type is capable of executing (multiple) blocks by applying the state changes made by each
/// block. The final state of the executor can extracted using
/// [take_output_state](BlockExecutor::take_output_state).
/// [`Self::take_output_state`].
pub trait BlockExecutor {
/// The error type returned by the executor.
type Error;
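
A usage sketch assumed from the trait definitions above and the execution stage earlier in this diff (`executor_factory`, `state_provider`, and the block loop are placeholders, not names from the commit):

// Build an executor over some state provider, execute a batch of blocks, then
// take the accumulated output state (a BundleStateWithReceipts) out of it.
let mut executor = executor_factory.with_state(state_provider);
// ... execute blocks with the executor ...
let state = executor.take_output_state();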