feat(op): import receipts (#7914)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Emilia Hane
2024-05-07 22:26:58 +02:00
committed by GitHub
parent 9bd74fda9e
commit 0ad9c7866b
11 changed files with 930 additions and 94 deletions

View File

@ -8,7 +8,7 @@ use crate::{
LogArgs, LogArgs,
}, },
commands::{ commands::{
config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, init_state, config_cmd, db, debug_cmd, dump_genesis, import, import_receipts, init_cmd, init_state,
node::{self, NoArgs}, node::{self, NoArgs},
p2p, recover, stage, test_vectors, p2p, recover, stage, test_vectors,
}, },
@ -150,6 +150,9 @@ impl<Ext: clap::Args + fmt::Debug> Cli<Ext> {
Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::ImportReceipts(command) => {
runner.run_blocking_until_ctrl_c(command.execute())
}
#[cfg(feature = "optimism")] #[cfg(feature = "optimism")]
Commands::ImportOp(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::ImportOp(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
@ -188,6 +191,9 @@ pub enum Commands<Ext: clap::Args + fmt::Debug = NoArgs> {
/// This syncs RLP encoded blocks from a file. /// This syncs RLP encoded blocks from a file.
#[command(name = "import")] #[command(name = "import")]
Import(import::ImportCommand), Import(import::ImportCommand),
/// This imports RLP encoded receipts from a file.
#[command(name = "import-receipts")]
ImportReceipts(import_receipts::ImportReceiptsCommand),
/// This syncs RLP encoded OP blocks below Bedrock from a file, without executing. /// This syncs RLP encoded OP blocks below Bedrock from a file, without executing.
#[cfg(feature = "optimism")] #[cfg(feature = "optimism")]
#[command(name = "import-op")] #[command(name = "import-op")]

View File

@ -138,7 +138,7 @@ impl ImportCommand {
let mut total_decoded_blocks = 0; let mut total_decoded_blocks = 0;
let mut total_decoded_txns = 0; let mut total_decoded_txns = 0;
while let Some(file_client) = reader.next_chunk().await? { while let Some(file_client) = reader.next_chunk::<FileClient>().await? {
// create a new FileClient from chunk read from file // create a new FileClient from chunk read from file
info!(target: "reth::cli", info!(target: "reth::cli",
"Importing chain file chunk" "Importing chain file chunk"

View File

@ -14,7 +14,9 @@ use reth_beacon_consensus::EthBeaconConsensus;
use reth_config::{config::EtlConfig, Config}; use reth_config::{config::EtlConfig, Config};
use reth_db::{init_db, tables, transaction::DbTx}; use reth_db::{init_db, tables, transaction::DbTx};
use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}; use reth_downloaders::file_client::{
ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
};
use reth_node_core::init::init_genesis; use reth_node_core::init::init_genesis;
@ -117,7 +119,7 @@ impl ImportOpCommand {
let mut total_decoded_txns = 0; let mut total_decoded_txns = 0;
let mut total_filtered_out_dup_txns = 0; let mut total_filtered_out_dup_txns = 0;
while let Some(mut file_client) = reader.next_chunk().await? { while let Some(mut file_client) = reader.next_chunk::<FileClient>().await? {
// create a new FileClient from chunk read from file // create a new FileClient from chunk read from file
info!(target: "reth::cli", info!(target: "reth::cli",
"Importing chain file chunk" "Importing chain file chunk"

View File

@ -0,0 +1,165 @@
//! Command that imports receipts from a file.
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
},
dirs::{DataDirPath, MaybePlatformPath},
};
use clap::Parser;
use reth_db::{database::Database, init_db, transaction::DbTx, DatabaseEnv};
use reth_downloaders::{
file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
receipt_file_client::ReceiptFileClient,
};
use reth_node_core::version::SHORT_VERSION;
use reth_primitives::{stage::StageId, ChainSpec, StaticFileSegment};
use reth_provider::{
BundleStateWithReceipts, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StaticFileProviderFactory, StaticFileWriter,
};
use tracing::{debug, error, info};
use std::{path::PathBuf, sync::Arc};
/// Imports receipts from a file into the database and the receipts static files.
///
/// NOTE(review): previous doc said "Initializes the database with the genesis block",
/// which was copy-pasted from the init command and did not describe this type.
#[derive(Debug, Parser)]
pub struct ImportReceiptsCommand {
/// The path to the data dir for all reth files and subdirectories.
///
/// Defaults to the OS-specific data directory:
///
/// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
/// - Windows: `{FOLDERID_RoamingAppData}/reth/`
/// - macOS: `$HOME/Library/Application Support/reth/`
#[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
datadir: MaybePlatformPath<DataDirPath>,
/// The chain this node is running.
///
/// Possible values are either a built-in chain or the path to a chain specification file.
#[arg(
long,
value_name = "CHAIN_OR_PATH",
long_help = chain_help(),
default_value = SUPPORTED_CHAINS[0],
value_parser = genesis_value_parser
)]
chain: Arc<ChainSpec>,
/// Chunk byte length.
///
/// The receipts file is read and decoded in chunks of at most this many bytes.
/// Defaults to `DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE` when not set.
#[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
chunk_len: Option<u64>,
// Database configuration (log level etc.), flattened into the CLI.
#[command(flatten)]
db: DatabaseArgs,
/// The path to a receipts file for import.
#[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
path: PathBuf,
}
impl ImportReceiptsCommand {
/// Executes the `import-receipts` command.
///
/// Reads the receipts file in chunks, decodes each chunk into a
/// [`ReceiptFileClient`], and writes the receipts to the receipts static files
/// via `BundleStateWithReceipts::write_to_storage`. After the whole file is
/// processed, verifies the highest receipts static-file block against the
/// number of decoded receipts and logs an error on mismatch.
pub async fn execute(self) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
debug!(target: "reth::cli",
chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
"Chunking receipts import"
);
// add network name to data dir
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(db_path, self.db.database_args())?);
info!(target: "reth::cli", "Database opened");
let provider_factory =
ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.static_files())?;
// read-write provider: its tx is reused for every chunk below
let provider = provider_factory.provider_rw()?;
let static_file_provider = provider_factory.static_file_provider();
// log current stage checkpoints for diagnostics; nothing is modified here
for stage in StageId::ALL {
let checkpoint = provider.get_stage_checkpoint(stage)?;
debug!(target: "reth::cli",
?stage,
?checkpoint,
"Read stage checkpoints from db"
);
}
// prepare the tx for `write_to_storage`
let tx = provider.into_tx();
let mut total_decoded_receipts = 0;
// open file
let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
while let Some(file_client) = reader.next_chunk::<ReceiptFileClient>().await? {
// create a new file client from chunk read from file
let ReceiptFileClient { receipts, first_block, total_receipts: total_receipts_chunk } =
file_client;
// mark these as decoded
total_decoded_receipts += total_receipts_chunk;
info!(target: "reth::cli",
first_receipts_block=?first_block,
total_receipts_chunk,
"Importing receipt file chunk"
);
// We're reusing receipt writing code internal to
// `BundleStateWithReceipts::write_to_storage`, so we just use a default empty
// `BundleState`.
let bundled_state =
BundleStateWithReceipts::new(Default::default(), receipts, first_block);
// a new writer is fetched per chunk; each chunk starts at its own first block
let static_file_producer =
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;
// finally, write the receipts
bundled_state.write_to_storage::<<DatabaseEnv as Database>::TXMut>(
&tx,
Some(static_file_producer),
OriginalValuesKnown::Yes,
)?;
}
// commit the database transaction once, after all chunks were written
tx.commit()?;
// as static files works in file ranges, internally it will be committing when creating the
// next file range already, so we only need to call explicitly at the end.
static_file_provider.commit()?;
if total_decoded_receipts == 0 {
error!(target: "reth::cli", "No receipts were imported, ensure the receipt file is valid and not empty");
return Ok(())
}
// compare the highest static file block to the number of receipts we decoded
//
// `HeaderNumbers` and `TransactionHashNumbers` tables serve as additional indexes, but
// nothing like this needs to exist for Receipts. So `tx.entries::<tables::Receipts>` would
// return zero here.
//
// NOTE(review): this compares a *block number* to a *receipt count*, which assumes
// receipts for blocks 1..=N were imported contiguously with one receipt per block entry
// — confirm against `ReceiptFileClient` semantics.
let total_imported_receipts = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Receipts)
.expect("static files must exist after ensuring we decoded more than zero");
if total_imported_receipts != total_decoded_receipts as u64 {
// mismatch is logged, not fatal: the data written so far is kept
error!(target: "reth::cli",
total_decoded_receipts,
total_imported_receipts,
"Receipts were partially imported"
);
}
info!(target: "reth::cli", total_imported_receipts, "Receipt file imported");
Ok(())
}
}

View File

@ -6,6 +6,7 @@ pub mod debug_cmd;
pub mod dump_genesis; pub mod dump_genesis;
pub mod import; pub mod import;
pub mod import_op; pub mod import_op;
pub mod import_receipts;
pub mod init_cmd; pub mod init_cmd;
pub mod init_state; pub mod init_state;

View File

@ -1,4 +1,5 @@
use super::file_codec::BlockFileCodec; use super::file_codec::BlockFileCodec;
use futures::Future;
use itertools::Either; use itertools::Either;
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
bodies::client::{BodiesClient, BodiesFut}, bodies::client::{BodiesClient, BodiesFut},
@ -12,7 +13,7 @@ use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BytesMut, Header, HeadersDirection, BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BytesMut, Header, HeadersDirection,
SealedHeader, B256, SealedHeader, B256,
}; };
use std::{collections::HashMap, path::Path}; use std::{collections::HashMap, io, path::Path};
use thiserror::Error; use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt}; use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
@ -57,6 +58,16 @@ pub enum FileClientError {
/// An error occurred when decoding blocks, headers, or rlp headers from the file. /// An error occurred when decoding blocks, headers, or rlp headers from the file.
#[error("{0}")] #[error("{0}")]
Rlp(alloy_rlp::Error, Vec<u8>), Rlp(alloy_rlp::Error, Vec<u8>),
/// Custom error message.
#[error("{0}")]
Custom(&'static str),
}
impl From<&'static str> for FileClientError {
fn from(value: &'static str) -> Self {
Self::Custom(value)
}
} }
impl FileClient { impl FileClient {
@ -78,82 +89,6 @@ impl FileClient {
Ok(Self::from_reader(&reader[..], file_len).await?.0) Ok(Self::from_reader(&reader[..], file_len).await?.0)
} }
/// Initialize the [`FileClient`] from bytes that have been read from file.
pub(crate) async fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> Result<(Self, Vec<u8>), FileClientError>
where
B: AsyncReadExt + Unpin,
{
let mut headers = HashMap::new();
let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new();
// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(),
"init decode stream"
);
let mut remaining_bytes = vec![];
let mut log_interval = 0;
let mut log_interval_start_block = 0;
while let Some(block_res) = stream.next().await {
let block = match block_res {
Ok(block) => block,
Err(FileClientError::Rlp(err, bytes)) => {
trace!(target: "downloaders::file",
%err,
bytes_len=bytes.len(),
"partial block returned from decoding chunk"
);
remaining_bytes = bytes;
break
}
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
// add to the internal maps
headers.insert(block.header.number, block.header.clone());
hash_to_number.insert(block_hash, block.header.number);
bodies.insert(
block_hash,
BlockBody {
transactions: block.body,
ommers: block.ommers,
withdrawals: block.withdrawals,
},
);
if log_interval == 0 {
trace!(target: "downloaders::file",
block_number,
"read first block"
);
log_interval_start_block = block_number;
} else if log_interval % 100_000 == 0 {
trace!(target: "downloaders::file",
blocks=?log_interval_start_block..=block_number,
"read blocks from file"
);
log_interval_start_block = block_number + 1;
}
log_interval += 1;
}
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
}
/// Get the tip hash of the chain. /// Get the tip hash of the chain.
pub fn tip(&self) -> Option<B256> { pub fn tip(&self) -> Option<B256> {
self.headers.get(&self.max_block()?).map(|h| h.hash_slow()) self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
@ -241,6 +176,88 @@ impl FileClient {
} }
} }
impl FromReader for FileClient {
type Error = FileClientError;
/// Initialize the [`FileClient`] from bytes that have been read from file.
///
/// Decodes RLP blocks from `reader` until the stream is exhausted or a partial
/// block is hit at the end of the buffer. Returns the populated client together
/// with any undecodable trailing bytes, which the caller prepends to the next
/// chunk.
///
/// Note: the maps and the framed stream are constructed eagerly, before the
/// returned future is first polled; only the decode loop itself is deferred
/// into the `async move` block.
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
B: AsyncReadExt + Unpin,
{
let mut headers = HashMap::new();
let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new();
// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(),
"init decode stream"
);
let mut remaining_bytes = vec![];
let mut log_interval = 0;
let mut log_interval_start_block = 0;
async move {
while let Some(block_res) = stream.next().await {
let block = match block_res {
Ok(block) => block,
// an RLP decode error at the buffer tail means the last block is
// split across chunks: keep its bytes for the next chunk
Err(FileClientError::Rlp(err, bytes)) => {
trace!(target: "downloaders::file",
%err,
bytes_len=bytes.len(),
"partial block returned from decoding chunk"
);
remaining_bytes = bytes;
break
}
// any other error is fatal for this chunk
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
// add to the internal maps
headers.insert(block.header.number, block.header.clone());
hash_to_number.insert(block_hash, block.header.number);
bodies.insert(
block_hash,
BlockBody {
transactions: block.body,
ommers: block.ommers,
withdrawals: block.withdrawals,
},
);
// progress logging: first block, then every 100k blocks
if log_interval == 0 {
trace!(target: "downloaders::file",
block_number,
"read first block"
);
log_interval_start_block = block_number;
} else if log_interval % 100_000 == 0 {
trace!(target: "downloaders::file",
blocks=?log_interval_start_block..=block_number,
"read blocks from file"
);
log_interval_start_block = block_number + 1;
}
log_interval += 1;
}
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
}
}
}
impl HeadersClient for FileClient { impl HeadersClient for FileClient {
type Output = HeadersFut; type Output = HeadersFut;
@ -341,6 +358,11 @@ pub struct ChunkedFileReader {
} }
impl ChunkedFileReader { impl ChunkedFileReader {
/// Returns the remaining file length.
pub fn file_len(&self) -> u64 {
self.file_byte_len
}
/// Opens the file to import from given path. Returns a new instance. If no chunk byte length /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
/// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file). /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
pub async fn new<P: AsRef<Path>>( pub async fn new<P: AsRef<Path>>(
@ -377,7 +399,10 @@ impl ChunkedFileReader {
} }
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> { pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
if self.file_byte_len == 0 && self.chunk.is_empty() { if self.file_byte_len == 0 && self.chunk.is_empty() {
// eof // eof
return Ok(None) return Ok(None)
@ -391,6 +416,7 @@ impl ChunkedFileReader {
// read new bytes from file // read new bytes from file
let mut reader = BytesMut::zeroed(new_read_bytes_target_len as usize); let mut reader = BytesMut::zeroed(new_read_bytes_target_len as usize);
// actual bytes that have been read // actual bytes that have been read
let new_read_bytes_len = self.file.read_exact(&mut reader).await? as u64; let new_read_bytes_len = self.file.read_exact(&mut reader).await? as u64;
@ -416,14 +442,7 @@ impl ChunkedFileReader {
// make new file client from chunk // make new file client from chunk
let (file_client, bytes) = let (file_client, bytes) =
FileClient::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?; T::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
debug!(target: "downloaders::file",
headers_len=file_client.headers.len(),
bodies_len=file_client.bodies.len(),
remaining_bytes_len=bytes.len(),
"parsed blocks that were read from file"
);
// save left over bytes // save left over bytes
self.chunk = bytes; self.chunk = bytes;
@ -432,6 +451,20 @@ impl ChunkedFileReader {
} }
} }
/// Constructs a file client from a reader.
///
/// Implemented by client types (e.g. `FileClient`, `ReceiptFileClient`) so that
/// `ChunkedFileReader::next_chunk` can be generic over what it decodes a chunk into.
pub trait FromReader {
/// Error returned by file client type.
///
/// Must be constructible from [`io::Error`] because reading the underlying file
/// can fail independently of decoding.
type Error: From<io::Error>;
/// Returns a file client decoded from `num_bytes` read off `reader`, together
/// with any trailing bytes that could not be decoded (a partial trailing item
/// to be carried over into the next chunk).
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -594,7 +627,7 @@ mod tests {
// test // test
while let Some(client) = reader.next_chunk().await.unwrap() { while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
let sync_target = client.tip_header().unwrap(); let sync_target = client.tip_header().unwrap();
let sync_target_hash = sync_target.hash(); let sync_target_hash = sync_target.hash();

View File

@ -0,0 +1,344 @@
//! Codec for reading raw receipts from a file.
use alloy_rlp::{Decodable, RlpDecodable};
use reth_primitives::{
bytes::{Buf, BytesMut},
Address, Bloom, Bytes, Log, Receipt, TxType, B256,
};
use tokio_util::codec::Decoder;
use crate::{file_client::FileClientError, receipt_file_client::ReceiptWithBlockNumber};
/// Codec for reading raw receipts from a file.
///
/// If using with [`FramedRead`](tokio_util::codec::FramedRead), the user should make sure the
/// framed reader has capacity for the entire receipts file. Otherwise, the decoder will return
/// [`InputTooShort`](alloy_rlp::Error::InputTooShort), because RLP receipts can only be
/// decoded if the internal buffer is large enough to contain the entire receipt.
///
/// Without ensuring the framed reader has capacity for the entire file, a receipt is likely to
/// fall across two read buffers, the decoder will not be able to decode the receipt, which will
/// cause it to fail.
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
#[derive(Debug)]
pub struct HackReceiptFileCodec;
impl Decoder for HackReceiptFileCodec {
type Item = Option<ReceiptWithBlockNumber>;
type Error = FileClientError;

/// Decodes a single RLP-encoded `HackReceipt` from the front of `src`,
/// advancing the buffer past the consumed bytes.
///
/// Returns `Ok(None)` when the buffer is empty (more input needed), and wraps a
/// decode failure with a copy of the buffered bytes so the caller can recover a
/// receipt split across chunk boundaries.
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
// nothing buffered yet; ask the framed reader for more bytes
return Ok(None)
}

// decode against a shrinking slice view so we can tell how much was consumed
let mut remaining = src.as_ref();
let container = HackReceiptContainer::decode(&mut remaining)
.map_err(|err| Self::Error::Rlp(err, src.to_vec()))?;

// discard exactly the bytes the decoder consumed
let consumed = src.len() - remaining.len();
src.advance(consumed);

// convert the exported receipt into reth's receipt type, if one was present
match container.0 {
Some(exported) => {
let receipt = exported.try_into().map_err(FileClientError::from)?;
Ok(Some(Some(receipt)))
}
None => Ok(Some(None)),
}
}
}
/// See <https://github.com/testinprod-io/op-geth/pull/1>
/// Receipt layout exported by the patched op-geth in
/// <https://github.com/testinprod-io/op-geth/pull/1>.
///
/// Field order must match the RLP encoding produced by that export — do not
/// reorder fields, `RlpDecodable` decodes them positionally.
#[derive(Debug, PartialEq, Eq, RlpDecodable)]
pub struct HackReceipt {
// transaction type byte
tx_type: u8,
post_state: Bytes,
// non-zero means the transaction succeeded
status: u64,
cumulative_gas_used: u64,
bloom: Bloom,
/// <https://github.com/testinprod-io/op-geth/blob/29062eb0fac595eeeddd3a182a25326405c66e05/core/types/log.go#L67-L72>
logs: Vec<Log>,
tx_hash: B256,
contract_address: Address,
gas_used: u64,
block_hash: B256,
block_number: u64,
transaction_index: u32,
// L1 data-fee fields specific to the OVM export
l1_gas_price: u64,
l1_gas_used: u64,
l1_fee: u64,
fee_scalar: String,
}
/// Wrapper so an empty RLP list decodes to `None` instead of erroring
/// (`#[rlp(trailing)]` makes the single field optional).
#[derive(Debug, PartialEq, Eq, RlpDecodable)]
#[rlp(trailing)]
struct HackReceiptContainer(Option<HackReceipt>);
impl TryFrom<HackReceipt> for ReceiptWithBlockNumber {
type Error = &'static str;

/// Converts an op-geth exported [`HackReceipt`] into reth's receipt tagged with its
/// block number.
///
/// Only the fields reth stores for a receipt are kept (`tx_type`, success status,
/// cumulative gas, logs); export-only fields (bloom, hashes, L1 fee data, ...) are
/// dropped.
///
/// # Errors
/// Returns an error string if `tx_type` is not a known transaction type.
fn try_from(exported_receipt: HackReceipt) -> Result<Self, Self::Error> {
let HackReceipt {
tx_type, status, cumulative_gas_used, logs, block_number: number, ..
} = exported_receipt;

#[allow(clippy::needless_update)]
let receipt = Receipt {
// `tx_type` is already a `u8`; the previous `tx_type.to_be_bytes()[0]` was an
// identity round-trip
tx_type: TxType::try_from(tx_type)?,
// op-geth encodes success as a non-zero status word
success: status != 0,
cumulative_gas_used,
logs,
// `..Default::default()` keeps this compiling when the `optimism` feature adds
// extra receipt fields
..Default::default()
};

Ok(Self { receipt, number })
}
}
#[cfg(test)]
pub(super) mod test {
use reth_primitives::{alloy_primitives::LogData, hex};
use super::*;
pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_1: &[u8] = &hex!("f9030ff9030c8080018303183db9010000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000400000000000100000000000000200000000002000000000000001000000000000000000004000000000000000000000000000040000400000100400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000001000000000000000000000100000000000000000000000000000000000000000000000000000000000000088000000080000000000010000000000000000000000000000800008000120000000000000000000000000000000002000f90197f89b948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff863a00109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac60271a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2da000000000000000000000000000000000000000000000000000000000618d8837f89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d0e3ebf0a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d80f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007edc6ca0bb6834800080a05e77a04531c7c107af1882d76cbff9486d0a9aa53701c30888509d4f5f2b003a9400000000000000000000000000000000000000008303183da0bee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e8754530180018212c2821c2383312e35");
pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_2: &[u8] = &hex!("f90271f9026e8080018301c60db9010000080000000200000000000000000008000000000000000000000100008000000000000000000000000000000000000000000000000000000000400000000000100000000000000000000000020000000000000000000000000000000000004000000000000000000000000000000000400000000400000000000000100000000000000000000000000000020000000000000000000000000000000000000000100000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000008400000000000000000010000000000000000020000000020000000000000000000000000000000000000000000002000f8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d0ea0e40a00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b24080f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007eda7867e0c7d4800080a0af6ed8a6864d44989adc47c84f6fe0aeb1819817505c42cde6cbbcd5e14dd3179400000000000000000000000000000000000000008301c60da045fd6ce41bb8ebb2bccdaa92dd1619e287704cb07722039901a7eba63dea1d130280018212c2821c2383312e35");
pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_3: &[u8] = &hex!("f90271f9026e8080018301c60db9010000000000000000000000000000000000000000400000000000000000008000000000000000000000000000000000004000000000000000000000400004000000100000000000000000000000000000000000000000000000000000000000004000000000000000000000040000000000400080000400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000008100000000000000000000000000000000000004000000000000000000000000008000000000000000000010000000000000000000000000000400000000000000001000000000000000000000000002000f8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d101e54ba00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a9980f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007ed8842f062774800080a08fab01dcec1da547e90a77597999e9153ff788fa6451d1cc942064427bd995019400000000000000000000000000000000000000008301c60da0da4509fe0ca03202ddbe4f68692c132d689ee098433691040ece18c3a45d44c50380018212c2821c2383312e35");
// Full op-geth export fixture for block 1; must stay consistent with
// `HACK_RECEIPT_ENCODED_BLOCK_1` and `receipt_block_1()`.
fn hack_receipt_1() -> HackReceipt {
let receipt = receipt_block_1();
HackReceipt {
tx_type: receipt.receipt.tx_type as u8,
post_state: Bytes::default(),
status: receipt.receipt.success as u64,
cumulative_gas_used: receipt.receipt.cumulative_gas_used,
bloom: Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000400000000000100000000000000200000000002000000000000001000000000000000000004000000000000000000000000000040000400000100400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000001000000000000000000000100000000000000000000000000000000000000000000000000000000000000088000000080000000000010000000000000000000000000000800008000120000000000000000000000000000000002000")),
logs: receipt.receipt.logs,
tx_hash: B256::from(hex!("5e77a04531c7c107af1882d76cbff9486d0a9aa53701c30888509d4f5f2b003a")), contract_address: Address::from(hex!("0000000000000000000000000000000000000000")), gas_used: 202813,
block_hash: B256::from(hex!("bee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453")),
block_number: receipt.number,
transaction_index: 0,
l1_gas_price: 1,
l1_gas_used: 4802,
l1_fee: 7203,
fee_scalar: String::from("1.5")
}
}
// Expected decoded receipt for `HACK_RECEIPT_ENCODED_BLOCK_1` (block 1, 3 logs).
pub(crate) fn receipt_block_1() -> ReceiptWithBlockNumber {
let log_1 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"0109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac60271"
)),
B256::from(hex!(
"0000000000000000000000000000000000000000000000000000000000014218"
)),
B256::from(hex!(
"00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"
)),
],
Bytes::from(hex!(
"00000000000000000000000000000000000000000000000000000000618d8837"
)),
)
.unwrap(),
};
let log_2 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000000000000000d0e3ebf0"
)),
B256::from(hex!(
"0000000000000000000000000000000000000000000000000000000000014218"
)),
B256::from(hex!(
"00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"
)),
],
Bytes::default(),
)
.unwrap(),
};
let log_3 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000007edc6ca0bb68348000"
)),
],
Bytes::default(),
)
.unwrap(),
};
let mut receipt = Receipt {
tx_type: TxType::Legacy,
success: true,
cumulative_gas_used: 202813,
..Default::default()
};
// #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism
// feature must not be brought into scope
receipt.logs = vec![log_1, log_2, log_3];
ReceiptWithBlockNumber { receipt, number: 1 }
}
// Expected decoded receipt for `HACK_RECEIPT_ENCODED_BLOCK_2` (block 2, 2 logs).
pub(crate) fn receipt_block_2() -> ReceiptWithBlockNumber {
let log_1 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000000000000000d0ea0e40"
)),
B256::from(hex!(
"0000000000000000000000000000000000000000000000000000000000014218"
)),
B256::from(hex!(
"000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b240"
)),
],
Bytes::default(),
)
.unwrap(),
};
let log_2 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000007eda7867e0c7d48000"
)),
],
Bytes::default(),
)
.unwrap(),
};
let mut receipt = Receipt {
tx_type: TxType::Legacy,
success: true,
cumulative_gas_used: 116237,
..Default::default()
};
// #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism
// feature must not be brought into scope
receipt.logs = vec![log_1, log_2];
ReceiptWithBlockNumber { receipt, number: 2 }
}
// Expected decoded receipt for `HACK_RECEIPT_ENCODED_BLOCK_3` (block 3, 2 logs).
pub(crate) fn receipt_block_3() -> ReceiptWithBlockNumber {
let log_1 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000000000000000d101e54b"
)),
B256::from(hex!(
"0000000000000000000000000000000000000000000000000000000000014218"
)),
B256::from(hex!(
"000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a99"
)),
],
Bytes::default(),
)
.unwrap(),
};
let log_2 = Log {
address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")),
data: LogData::new(
vec![
B256::from(hex!(
"fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"
)),
B256::from(hex!(
"00000000000000000000000000000000000000000000007ed8842f0627748000"
)),
],
Bytes::default(),
)
.unwrap(),
};
let mut receipt = Receipt {
tx_type: TxType::Legacy,
success: true,
cumulative_gas_used: 116237,
..Default::default()
};
// #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism
// feature must not be brought into scope
receipt.logs = vec![log_1, log_2];
ReceiptWithBlockNumber { receipt, number: 3 }
}
// Decoding the raw fixture bytes must reproduce the hand-built `hack_receipt_1()`.
#[test]
fn decode_hack_receipt() {
let receipt = hack_receipt_1();
let decoded = HackReceiptContainer::decode(&mut &HACK_RECEIPT_ENCODED_BLOCK_1[..])
.unwrap()
.0
.unwrap();
assert_eq!(receipt, decoded);
}
// The codec must decode three consecutive receipts from one buffer, in block order.
#[test]
#[allow(clippy::needless_update)]
fn receipts_codec() {
// rig: concatenate the encoded receipts for blocks 1..=3 into a single buffer
let mut bytes = Vec::new();
bytes.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1);
bytes.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2);
bytes.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3);
let mut encoded = BytesMut::from(&bytes[..]);
let mut codec = HackReceiptFileCodec;

