feat: add Block AT to BlockReader (#12837)

This commit is contained in:
Arsenii Kulikov
2024-11-25 18:28:56 +04:00
committed by GitHub
parent e2c42ae242
commit c44e11b8ad
69 changed files with 664 additions and 267 deletions

View File

@ -35,6 +35,7 @@ reth-tasks.workspace = true
reth-tracing.workspace = true
# alloy
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true

View File

@ -4,12 +4,14 @@ use std::{
time::{Duration, Instant},
};
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_primitives::{Block, BlockExt, BlockWithSenders, Receipt};
use reth_primitives_traits::format_gas_throughput;
use reth_node_api::{Block as _, BlockBody as _};
use reth_primitives::{BlockExt, BlockWithSenders, Receipt};
use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
use reth_provider::{
BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
};
@ -37,7 +39,9 @@ pub struct BackfillJob<E, P> {
impl<E, P> Iterator for BackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
P: HeaderProvider
+ BlockReader<Transaction: SignedTransaction, Block = reth_primitives::Block>
+ StateProviderFactory,
{
type Item = BackfillJobResult<Chain>;
@ -53,7 +57,9 @@ where
impl<E, P> BackfillJob<E, P>
where
E: BlockExecutorProvider,
P: BlockReader + HeaderProvider + StateProviderFactory,
P: BlockReader<Transaction: SignedTransaction, Block = reth_primitives::Block>
+ HeaderProvider
+ StateProviderFactory,
{
/// Converts the backfill job into a single block backfill job.
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
@ -100,10 +106,10 @@ where
fetch_block_duration += fetch_block_start.elapsed();
cumulative_gas += block.gas_used;
cumulative_gas += block.gas_used();
// Configure the executor to use the current state.
trace!(target: "exex::backfill", number = block_number, txs = block.body.transactions.len(), "Executing block");
trace!(target: "exex::backfill", number = block_number, txs = block.body.transactions().len(), "Executing block");
// Execute the block
let execute_start = Instant::now();
@ -111,8 +117,7 @@ where
// Unseal the block for execution
let (block, senders) = block.into_components();
let (unsealed_header, hash) = block.header.split();
let block =
Block { header: unsealed_header, body: block.body }.with_senders_unchecked(senders);
let block = P::Block::new(unsealed_header, block.body).with_senders_unchecked(senders);
executor.execute_and_verify_one((&block, td).into())?;
execution_duration += execute_start.elapsed();
@ -134,7 +139,7 @@ where
}
}
let last_block_number = blocks.last().expect("blocks should not be empty").number;
let last_block_number = blocks.last().expect("blocks should not be empty").number();
debug!(
target: "exex::backfill",
range = ?*self.range.start()..=last_block_number,
@ -165,7 +170,7 @@ pub struct SingleBlockBackfillJob<E, P> {
impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
P: HeaderProvider + BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
{
type Item = BackfillJobResult<(BlockWithSenders, BlockExecutionOutput<Receipt>)>;
@ -177,7 +182,7 @@ where
impl<E, P> SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
P: HeaderProvider + BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
{
/// Converts the single block backfill job into a stream.
pub fn into_stream(
@ -189,7 +194,7 @@ where
pub(crate) fn execute_block(
&self,
block_number: u64,
) -> BackfillJobResult<(BlockWithSenders, BlockExecutionOutput<Receipt>)> {
) -> BackfillJobResult<(BlockWithSenders<P::Block>, BlockExecutionOutput<Receipt>)> {
let td = self
.provider
.header_td_by_number(block_number)?
@ -206,7 +211,7 @@ where
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));
trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.transactions.len(), "Executing block");
trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body().transactions().len(), "Executing block");
let block_execution_output = executor.execute((&block_with_senders, td).into())?;

View File

@ -103,7 +103,13 @@ where
impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
P: HeaderProvider
+ BlockReader<Block = reth_primitives::Block>
+ StateProviderFactory
+ Clone
+ Send
+ Unpin
+ 'static,
{
type Item = BackfillJobResult<SingleBlockStreamItem>;
@ -136,7 +142,13 @@ where
impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
P: HeaderProvider
+ BlockReader<Block = reth_primitives::Block>
+ StateProviderFactory
+ Clone
+ Send
+ Unpin
+ 'static,
{
type Item = BackfillJobResult<BatchBlockStreamItem>;

View File

@ -3,6 +3,7 @@ use reth_exex_types::ExExHead;
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head;
use reth_provider::BlockReader;
use reth_tasks::TaskExecutor;
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;
@ -56,7 +57,7 @@ where
impl<Node> ExExContext<Node>
where
Node: FullNodeComponents,
Node::Provider: Debug,
Node::Provider: Debug + BlockReader<Block = reth_primitives::Block>,
Node::Executor: Debug,
{
/// Returns dynamic version of the context
@ -106,13 +107,19 @@ where
/// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of
/// notifications without a head.
pub fn set_notifications_without_head(&mut self) {
pub fn set_notifications_without_head(&mut self)
where
Node::Provider: BlockReader<Block = reth_primitives::Block>,
{
self.notifications.set_without_head();
}
/// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications
/// with the provided head.
pub fn set_notifications_with_head(&mut self, head: ExExHead) {
pub fn set_notifications_with_head(&mut self, head: ExExHead)
where
Node::Provider: BlockReader<Block = reth_primitives::Block>,
{
self.notifications.set_with_head(head);
}
}
@ -121,6 +128,7 @@ where
mod tests {
use reth_exex_types::ExExHead;
use reth_node_api::FullNodeComponents;
use reth_provider::BlockReader;
use crate::ExExContext;
@ -132,7 +140,10 @@ mod tests {
ctx: ExExContext<Node>,
}
impl<Node: FullNodeComponents> ExEx<Node> {
impl<Node: FullNodeComponents> ExEx<Node>
where
Node::Provider: BlockReader<Block = reth_primitives::Block>,
{
async fn _test_bounds(mut self) -> eyre::Result<()> {
self.ctx.pool();
self.ctx.block_executor();

View File

@ -6,6 +6,7 @@ use std::fmt::Debug;
use reth_chainspec::{EthChainSpec, Head};
use reth_node_api::FullNodeComponents;
use reth_node_core::node_config::NodeConfig;
use reth_provider::BlockReader;
use tokio::sync::mpsc;
use crate::{ExExContext, ExExEvent, ExExNotificationsStream};
@ -51,7 +52,7 @@ impl Debug for ExExContextDyn {
impl<Node> From<ExExContext<Node>> for ExExContextDyn
where
Node: FullNodeComponents,
Node::Provider: Debug,
Node::Provider: Debug + BlockReader<Block = reth_primitives::Block>,
Node::Executor: Debug,
{
fn from(ctx: ExExContext<Node>) -> Self {

View File

@ -90,7 +90,12 @@ impl<P, E> ExExNotifications<P, E> {
impl<P, E> ExExNotificationsStream for ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
P: BlockReader<Block = reth_primitives::Block>
+ HeaderProvider
+ StateProviderFactory
+ Clone
+ Unpin
+ 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
fn set_without_head(&mut self) {
@ -139,7 +144,12 @@ where
impl<P, E> Stream for ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
P: BlockReader<Block = reth_primitives::Block>
+ HeaderProvider
+ StateProviderFactory
+ Clone
+ Unpin
+ 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
type Item = eyre::Result<ExExNotification>;
@ -262,7 +272,12 @@ impl<P, E> ExExNotificationsWithHead<P, E> {
impl<P, E> ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
P: BlockReader<Block = reth_primitives::Block>
+ HeaderProvider
+ StateProviderFactory
+ Clone
+ Unpin
+ 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
/// Checks if the ExEx head is on the canonical chain.
@ -339,7 +354,12 @@ where
impl<P, E> Stream for ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
P: BlockReader<Block = reth_primitives::Block>
+ HeaderProvider
+ StateProviderFactory
+ Clone
+ Unpin
+ 'static,
E: BlockExecutorProvider + Clone + Unpin + 'static,
{
type Item = eyre::Result<ExExNotification>;

View File

@ -265,7 +265,7 @@ pub async fn test_exex_context_with_chain_spec(
let (static_dir, _) = create_test_static_files_dir();
let db = create_test_rw_db();
let provider_factory = ProviderFactory::new(
let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
db,
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.into_path()).expect("static file provider"),
@ -289,7 +289,7 @@ pub async fn test_exex_context_with_chain_spec(
let (_, payload_builder) = NoopPayloadBuilderService::<EthEngineTypes>::new();
let components = NodeAdapter::<FullNodeTypesAdapter<NodeTypesWithDBAdapter<TestNode, _>, _>, _> {
let components = NodeAdapter::<FullNodeTypesAdapter<_, _>, _> {
components: Components {
transaction_pool,
evm_config,