feat: BackFillJobStream (#9578)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Author: Luca Provini
Date: 2024-07-18 11:32:37 +02:00
Committed by: GitHub
Parent: ce4d4bd43d
Commit: 43ffb83575
5 changed files with 231 additions and 64 deletions

Cargo.lock

@@ -7269,6 +7269,7 @@ name = "reth-exex"
version = "1.0.2"
dependencies = [
"eyre",
"futures",
"metrics",
"reth-blockchain-tree",
"reth-chainspec",


@@ -33,6 +33,7 @@ reth-stages-api.workspace = true
## async
tokio.workspace = true
tokio-util.workspace = true
futures.workspace = true
## misc
eyre.workspace = true


@@ -0,0 +1,74 @@
use crate::BackfillJob;
use reth_evm::execute::{
    BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{
    BlockReader, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_tracing::tracing::trace;
use std::ops::RangeInclusive;

/// Single-block backfill job started for a specific range.
///
/// It implements [`Iterator`], executing one block each time the iterator is advanced and
/// yielding a ([`BlockWithSenders`], [`BlockExecutionOutput`]) pair.
#[derive(Debug, Clone)]
pub struct SingleBlockBackfillJob<E, P> {
    executor: E,
    provider: P,
    pub(crate) range: RangeInclusive<BlockNumber>,
}

impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
    E: BlockExecutorProvider,
    P: HeaderProvider + BlockReader + StateProviderFactory,
{
    type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.range.next().map(|block_number| self.execute_block(block_number))
    }
}

impl<E, P> SingleBlockBackfillJob<E, P>
where
    E: BlockExecutorProvider,
    P: HeaderProvider + BlockReader + StateProviderFactory,
{
    pub(crate) fn execute_block(
        &self,
        block_number: u64,
    ) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
        // Fetch the total difficulty of the block.
        let td = self
            .provider
            .header_td_by_number(block_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

        // Fetch the block with senders for execution.
        let block_with_senders = self
            .provider
            .block_with_senders(block_number.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

        // Configure the executor to use the previous block's state.
        let executor = self.executor.executor(StateProviderDatabase::new(
            self.provider.history_by_block_number(block_number.saturating_sub(1))?,
        ));

        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

        let block_execution_output = executor.execute((&block_with_senders, td).into())?;

        Ok((block_with_senders, block_execution_output))
    }
}

impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
    fn from(value: BackfillJob<E, P>) -> Self {
        Self { executor: value.executor, provider: value.provider, range: value.range }
    }
}
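
Since the job is a plain `Iterator`, it can be consumed synchronously. Below is a minimal usage sketch, written as if it sat in the module above so its imports apply; the helper name is illustrative and not part of this diff.

// Hypothetical helper: drain a `SingleBlockBackfillJob`, stopping at the first
// execution error. Each iterator item is one executed block plus its output.
fn drain_backfill_job<E, P>(
    job: SingleBlockBackfillJob<E, P>,
) -> Result<Vec<(BlockWithSenders, BlockExecutionOutput<Receipt>)>, BlockExecutionError>
where
    E: BlockExecutorProvider,
    P: HeaderProvider + BlockReader + StateProviderFactory,
{
    // `collect` on an iterator of `Result`s short-circuits on the first `Err`.
    job.collect()
}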


@@ -1,8 +1,7 @@
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use job::SingleBlockBackfillJob;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt};
use reth_primitives::{Block, BlockNumber};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
@@ -15,6 +14,10 @@ use std::{
ops::RangeInclusive,
time::{Duration, Instant},
};
use stream::BackFillJobStream;
mod job;
mod stream;
/// Factory for creating new backfill jobs.
#[derive(Debug, Clone)]
@@ -198,67 +201,14 @@ impl<E, P> BackfillJob<E, P> {
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
self.into()
}
}
impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
fn from(value: BackfillJob<E, P>) -> Self {
Self { executor: value.executor, provider: value.provider, range: value.range }
}
}
/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
pub struct SingleBlockBackfillJob<E, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
}
impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;
fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}
impl<E, P> SingleBlockBackfillJob<E, P>
where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));
trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");
let block_execution_output = executor.execute((&block_with_senders, td).into())?;
Ok((block_with_senders, block_execution_output))
/// Converts the backfill job into a backfill job stream.
pub fn into_stream(self) -> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static,
{
BackFillJobStream::new(self.into_single_blocks())
}
}
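
For orientation, here is a consumer-side sketch of `into_stream`; it is a hypothetical helper, not part of this diff. It assumes the same imports as the file above plus `futures::StreamExt`, and it adds `Unpin` bounds because `StreamExt::next` requires the stream to be `Unpin`.

use futures::StreamExt;

// Hypothetical consumer sketch: convert a `BackfillJob` into the ordered stream and
// process the executed blocks one by one.
async fn run_backfill<E, P>(job: BackfillJob<E, P>) -> Result<u64, BlockExecutionError>
where
    E: BlockExecutorProvider + Clone + Send + Unpin + 'static,
    P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
{
    let mut stream = job.into_stream();
    let mut executed = 0;

    // Blocks are executed on blocking tasks, but results arrive in ascending block
    // order because the stream keeps its join handles in a `FuturesOrdered`.
    while let Some(result) = stream.next().await {
        let (_block, _output) = result?;
        executed += 1;
    }
    Ok(executed)
}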
@@ -266,6 +216,7 @@ where
mod tests {
    use crate::BackfillJobFactory;
    use eyre::OptionExt;
    use futures::StreamExt;
    use reth_blockchain_tree::noop::NoopBlockchainTree;
    use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
    use reth_db_common::init::init_genesis;
@@ -519,4 +470,45 @@ mod tests {
        Ok(())
    }

    #[tokio::test]
    async fn test_async_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = Keypair::new_global(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(provider_factory.clone())?;
        let blockchain_db = BlockchainProvider::new(
            provider_factory.clone(),
            Arc::new(NoopBlockchainTree::default()),
        )?;

        // Create first 2 blocks
        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut backfill_stream = factory.backfill(1..=1).into_stream();

        // execute first block
        let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
        execution_output.state.reverts.sort();
        let sealed_block_with_senders = blocks_and_execution_outcomes[0].0.clone();
        let expected_block = sealed_block_with_senders.unseal();
        let expected_output = &blocks_and_execution_outcomes[0].1;
        assert_eq!(block, expected_block);
        assert_eq!(&execution_output, expected_output);

        // expect no more blocks
        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }
}


@@ -0,0 +1,99 @@
use super::job::SingleBlockBackfillJob;
use futures::{
    stream::{FuturesOrdered, Stream},
    StreamExt,
};
use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory};
use std::{
    ops::RangeInclusive,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

/// The ordered collection of in-flight block-execution tasks, one join handle per block.
type BackfillTasks = FuturesOrdered<
    JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>,
>;

/// The default parallelism for active tasks in [`BackFillJobStream`].
const DEFAULT_PARALLELISM: usize = 4;

/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be
/// processed asynchronously but yielded in order within the specified range.
#[derive(Debug)]
pub struct BackFillJobStream<E, P> {
    job: SingleBlockBackfillJob<E, P>,
    tasks: BackfillTasks,
    range: RangeInclusive<BlockNumber>,
    parallelism: usize,
}

impl<E, P> BackFillJobStream<E, P>
where
    E: BlockExecutorProvider + Clone + Send + 'static,
    P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static,
{
    /// Creates a new [`BackFillJobStream`] with the default parallelism.
    ///
    /// # Parameters
    /// - `job`: The [`SingleBlockBackfillJob`] to be executed asynchronously.
    ///
    /// # Returns
    /// A new instance of [`BackFillJobStream`] with the default parallelism.
    pub fn new(job: SingleBlockBackfillJob<E, P>) -> Self {
        let range = job.range.clone();
        Self { job, tasks: FuturesOrdered::new(), range, parallelism: DEFAULT_PARALLELISM }
    }

    /// Configures how many block-execution tasks the [`BackFillJobStream`] keeps active at once.
    ///
    /// # Parameters
    /// - `parallelism`: The maximum number of active tasks.
    ///
    /// # Returns
    /// The modified instance of [`BackFillJobStream`] with the specified parallelism.
    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
        self.parallelism = parallelism;
        self
    }

    fn spawn_task(
        &self,
        block_number: BlockNumber,
    ) -> JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>
    {
        let job = self.job.clone();
        tokio::task::spawn_blocking(move || job.execute_block(block_number))
    }
}

impl<E, P> Stream for BackFillJobStream<E, P>
where
    E: BlockExecutorProvider + Clone + Send + 'static,
    P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin,
{
    type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Spawn new tasks only while we are below the configured parallelism.
        while this.tasks.len() < this.parallelism {
            if let Some(block_number) = this.range.next() {
                let task = this.spawn_task(block_number);
                this.tasks.push_back(task);
            } else {
                break;
            }
        }

        match ready!(this.tasks.poll_next_unpin(cx)) {
            Some(res) => Poll::Ready(Some(res.map_err(|e| BlockExecutionError::Other(e.into()))?)),
            None => Poll::Ready(None),
        }
    }
}
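
Finally, a tuning sketch for the parallelism knob, again hypothetical and written as if it lived in the module above so its imports apply; the helper name and the value 8 are illustrative. Up to `parallelism` blocks execute concurrently on blocking tasks, while the `FuturesOrdered` inside the stream keeps the yielded results in block order.

// Hypothetical sketch: wrap a `SingleBlockBackfillJob` in a stream with a custom
// parallelism and collect every result, failing on the first execution error.
async fn backfill_all<E, P>(
    job: SingleBlockBackfillJob<E, P>,
) -> Result<Vec<(BlockWithSenders, BlockExecutionOutput<Receipt>)>, BlockExecutionError>
where
    E: BlockExecutorProvider + Clone + Send + 'static,
    P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin,
{
    BackFillJobStream::new(job)
        .with_parallelism(8)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        // Turn `Vec<Result<_, _>>` into `Result<Vec<_>, _>`.
        .collect()
}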