// test: each decode call yields the next expected receipt
for expected in [receipt_block_1(), receipt_block_2(), receipt_block_3()] {
let decoded = codec.decode(&mut encoded).unwrap().unwrap().unwrap();
assert_eq!(expected, decoded);
}
}
}

View File

@ -27,10 +27,29 @@ pub mod metrics;
/// efficiently buffering headers and bodies for retrieval.
pub mod file_client;
/// Module managing file-based data retrieval and buffering of receipts.
///
/// Contains [ReceiptFileClient](receipt_file_client::ReceiptFileClient) to read receipt data from
/// files, efficiently buffering receipts for retrieval.
///
/// Currently configured to use codec [`HackReceipt`](file_codec_ovm_receipt::HackReceipt) based on
/// export of below Bedrock data using <https://github.com/testinprod-io/op-geth/pull/1>. Codec can
/// be replaced with regular encoding of receipts for export.
///
/// NOTE: receipts can be exported using regular op-geth encoding for `Receipt` type, to fit
/// reth's needs for importing. However, this would require patching the diff in <https://github.com/testinprod-io/op-geth/pull/1> to export the `Receipt` and not `HackReceipt` type (originally
/// made for op-erigon's import needs).
pub mod receipt_file_client;
/// Module with a codec for reading and encoding block bodies in files.
///
/// Enables decoding and encoding `Block` types within file contexts.
pub mod file_codec;
/// Module with a codec for reading and encoding receipts in files.
///
/// Enables decoding and encoding `HackReceipt` type. See <https://github.com/testinprod-io/op-geth/pull/1>.
pub mod file_codec_ovm_receipt;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -0,0 +1,268 @@
use futures::Future;
use reth_primitives::{Receipt, Receipts};
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::trace;
use crate::{
file_client::{FileClientError, FromReader},
file_codec_ovm_receipt::HackReceiptFileCodec,
};
/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential
/// order w.r.t. block number.
#[derive(Debug)]
pub struct ReceiptFileClient {
    /// The buffered receipts, read from file, as nested lists. One list per block number.
    pub receipts: Receipts,
    /// First (lowest) block number read from file.
    pub first_block: u64,
    /// Total number of receipts. Count of elements in [`Receipts`] flattened.
    ///
    /// NOTE(review): `from_reader` increments this once per decoded item, including the
    /// empty-list markers emitted for blocks with no transactions, so this can exceed the
    /// flattened element count of `receipts` — confirm which semantics callers expect.
    pub total_receipts: usize,
}
impl FromReader for ReceiptFileClient {
    type Error = FileClientError;

