fix(cli): fix ImportReceiptsOp (#11216)

This commit is contained in:
joshieDo
2024-09-26 16:55:28 +02:00
committed by GitHub
parent 6a7d8938c7
commit d46f76264d
3 changed files with 87 additions and 71 deletions

View File

@ -49,7 +49,7 @@ Imports a `.rlp` file of blocks.
Import of >100 million OVM blocks, from genesis to Bedrock, completes in 45 minutes. Import of >100 million OVM blocks, from genesis to Bedrock, completes in 45 minutes.
```bash ```bash
$ op-reth import-op <exported-blocks> $ op-reth import-op --chain optimism <exported-blocks>
``` ```
#### 2. Import Receipts #### 2. Import Receipts
@ -63,7 +63,7 @@ Imports a `.rlp` file of receipts, that has been exported with command specified
Import of >100 million OVM receipts, from genesis to Bedrock, completes in 30 minutes. Import of >100 million OVM receipts, from genesis to Bedrock, completes in 30 minutes.
```bash ```bash
$ op-reth import-receipts-op <exported-receipts> $ op-reth import-receipts-op --chain optimism <exported-receipts>
``` ```
#### 3. Import State #### 3. Import State

View File

@ -5,7 +5,7 @@ use reth_primitives::{Receipt, Receipts};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead}; use tokio_util::codec::{Decoder, FramedRead};
use tracing::trace; use tracing::{trace, warn};
use crate::{DecodedFileChunk, FileClientError}; use crate::{DecodedFileChunk, FileClientError};
@ -106,6 +106,11 @@ where
match receipt { match receipt {
Some(ReceiptWithBlockNumber { receipt, number }) => { Some(ReceiptWithBlockNumber { receipt, number }) => {
if block_number > number {
warn!(target: "downloaders::file", previous_block_number = block_number, "skipping receipt from a lower block: {number}");
continue
}
total_receipts += 1; total_receipts += 1;
if first_block.is_none() { if first_block.is_none() {

View File

@ -19,11 +19,12 @@ use reth_optimism_primitives::bedrock::is_dup_tx;
use reth_primitives::Receipts; use reth_primitives::Receipts;
use reth_provider::{ use reth_provider::{
writer::UnifiedStorageWriter, DatabaseProviderFactory, OriginalValuesKnown, ProviderFactory, writer::UnifiedStorageWriter, DatabaseProviderFactory, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader, StageCheckpointReader, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
StaticFileWriter, StatsReader,
}; };
use reth_stages::StageId; use reth_stages::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment; use reth_static_file_types::StaticFileSegment;
use tracing::{debug, error, info, trace}; use tracing::{debug, info, trace, warn};
use crate::receipt_file_codec::HackReceiptFileCodec; use crate::receipt_file_codec::HackReceiptFileCodec;
@ -91,15 +92,6 @@ where
P: AsRef<Path>, P: AsRef<Path>,
F: FnMut(u64, &mut Receipts) -> usize, F: FnMut(u64, &mut Receipts) -> usize,
{ {
let total_imported_txns = provider_factory
.static_file_provider()
.count_entries::<tables::Transactions>()
.expect("transaction static files must exist before importing receipts");
let highest_block_transactions = provider_factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::Transactions)
.expect("transaction static files must exist before importing receipts");
for stage in StageId::ALL { for stage in StageId::ALL {
let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?; let checkpoint = provider_factory.database_provider_ro()?.get_stage_checkpoint(stage)?;
trace!(target: "reth::cli", trace!(target: "reth::cli",
@ -113,53 +105,9 @@ where
let reader = ChunkedFileReader::new(&path, chunk_len).await?; let reader = ChunkedFileReader::new(&path, chunk_len).await?;
// import receipts // import receipts
let ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns } = let _ = import_receipts_from_reader(&provider_factory, reader, filter).await?;
import_receipts_from_reader(&provider_factory, reader, filter).await?;
if total_decoded_receipts == 0 {
error!(target: "reth::cli", "No receipts were imported, ensure the receipt file is valid and not empty");
return Ok(())
}
let total_imported_receipts = provider_factory
.static_file_provider()
.count_entries::<tables::Receipts>()
.expect("static files must exist after ensuring we decoded more than zero");
if total_imported_receipts + total_filtered_out_dup_txns != total_decoded_receipts {
error!(target: "reth::cli",
total_decoded_receipts,
total_imported_receipts,
total_filtered_out_dup_txns,
"Receipts were partially imported"
);
}
if total_imported_receipts != total_imported_txns {
error!(target: "reth::cli",
total_imported_receipts,
total_imported_txns,
"Receipts inconsistent with transactions"
);
}
let highest_block_receipts = provider_factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::Receipts)
.expect("static files must exist after ensuring we decoded more than zero");
if highest_block_receipts != highest_block_transactions {
error!(target: "reth::cli",
highest_block_receipts,
highest_block_transactions,
"Height of receipts inconsistent with transactions"
);
}
info!(target: "reth::cli", info!(target: "reth::cli",
total_imported_receipts,
total_decoded_receipts,
total_filtered_out_dup_txns,
"Receipt file imported" "Receipt file imported"
); );
@ -181,15 +129,45 @@ where
N: NodeTypesWithDB<ChainSpec = ChainSpec>, N: NodeTypesWithDB<ChainSpec = ChainSpec>,
F: FnMut(u64, &mut Receipts) -> usize, F: FnMut(u64, &mut Receipts) -> usize,
{ {
let mut total_decoded_receipts = 0; let static_file_provider = provider_factory.static_file_provider();
let mut total_filtered_out_dup_txns = 0;
// Ensure that receipts hasn't been initialized apart from `init_genesis`.
if let Some(num_receipts) =
static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts)
{
if num_receipts > 0 {
eyre::bail!("Expected no receipts in storage, but found {num_receipts}.");
}
}
match static_file_provider.get_highest_static_file_block(StaticFileSegment::Receipts) {
Some(receipts_block) => {
if receipts_block > 0 {
eyre::bail!("Expected highest receipt block to be 0, but found {receipts_block}.");
}
}
None => {
eyre::bail!("Receipts was not initialized. Please import blocks and transactions before calling this command.");
}
}
let provider = provider_factory.provider_rw()?; let provider = provider_factory.provider_rw()?;
let static_file_provider = provider_factory.static_file_provider(); let mut total_decoded_receipts = 0;
let mut total_receipts = 0;
let mut total_filtered_out_dup_txns = 0;
let mut highest_block_receipts = 0;
let highest_block_transactions = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
.expect("transaction static files must exist before importing receipts");
while let Some(file_client) = while let Some(file_client) =
reader.next_receipts_chunk::<ReceiptFileClient<_>, HackReceiptFileCodec>().await? reader.next_receipts_chunk::<ReceiptFileClient<_>, HackReceiptFileCodec>().await?
{ {
if highest_block_receipts == highest_block_transactions {
warn!(target: "reth::cli", highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");
break
}
// create a new file client from chunk read from file // create a new file client from chunk read from file
let ReceiptFileClient { let ReceiptFileClient {
mut receipts, mut receipts,
@ -221,6 +199,21 @@ where
// this ensures the execution outcome and static file producer start at block 1 // this ensures the execution outcome and static file producer start at block 1
first_block = 1; first_block = 1;
} }
highest_block_receipts = first_block + receipts.len() as u64 - 1;
// RLP file may have too many blocks. We ignore the excess, but warn the user.
if highest_block_receipts > highest_block_transactions {
let excess = highest_block_receipts - highest_block_transactions;
highest_block_receipts -= excess;
// Remove the last `excess` blocks
receipts.receipt_vec.truncate(receipts.len() - excess as usize);
warn!(target: "reth::cli", highest_block_receipts, "Too many decoded blocks, ignoring the last {excess}.");
}
// Update total_receipts after all filtering
total_receipts += receipts.iter().map(|v| v.len()).sum::<usize>();
// We're reusing receipt writing code internal to // We're reusing receipt writing code internal to
// `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty // `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty
@ -228,16 +221,32 @@ where
let execution_outcome = let execution_outcome =
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default()); ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
let static_file_producer =
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;
// finally, write the receipts // finally, write the receipts
let mut storage_writer = UnifiedStorageWriter::from(&provider, static_file_producer); let mut storage_writer = UnifiedStorageWriter::from(
&provider,
static_file_provider.latest_writer(StaticFileSegment::Receipts)?,
);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?; storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
} }
// as static files works in file ranges, internally it will be committing when creating the // Only commit if we have imported as many receipts as the number of transactions.
// next file range already, so we only need to call explicitly at the end. let total_imported_txns = static_file_provider
.count_entries::<tables::Transactions>()
.expect("transaction static files must exist before importing receipts");
if total_receipts != total_imported_txns {
eyre::bail!("Number of receipts ({total_receipts}) inconsistent with transactions {total_imported_txns}")
}
// Only commit if the receipt block height matches the one from transactions.
if highest_block_receipts != highest_block_transactions {
eyre::bail!("Receipt block height ({highest_block_receipts}) inconsistent with transactions' {highest_block_transactions}")
}
// Required or any access-write provider factory will attempt to unwind to 0.
provider
.save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;
UnifiedStorageWriter::commit(provider, static_file_provider)?; UnifiedStorageWriter::commit(provider, static_file_provider)?;
Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns }) Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
@ -246,8 +255,10 @@ where
/// Result of importing receipts in chunks. /// Result of importing receipts in chunks.
#[derive(Debug)] #[derive(Debug)]
pub struct ImportReceiptsResult { pub struct ImportReceiptsResult {
total_decoded_receipts: usize, /// Total decoded receipts.
total_filtered_out_dup_txns: usize, pub total_decoded_receipts: usize,
/// Total filtered out receipts.
pub total_filtered_out_dup_txns: usize,
} }
#[cfg(test)] #[cfg(test)]