feat(exex): backfill stream in batches (#9738)

This commit is contained in:
Alexey Shekhirin
2024-08-01 00:39:12 +01:00
committed by GitHub
parent c58ffc47e3
commit 62246199b9
6 changed files with 249 additions and 83 deletions

View File

@ -28,7 +28,7 @@ impl<'a, Block> From<(&'a Block, U256)> for BlockExecutionInput<'a, Block> {
/// Contains the state changes, transaction receipts, and total gas used in the block.
///
/// TODO(mattsse): combine with `ExecutionOutcome`
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockExecutionOutput<T> {
/// The changed state of the block after execution.
pub state: BundleState,

View File

@ -44,7 +44,7 @@ impl<E, P> BackfillJobFactory<E, P> {
/// Sets the stream parallelism.
///
/// Configures the [`BackFillJobStream`](super::stream::BackFillJobStream) created via
/// Configures the [`StreamBackfillJob`](super::stream::StreamBackfillJob) created via
/// [`BackfillJob::into_stream`].
pub const fn with_stream_parallelism(mut self, stream_parallelism: usize) -> Self {
self.stream_parallelism = stream_parallelism;

View File

@ -1,4 +1,4 @@
use crate::BackFillJobStream;
use crate::StreamBackfillJob;
use std::{
ops::RangeInclusive,
time::{Duration, Instant},
@ -52,6 +52,16 @@ where
E: BlockExecutorProvider,
P: BlockReader + HeaderProvider + StateProviderFactory,
{
/// Converts the backfill job into a single block backfill job.
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
self.into()
}
/// Converts the backfill job into a stream.
pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain> {
self.into()
}
fn execute_range(&mut self) -> Result<Chain, BlockExecutionError> {
let mut executor = self.executor.batch_executor(StateProviderDatabase::new(
self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
@ -137,32 +147,16 @@ where
}
}
impl<E, P> BackfillJob<E, P> {
/// Converts the backfill job into a single block backfill job.
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
self.into()
}
/// Converts the backfill job into a backfill job stream.
pub fn into_stream(self) -> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static,
{
let parallelism = self.stream_parallelism;
BackFillJobStream::new(self.into_single_blocks()).with_parallelism(parallelism)
}
}
/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug, Clone)]
pub struct SingleBlockBackfillJob<E, P> {
executor: E,
provider: P,
pub(crate) executor: E,
pub(crate) provider: P,
pub(crate) range: RangeInclusive<BlockNumber>,
pub(crate) stream_parallelism: usize,
}
impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
@ -182,6 +176,13 @@ where
E: BlockExecutorProvider,
P: HeaderProvider + BlockReader + StateProviderFactory,
{
/// Converts the single block backfill job into a stream.
pub fn into_stream(
self,
) -> StreamBackfillJob<E, P, (BlockWithSenders, BlockExecutionOutput<Receipt>)> {
self.into()
}
pub(crate) fn execute_block(
&self,
block_number: u64,
@ -211,8 +212,13 @@ where
}
impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
fn from(value: BackfillJob<E, P>) -> Self {
Self { executor: value.executor, provider: value.provider, range: value.range }
fn from(job: BackfillJob<E, P>) -> Self {
Self {
executor: job.executor,
provider: job.provider,
range: job.range,
stream_parallelism: job.stream_parallelism,
}
}
}

View File

@ -6,4 +6,4 @@ mod test_utils;
pub use factory::BackfillJobFactory;
pub use job::{BackfillJob, SingleBlockBackfillJob};
pub use stream::BackFillJobStream;
pub use stream::StreamBackfillJob;

View File

@ -1,4 +1,4 @@
use crate::SingleBlockBackfillJob;
use crate::{BackfillJob, SingleBlockBackfillJob};
use std::{
ops::RangeInclusive,
pin::Pin,
@ -11,90 +11,159 @@ use futures::{
};
use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
use reth_primitives::{BlockNumber, BlockWithSenders, Receipt};
use reth_provider::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use tokio::task::JoinHandle;
type BackfillTasks = FuturesOrdered<
JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>,
>;
/// The default parallelism for active tasks in [`BackFillJobStream`].
/// The default parallelism for active tasks in [`StreamBackfillJob`].
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// The default batch size for active tasks in [`StreamBackfillJob`].
const DEFAULT_BATCH_SIZE: usize = 100;
type BackfillTasks<T> = FuturesOrdered<JoinHandle<Result<T, BlockExecutionError>>>;
type SingleBlockStreamItem = (BlockWithSenders, BlockExecutionOutput<Receipt>);
type BatchBlockStreamItem = Chain;
/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be
/// processed asynchronously but in order within a specified range.
#[derive(Debug)]
pub struct BackFillJobStream<E, P> {
job: SingleBlockBackfillJob<E, P>,
tasks: BackfillTasks,
pub struct StreamBackfillJob<E, P, T> {
executor: E,
provider: P,
prune_modes: PruneModes,
range: RangeInclusive<BlockNumber>,
tasks: BackfillTasks<T>,
parallelism: usize,
batch_size: usize,
}
impl<E, P> BackFillJobStream<E, P>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static,
{
/// Creates a new [`BackFillJobStream`] with the default parallelism.
///
/// # Parameters
/// - `job`: The [`SingleBlockBackfillJob`] to be executed asynchronously.
///
/// # Returns
/// A new instance of [`BackFillJobStream`] with the default parallelism.
pub fn new(job: SingleBlockBackfillJob<E, P>) -> Self {
let range = job.range.clone();
Self { job, tasks: FuturesOrdered::new(), range, parallelism: DEFAULT_PARALLELISM }
}
/// Configures the parallelism of the [`BackFillJobStream`] to handle active tasks.
///
/// # Parameters
/// - `parallelism`: The parallelism to handle active tasks.
///
/// # Returns
/// The modified instance of [`BackFillJobStream`] with the specified parallelism.
impl<E, P, T> StreamBackfillJob<E, P, T> {
/// Configures the parallelism of the [`StreamBackfillJob`] to handle active tasks.
///
/// The `Stream` implementations stop spawning new tasks once the number of queued tasks
/// reaches this value, so it acts as the upper bound on tasks in flight.
pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
self.parallelism = parallelism;
self
}
fn spawn_task(
&self,
block_number: BlockNumber,
) -> JoinHandle<Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>>
{
let job = self.job.clone();
tokio::task::spawn_blocking(move || job.execute_block(block_number))
/// Configures the batch size for the [`StreamBackfillJob`].
///
/// This is the number of block numbers the batch stream takes from the range for each task
/// it spawns. The single block stream does not read this value.
pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
/// Polls the task queue for its next result.
///
/// Returns `Poll::Pending` while the queue is not ready and `None` once it reports no more
/// items. A task that could not be joined is surfaced as an error built with
/// `BlockExecutionError::other`; otherwise the task's own result is passed through.
fn poll_next_task(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<T, BlockExecutionError>>> {
    let next = ready!(self.tasks.poll_next_unpin(cx));
    Poll::Ready(next.map(|joined| match joined {
        Ok(executed) => executed,
        Err(join_error) => Err(BlockExecutionError::other(join_error)),
    }))
}
}
impl<E, P> Stream for BackFillJobStream<E, P>
impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem>
where
E: BlockExecutorProvider + Clone + Send + 'static,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + 'static + Unpin,
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;
type Item = Result<SingleBlockStreamItem, BlockExecutionError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
// If we have a block number, then we can spawn a new task for that block
if let Some(block_number) = this.range.next() {
let task = this.spawn_task(block_number);
let mut job = SingleBlockBackfillJob {
executor: this.executor.clone(),
provider: this.provider.clone(),
range: block_number..=block_number,
stream_parallelism: this.parallelism,
};
let task =
tokio::task::spawn_blocking(move || job.next().expect("non-empty range"));
this.tasks.push_back(task);
} else {
break;
}
}
match ready!(this.tasks.poll_next_unpin(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(BlockExecutionError::other)?)),
None => Poll::Ready(None),
this.poll_next_task(cx)
}
}
/// Batch variant of the stream: each spawned task runs a [`BackfillJob`] over a contiguous
/// sub-range of at most `batch_size` blocks, and its result is yielded as one stream item.
impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem>
where
    E: BlockExecutorProvider + Clone + Send + 'static,
    P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static,
{
    type Item = Result<BatchBlockStreamItem, BlockExecutionError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Spawn new tasks only if we are below the parallelism configured.
        while this.tasks.len() < this.parallelism {
            // Take the next `batch_size` blocks from the range and calculate the range bounds.
            //
            // `by_ref()` makes `take` pull directly from `this.range`, so draining the adaptor
            // below (`next` + `last`) already advances the range past this batch. The range
            // must not be advanced a second time, otherwise the blocks that follow every batch
            // would be skipped.
            let mut batch = this.range.by_ref().take(this.batch_size);
            let start = batch.next();
            let range_bounds = start.zip(batch.last().or(start));

            // If we have range bounds, then we can spawn a new task for that range
            if let Some((first, last)) = range_bounds {
                let range = first..=last;
                // NOTE(review): the spawned job runs with default thresholds rather than any
                // configured on the originating job — confirm this is intended.
                let mut job = BackfillJob {
                    executor: this.executor.clone(),
                    provider: this.provider.clone(),
                    prune_modes: this.prune_modes.clone(),
                    thresholds: ExecutionStageThresholds::default(),
                    range,
                    stream_parallelism: this.parallelism,
                };
                let task =
                    tokio::task::spawn_blocking(move || job.next().expect("non-empty range"));
                this.tasks.push_back(task);
            } else {
                // The range is exhausted, nothing left to spawn.
                break;
            }
        }

        this.poll_next_task(cx)
    }
}
impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
Self {
executor: job.executor,
provider: job.provider,
prune_modes: PruneModes::default(),
range: job.range,
tasks: FuturesOrdered::new(),
parallelism: job.stream_parallelism,
batch_size: 1,
}
}
}
/// Builds a batch stream from a [`BackfillJob`], reusing its executor, provider, prune modes,
/// range and stream parallelism.
///
/// The batch size is taken from the job's `max_blocks` threshold and falls back to
/// `DEFAULT_BATCH_SIZE` when that threshold is unset.
impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem> {
    fn from(job: BackfillJob<E, P>) -> Self {
        // Saturate instead of truncating with `as`: on targets where `usize` is narrower than
        // the threshold type, a truncating cast could silently produce a tiny or zero batch
        // size.
        let batch_size = job
            .thresholds
            .max_blocks
            .map_or(DEFAULT_BATCH_SIZE, |max| usize::try_from(max).unwrap_or(usize::MAX));
        Self {
            executor: job.executor,
            provider: job.provider,
            prune_modes: job.prune_modes,
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size,
        }
    }
}
@ -104,7 +173,9 @@ mod tests {
use std::sync::Arc;
use crate::{
backfill::test_utils::{blocks_and_execution_outputs, chain_spec},
backfill::test_utils::{
blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
},
BackfillJobFactory,
};
use futures::StreamExt;
@ -115,11 +186,12 @@ mod tests {
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
};
use reth_stages_api::ExecutionStageThresholds;
use reth_testing_utils::generators;
use secp256k1::Keypair;
#[tokio::test]
async fn test_async_backfill() -> eyre::Result<()> {
async fn test_single_blocks() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Create a key pair for the sender
@ -142,7 +214,7 @@ mod tests {
// Backfill the first block
let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
let mut backfill_stream = factory.backfill(1..=1).into_stream();
let mut backfill_stream = factory.backfill(1..=1).into_single_blocks().into_stream();
// execute first block
let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
@ -158,4 +230,44 @@ mod tests {
Ok(())
}
/// Backfills blocks `1..=2` as a single batch and checks the resulting chain against the
/// blocks and execution outcome that were committed to the database.
#[tokio::test]
async fn test_batch() -> eyre::Result<()> {
    reth_tracing::init_test_tracing();

    // Key pair of the account that sends the test transactions
    let key_pair = Keypair::new_global(&mut generators::rng());
    let sender = public_key_to_address(key_pair.public_key());

    let chain_spec = chain_spec(sender);
    let executor = EthExecutorProvider::ethereum(chain_spec.clone());

    let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
    init_genesis(provider_factory.clone())?;
    let blockchain_db = BlockchainProvider::new(
        provider_factory.clone(),
        Arc::new(NoopBlockchainTree::default()),
    )?;

    // Create first 2 blocks and commit them together with their execution outcome
    let (expected_blocks, expected_outcome) =
        blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;

    // Backfill the same range, with both blocks fitting into one batch
    let thresholds = ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() };
    let mut backfill_stream = BackfillJobFactory::new(executor, blockchain_db)
        .with_thresholds(thresholds)
        .backfill(1..=2)
        .into_stream();

    let mut chain = backfill_stream.next().await.unwrap().unwrap();
    // Sort the reverts so the comparison does not depend on their order
    chain.execution_outcome_mut().state_mut().reverts.sort();

    assert!(chain.blocks_iter().eq(&expected_blocks));
    assert_eq!(chain.execution_outcome(), &expected_outcome);

    // expect no more blocks
    assert!(backfill_stream.next().await.is_none());

    Ok(())
}
}

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use eyre::OptionExt;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_evm::execute::{
BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{
@ -82,14 +82,10 @@ where
Ok(block_execution_output)
}
pub(crate) fn blocks_and_execution_outputs<DB>(
provider_factory: ProviderFactory<DB>,
fn blocks(
chain_spec: Arc<ChainSpec>,
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
DB: reth_db_api::database::Database,
{
) -> eyre::Result<(BlockWithSenders, BlockWithSenders)> {
// First block has a transaction that transfers some ETH to zero address
let block1 = Block {
header: Header {
@ -150,6 +146,19 @@ where
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;
Ok((block1, block2))
}
pub(crate) fn blocks_and_execution_outputs<DB>(
provider_factory: ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
DB: reth_db_api::database::Database,
{
let (block1, block2) = blocks(chain_spec.clone(), key_pair)?;
let block_output1 =
execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?;
let block_output2 =
@ -160,3 +169,42 @@ where
Ok(vec![(block1, block_output1), (block2, block_output2)])
}
/// Executes the two test blocks as one batch against the latest database state, appends the
/// sealed blocks together with the resulting execution outcome to the database, and returns
/// both.
///
/// The reverts of the returned outcome are sorted before it is stored and returned.
pub(crate) fn blocks_and_execution_outcome<DB>(
    provider_factory: ProviderFactory<DB>,
    chain_spec: Arc<ChainSpec>,
    key_pair: Keypair,
) -> eyre::Result<(Vec<SealedBlockWithSenders>, ExecutionOutcome)>
where
    DB: reth_db_api::database::Database,
{
    let (block1, block2) = blocks(chain_spec.clone(), key_pair)?;

    // Batch executor reading from the latest state
    let provider = provider_factory.provider()?;
    let latest_state =
        LatestStateProviderRef::new(provider.tx_ref(), provider.static_file_provider().clone());
    let batch_executor = EthExecutorProvider::ethereum(chain_spec)
        .batch_executor(StateProviderDatabase::new(latest_state));

    let mut execution_outcome = batch_executor.execute_and_verify_batch(vec![
        (&block1, U256::ZERO).into(),
        (&block2, U256::ZERO).into(),
    ])?;
    execution_outcome.state_mut().reverts.sort();

    let sealed_blocks = vec![block1.seal_slow(), block2.seal_slow()];

    // Commit the block's execution outcome to the database
    let provider_rw = provider_factory.provider_rw()?;
    provider_rw.append_blocks_with_state(
        sealed_blocks.clone(),
        execution_outcome.clone(),
        Default::default(),
        Default::default(),
    )?;
    provider_rw.commit()?;

    Ok((sealed_blocks, execution_outcome))
}