    /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If
    /// first block has no transactions, it's assumed to be the genesis block.
    ///
    /// Returns the client together with any undecoded trailing bytes (a partial receipt at the
    /// end of the chunk), so the caller can prepend them to the next chunk.
    fn from_reader<B>(
        reader: B,
        num_bytes: u64,
    ) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
    where
        B: AsyncReadExt + Unpin,
    {
        let mut receipts = Receipts::new();

        // use with_capacity to make sure the internal buffer contains the entire chunk
        let mut stream =
            FramedRead::with_capacity(reader, HackReceiptFileCodec, num_bytes as usize);

        // NOTE(review): the tracing field is spelled `coded`; likely meant `codec`. Left
        // unchanged here because renaming it would alter emitted log output.
        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            coded=?HackReceiptFileCodec,
            "init decode stream"
        );

        // bytes of a partial receipt left at the end of the chunk, handed back to the caller
        let mut remaining_bytes = vec![];

        // progress-logging bookkeeping: trace once at the start, then every 100_000 items
        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        // block number of the block currently being accumulated
        let mut block_number = 0;
        // NOTE(review): incremented once per decoded item, including the empty-list markers
        // for blocks without transactions — can exceed the flattened receipt count.
        let mut total_receipts = 0;
        // receipts accumulated for the block currently being read
        let mut receipts_for_block = vec![];
        // block number of the first decoded item; `None` until something has been decoded
        let mut first_block = None;

        async move {
            while let Some(receipt_res) = stream.next().await {
                let receipt = match receipt_res {
                    Ok(receipt) => receipt,
                    // an RLP error means the chunk ended mid-receipt: keep the partial
                    // bytes for the caller and stop decoding this chunk
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial receipt returned from decoding chunk"
                        );

                        remaining_bytes = bytes;

                        break
                    }
                    Err(err) => return Err(err),
                };

                total_receipts += 1;

                match receipt {
                    Some(ReceiptWithBlockNumber { receipt, number }) => {
                        // first decoded receipt establishes the starting block number
                        if first_block.is_none() {
                            first_block = Some(number);
                            block_number = number;
                        }

                        if block_number == number {
                            receipts_for_block.push(Some(receipt));
                        } else {
                            // block boundary: flush the previous block's receipts
                            receipts.push(receipts_for_block);

                            // next block
                            block_number = number;
                            receipts_for_block = vec![Some(receipt)];
                        }
                    }
                    // `None` item: the codec decoded an empty list, i.e. a block with no
                    // transactions (and hence no receipts)
                    None => {
                        match first_block {
                            Some(num) => {
                                // if there was a block number before this, push receipts for that
                                // block
                                receipts.push(receipts_for_block);
                                // block with no txns
                                // NOTE(review): assumes blocks in the file are contiguous, so the
                                // empty block's number is first block + blocks seen so far —
                                // confirm against the exporter's guarantees.
                                block_number = num + receipts.len() as u64;
                            }
                            None => {
                                // this is the first block and it's empty, assume it's the genesis
                                // block
                                first_block = Some(0);
                                block_number = 0;
                            }
                        }

                        receipts_for_block = vec![];
                    }
                }

                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        total_receipts,
                        "read first receipt"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        total_receipts,
                        "read receipts from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file",
                blocks=?log_interval_start_block..=block_number,
                total_receipts,
                "read receipts from file"
            );

            // we need to push the last receipts
            receipts.push(receipts_for_block);

            trace!(target: "downloaders::file",
                blocks = receipts.len(),
                total_receipts,
                "Initialized receipt file client"
            );

            Ok((
                Self { receipts, first_block: first_block.unwrap_or_default(), total_receipts },
                remaining_bytes,
            ))
        }
    }
}
/// A [`Receipt`] paired with the number of the block it belongs to.
#[derive(Debug, PartialEq, Eq)]
pub struct ReceiptWithBlockNumber {
    /// The decoded receipt.
    pub receipt: Receipt,
    /// Number of the block this receipt was produced in.
    pub number: u64,
}
#[cfg(test)]
mod test {
    use reth_primitives::hex;
    use reth_tracing::init_test_tracing;

    use crate::file_codec_ovm_receipt::test::{
        receipt_block_1 as op_mainnet_receipt_block_1,
        receipt_block_2 as op_mainnet_receipt_block_2,
        receipt_block_3 as op_mainnet_receipt_block_3,
        HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET,
        HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET,
        HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET,
    };

    use super::*;

    /// Empty RLP list (`0xc0`): the encoding of a block with no transactions, hence no receipts.
    const HACK_RECEIPT_BLOCK_NO_TRANSACTIONS: &[u8] = &hex!("c0");

    /// Blocks 0 (genesis, empty), 1, 2, and a trailing empty block 3.
    #[tokio::test]
    async fn receipt_file_client_ovm_codec() {
        init_test_tracing();

        // genesis block has no hack receipts
        let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
        // one receipt each for block 1 and 2
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET);
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET);
        // no receipts for block 3 (trailing empty block)
        encoded_receipts.extend_from_slice(HACK_RECEIPT_BLOCK_NO_TRANSACTIONS);

        let encoded_byte_len = encoded_receipts.len() as u64;
        let reader = &mut &encoded_receipts[..];

        let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) =
            ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap();

        // 4 decoded items: two real receipts plus two empty-block markers — each decoded
        // item increments `total_receipts`
        assert_eq!(4, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap());
        assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][0].clone().unwrap());
        assert!(receipts[3].is_empty());
    }

    /// An empty block sandwiched between blocks with receipts must get its own empty list.
    #[tokio::test]
    async fn no_receipts_middle_block() {
        init_test_tracing();

        // genesis block has no hack receipts
        let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
        // one receipt for block 1
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET);
        // no receipt for block 2
        encoded_receipts.extend_from_slice(HACK_RECEIPT_BLOCK_NO_TRANSACTIONS);
        // one receipt for block 3
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET);

        let encoded_byte_len = encoded_receipts.len() as u64;
        let reader = &mut &encoded_receipts[..];

        let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) =
            ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap();

        // 4 decoded items: two real receipts plus two empty-block markers
        assert_eq!(4, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap());
        assert!(receipts[2].is_empty());
        assert_eq!(op_mainnet_receipt_block_3().receipt, receipts[3][0].clone().unwrap());
    }

    /// Consecutive receipts with the same block number must land in the same inner list.
    #[tokio::test]
    async fn two_receipts_same_block() {
        init_test_tracing();

        // genesis block has no hack receipts
        let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
        // one receipt for block 1
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET);
        // two receipts for block 2
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET);
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET);
        // one receipt for block 3
        encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET);

        let encoded_byte_len = encoded_receipts.len() as u64;
        let reader = &mut &encoded_receipts[..];

        let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) =
            ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap();

        // 5 decoded items: four real receipts plus the genesis empty-block marker
        assert_eq!(5, total_receipts);
        assert_eq!(0, first_block);
        assert!(receipts[0].is_empty());
        assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap());
        assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][0].clone().unwrap());
        assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][1].clone().unwrap());
        assert_eq!(op_mainnet_receipt_block_3().receipt, receipts[3][0].clone().unwrap());
    }
}

View File

@ -66,8 +66,6 @@ revm-primitives = { workspace = true, features = ["arbitrary"] }
nybbles = { workspace = true, features = ["arbitrary"] } nybbles = { workspace = true, features = ["arbitrary"] }
alloy-trie = { workspace = true, features = ["arbitrary"] } alloy-trie = { workspace = true, features = ["arbitrary"] }
alloy-eips = { workspace = true, features = ["arbitrary"] } alloy-eips = { workspace = true, features = ["arbitrary"] }
arbitrary = { workspace = true, features = ["derive"] }
assert_matches.workspace = true assert_matches.workspace = true
proptest.workspace = true proptest.workspace = true
proptest-derive.workspace = true proptest-derive.workspace = true
@ -109,7 +107,6 @@ zstd-codec = ["dep:zstd"]
clap = ["dep:clap"] clap = ["dep:clap"]
optimism = [ optimism = [
"reth-codecs/optimism", "reth-codecs/optimism",
"revm-primitives/optimism",
"reth-ethereum-forks/optimism", "reth-ethereum-forks/optimism",
"revm/optimism", "revm/optimism",
] ]

View File

@ -21,6 +21,7 @@ pub mod providers;
pub use providers::{ pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory, HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
StaticFileWriter,
}; };
#[cfg(any(test, feature = "test-utils"))] #[cfg(any(test, feature = "test-utils"))]