From 0ad9c7866b6eef75234a353c1dabcce5fb7cc8a9 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 7 May 2024 22:26:58 +0200 Subject: [PATCH] feat(op): import receipts (#7914) Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> --- bin/reth/src/cli/mod.rs | 8 +- bin/reth/src/commands/import.rs | 2 +- bin/reth/src/commands/import_op.rs | 6 +- bin/reth/src/commands/import_receipts.rs | 165 +++++++++ bin/reth/src/commands/mod.rs | 1 + crates/net/downloaders/src/file_client.rs | 207 ++++++----- .../downloaders/src/file_codec_ovm_receipt.rs | 344 ++++++++++++++++++ crates/net/downloaders/src/lib.rs | 19 + .../downloaders/src/receipt_file_client.rs | 268 ++++++++++++++ crates/primitives/Cargo.toml | 3 - crates/storage/provider/src/lib.rs | 1 + 11 files changed, 930 insertions(+), 94 deletions(-) create mode 100644 bin/reth/src/commands/import_receipts.rs create mode 100644 crates/net/downloaders/src/file_codec_ovm_receipt.rs create mode 100644 crates/net/downloaders/src/receipt_file_client.rs diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 40e1f24be..deece5b62 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -8,7 +8,7 @@ use crate::{ LogArgs, }, commands::{ - config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, init_state, + config_cmd, db, debug_cmd, dump_genesis, import, import_receipts, init_cmd, init_state, node::{self, NoArgs}, p2p, recover, stage, test_vectors, }, @@ -150,6 +150,9 @@ impl Cli { Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::ImportReceipts(command) => { + runner.run_blocking_until_ctrl_c(command.execute()) + } #[cfg(feature = "optimism")] Commands::ImportOp(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), @@ -188,6 +191,9 @@ pub enum Commands { /// This syncs RLP encoded blocks from a file. #[command(name = "import")] Import(import::ImportCommand), + /// This imports RLP encoded receipts from a file. + #[command(name = "import-receipts")] + ImportReceipts(import_receipts::ImportReceiptsCommand), /// This syncs RLP encoded OP blocks below Bedrock from a file, without executing. #[cfg(feature = "optimism")] #[command(name = "import-op")] diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index f73cf3c17..354787f32 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -138,7 +138,7 @@ impl ImportCommand { let mut total_decoded_blocks = 0; let mut total_decoded_txns = 0; - while let Some(file_client) = reader.next_chunk().await? { + while let Some(file_client) = reader.next_chunk::().await? 
 {
             // create a new FileClient from chunk read from file
             info!(target: "reth::cli",
                 "Importing chain file chunk"
diff --git a/bin/reth/src/commands/import_op.rs b/bin/reth/src/commands/import_op.rs
index 3147f9b10..8ca1baf5b 100644
--- a/bin/reth/src/commands/import_op.rs
+++ b/bin/reth/src/commands/import_op.rs
@@ -14,7 +14,9 @@ use reth_beacon_consensus::EthBeaconConsensus;
 use reth_config::{config::EtlConfig, Config};
 use reth_db::{init_db, tables, transaction::DbTx};
-use reth_downloaders::file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE};
+use reth_downloaders::file_client::{
+    ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE,
+};
 
 use reth_node_core::init::init_genesis;
 
@@ -117,7 +119,7 @@ impl ImportOpCommand {
         let mut total_decoded_txns = 0;
         let mut total_filtered_out_dup_txns = 0;
 
-        while let Some(mut file_client) = reader.next_chunk().await? {
+        while let Some(mut file_client) = reader.next_chunk::<FileClient>().await? {
             // create a new FileClient from chunk read from file
             info!(target: "reth::cli",
                 "Importing chain file chunk"
diff --git a/bin/reth/src/commands/import_receipts.rs b/bin/reth/src/commands/import_receipts.rs
new file mode 100644
index 000000000..8e06c3c03
--- /dev/null
+++ b/bin/reth/src/commands/import_receipts.rs
@@ -0,0 +1,165 @@
+//! Command that imports receipts from a file.
+
+use crate::{
+    args::{
+        utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
+        DatabaseArgs,
+    },
+    dirs::{DataDirPath, MaybePlatformPath},
+};
+use clap::Parser;
+use reth_db::{database::Database, init_db, transaction::DbTx, DatabaseEnv};
+use reth_downloaders::{
+    file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
+    receipt_file_client::ReceiptFileClient,
+};
+use reth_node_core::version::SHORT_VERSION;
+use reth_primitives::{stage::StageId, ChainSpec, StaticFileSegment};
+use reth_provider::{
+    BundleStateWithReceipts, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
+    StaticFileProviderFactory, StaticFileWriter,
+};
+use tracing::{debug, error, info};
+
+use std::{path::PathBuf, sync::Arc};
+
+/// Imports RLP encoded receipts from a file into the database.
+#[derive(Debug, Parser)]
+pub struct ImportReceiptsCommand {
+    /// The path to the data dir for all reth files and subdirectories.
+    ///
+    /// Defaults to the OS-specific data directory:
+    ///
+    /// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
+    /// - Windows: `{FOLDERID_RoamingAppData}/reth/`
+    /// - macOS: `$HOME/Library/Application Support/reth/`
+    #[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
+    datadir: MaybePlatformPath<DataDirPath>,
+
+    /// The chain this node is running.
+    ///
+    /// Possible values are either a built-in chain or the path to a chain specification file.
+    #[arg(
+        long,
+        value_name = "CHAIN_OR_PATH",
+        long_help = chain_help(),
+        default_value = SUPPORTED_CHAINS[0],
+        value_parser = genesis_value_parser
+    )]
+    chain: Arc<ChainSpec>,
+
+    /// Chunk byte length.
+    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
+    chunk_len: Option<u64>,
+
+    #[command(flatten)]
+    db: DatabaseArgs,
+
+    /// The path to a receipts file for import.
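+    ///
+    /// The receipts in this file are assumed to be RLP encoded with the OVM `HackReceipt`
+    /// codec (see the `file_codec_ovm_receipt` module in `reth-downloaders`).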
+    #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
+    path: PathBuf,
+}
+
+impl ImportReceiptsCommand {
+    /// Execute `import-receipts` command
+    pub async fn execute(self) -> eyre::Result<()> {
+        info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
+
+        debug!(target: "reth::cli",
+            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
+            "Chunking receipts import"
+        );
+
+        // add network name to data dir
+        let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
+
+        let db_path = data_dir.db();
+        info!(target: "reth::cli", path = ?db_path, "Opening database");
+
+        let db = Arc::new(init_db(db_path, self.db.database_args())?);
+        info!(target: "reth::cli", "Database opened");
+        let provider_factory =
+            ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.static_files())?;
+
+        let provider = provider_factory.provider_rw()?;
+        let static_file_provider = provider_factory.static_file_provider();
+
+        for stage in StageId::ALL {
+            let checkpoint = provider.get_stage_checkpoint(stage)?;
+            debug!(target: "reth::cli",
+                ?stage,
+                ?checkpoint,
+                "Read stage checkpoints from db"
+            );
+        }
+
+        // prepare the tx for `write_to_storage`
+        let tx = provider.into_tx();
+        let mut total_decoded_receipts = 0;
+
+        // open file
+        let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
+
+        while let Some(file_client) = reader.next_chunk::<ReceiptFileClient>().await? {
+            // create a new file client from chunk read from file
+            let ReceiptFileClient { receipts, first_block, total_receipts: total_receipts_chunk } =
+                file_client;
+
+            // mark these as decoded
+            total_decoded_receipts += total_receipts_chunk;
+
+            info!(target: "reth::cli",
+                first_receipts_block=?first_block,
+                total_receipts_chunk,
+                "Importing receipt file chunk"
+            );
+
+            // We're reusing receipt writing code internal to
+            // `BundleStateWithReceipts::write_to_storage`, so we just use a default empty
+            // `BundleState`.
+            let bundled_state =
+                BundleStateWithReceipts::new(Default::default(), receipts, first_block);
+
+            let static_file_producer =
+                static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;
+
+            // finally, write the receipts
+            bundled_state.write_to_storage::<<DatabaseEnv as Database>::TXMut>(
+                &tx,
+                Some(static_file_producer),
+                OriginalValuesKnown::Yes,
+            )?;
+        }
+
+        tx.commit()?;
+        // as static files work in file ranges, internally it will be committing when creating the
+        // next file range already, so we only need to call commit explicitly at the end.
+        static_file_provider.commit()?;
+
+        if total_decoded_receipts == 0 {
+            error!(target: "reth::cli", "No receipts were imported, ensure the receipt file is valid and not empty");
+            return Ok(())
+        }
+
+        // compare the highest static file block to the number of receipts we decoded
+        //
+        // `HeaderNumbers` and `TransactionHashNumbers` tables serve as additional indexes, but
+        // nothing like this needs to exist for Receipts. So `tx.entries::<tables::Receipts>()`
+        // would return zero here.
+ let total_imported_receipts = static_file_provider + .get_highest_static_file_block(StaticFileSegment::Receipts) + .expect("static files must exist after ensuring we decoded more than zero"); + + if total_imported_receipts != total_decoded_receipts as u64 { + error!(target: "reth::cli", + total_decoded_receipts, + total_imported_receipts, + "Receipts were partially imported" + ); + } + + info!(target: "reth::cli", total_imported_receipts, "Receipt file imported"); + + Ok(()) + } +} diff --git a/bin/reth/src/commands/mod.rs b/bin/reth/src/commands/mod.rs index a005d5e8b..9e6ff8f84 100644 --- a/bin/reth/src/commands/mod.rs +++ b/bin/reth/src/commands/mod.rs @@ -6,6 +6,7 @@ pub mod debug_cmd; pub mod dump_genesis; pub mod import; pub mod import_op; +pub mod import_receipts; pub mod init_cmd; pub mod init_state; diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index ef72a891b..85fac4642 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -1,4 +1,5 @@ use super::file_codec::BlockFileCodec; +use futures::Future; use itertools::Either; use reth_interfaces::p2p::{ bodies::client::{BodiesClient, BodiesFut}, @@ -12,7 +13,7 @@ use reth_primitives::{ BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BytesMut, Header, HeadersDirection, SealedHeader, B256, }; -use std::{collections::HashMap, path::Path}; +use std::{collections::HashMap, io, path::Path}; use thiserror::Error; use tokio::{fs::File, io::AsyncReadExt}; use tokio_stream::StreamExt; @@ -57,6 +58,16 @@ pub enum FileClientError { /// An error occurred when decoding blocks, headers, or rlp headers from the file. #[error("{0}")] Rlp(alloy_rlp::Error, Vec), + + /// Custom error message. + #[error("{0}")] + Custom(&'static str), +} + +impl From<&'static str> for FileClientError { + fn from(value: &'static str) -> Self { + Self::Custom(value) + } } impl FileClient { @@ -78,82 +89,6 @@ impl FileClient { Ok(Self::from_reader(&reader[..], file_len).await?.0) } - /// Initialize the [`FileClient`] from bytes that have been read from file. 
- pub(crate) async fn from_reader( - reader: B, - num_bytes: u64, - ) -> Result<(Self, Vec), FileClientError> - where - B: AsyncReadExt + Unpin, - { - let mut headers = HashMap::new(); - let mut hash_to_number = HashMap::new(); - let mut bodies = HashMap::new(); - - // use with_capacity to make sure the internal buffer contains the entire chunk - let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize); - - trace!(target: "downloaders::file", - target_num_bytes=num_bytes, - capacity=stream.read_buffer().capacity(), - "init decode stream" - ); - - let mut remaining_bytes = vec![]; - - let mut log_interval = 0; - let mut log_interval_start_block = 0; - - while let Some(block_res) = stream.next().await { - let block = match block_res { - Ok(block) => block, - Err(FileClientError::Rlp(err, bytes)) => { - trace!(target: "downloaders::file", - %err, - bytes_len=bytes.len(), - "partial block returned from decoding chunk" - ); - remaining_bytes = bytes; - break - } - Err(err) => return Err(err), - }; - let block_number = block.header.number; - let block_hash = block.header.hash_slow(); - - // add to the internal maps - headers.insert(block.header.number, block.header.clone()); - hash_to_number.insert(block_hash, block.header.number); - bodies.insert( - block_hash, - BlockBody { - transactions: block.body, - ommers: block.ommers, - withdrawals: block.withdrawals, - }, - ); - - if log_interval == 0 { - trace!(target: "downloaders::file", - block_number, - "read first block" - ); - log_interval_start_block = block_number; - } else if log_interval % 100_000 == 0 { - trace!(target: "downloaders::file", - blocks=?log_interval_start_block..=block_number, - "read blocks from file" - ); - log_interval_start_block = block_number + 1; - } - log_interval += 1; - } - - trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client"); - - Ok((Self { headers, hash_to_number, bodies }, remaining_bytes)) - } - /// Get the tip hash of the chain. pub fn tip(&self) -> Option { self.headers.get(&self.max_block()?).map(|h| h.hash_slow()) @@ -241,6 +176,88 @@ impl FileClient { } } +impl FromReader for FileClient { + type Error = FileClientError; + + /// Initialize the [`FileClient`] from bytes that have been read from file. 
+ fn from_reader( + reader: B, + num_bytes: u64, + ) -> impl Future), Self::Error>> + where + B: AsyncReadExt + Unpin, + { + let mut headers = HashMap::new(); + let mut hash_to_number = HashMap::new(); + let mut bodies = HashMap::new(); + + // use with_capacity to make sure the internal buffer contains the entire chunk + let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize); + + trace!(target: "downloaders::file", + target_num_bytes=num_bytes, + capacity=stream.read_buffer().capacity(), + "init decode stream" + ); + + let mut remaining_bytes = vec![]; + + let mut log_interval = 0; + let mut log_interval_start_block = 0; + + async move { + while let Some(block_res) = stream.next().await { + let block = match block_res { + Ok(block) => block, + Err(FileClientError::Rlp(err, bytes)) => { + trace!(target: "downloaders::file", + %err, + bytes_len=bytes.len(), + "partial block returned from decoding chunk" + ); + remaining_bytes = bytes; + break + } + Err(err) => return Err(err), + }; + let block_number = block.header.number; + let block_hash = block.header.hash_slow(); + + // add to the internal maps + headers.insert(block.header.number, block.header.clone()); + hash_to_number.insert(block_hash, block.header.number); + bodies.insert( + block_hash, + BlockBody { + transactions: block.body, + ommers: block.ommers, + withdrawals: block.withdrawals, + }, + ); + + if log_interval == 0 { + trace!(target: "downloaders::file", + block_number, + "read first block" + ); + log_interval_start_block = block_number; + } else if log_interval % 100_000 == 0 { + trace!(target: "downloaders::file", + blocks=?log_interval_start_block..=block_number, + "read blocks from file" + ); + log_interval_start_block = block_number + 1; + } + log_interval += 1; + } + + trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client"); + + Ok((Self { headers, hash_to_number, bodies }, remaining_bytes)) + } + } +} + impl HeadersClient for FileClient { type Output = HeadersFut; @@ -341,6 +358,11 @@ pub struct ChunkedFileReader { } impl ChunkedFileReader { + /// Returns the remaining file length. + pub fn file_len(&self) -> u64 { + self.file_byte_len + } + /// Opens the file to import from given path. Returns a new instance. If no chunk byte length /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file). pub async fn new>( @@ -377,7 +399,10 @@ impl ChunkedFileReader { } /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. - pub async fn next_chunk(&mut self) -> Result, FileClientError> { + pub async fn next_chunk(&mut self) -> Result, T::Error> + where + T: FromReader, + { if self.file_byte_len == 0 && self.chunk.is_empty() { // eof return Ok(None) @@ -391,6 +416,7 @@ impl ChunkedFileReader { // read new bytes from file let mut reader = BytesMut::zeroed(new_read_bytes_target_len as usize); + // actual bytes that have been read let new_read_bytes_len = self.file.read_exact(&mut reader).await? 
 as u64;
 
@@ -416,14 +442,7 @@
 
         // make new file client from chunk
         let (file_client, bytes) =
-            FileClient::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
-
-        debug!(target: "downloaders::file",
-            headers_len=file_client.headers.len(),
-            bodies_len=file_client.bodies.len(),
-            remaining_bytes_len=bytes.len(),
-            "parsed blocks that were read from file"
-        );
+            T::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
 
         // save left over bytes
         self.chunk = bytes;
@@ -432,6 +451,20 @@
     }
 }
 
+/// Constructs a file client from a reader.
+pub trait FromReader {
+    /// Error returned by file client type.
+    type Error: From<io::Error>;
+    /// Returns a file client
+    fn from_reader<B>(
+        reader: B,
+        num_bytes: u64,
+    ) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
+    where
+        Self: Sized,
+        B: AsyncReadExt + Unpin;
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -594,7 +627,7 @@ mod tests {
 
         // test
 
-        while let Some(client) = reader.next_chunk().await.unwrap() {
+        while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
             let sync_target = client.tip_header().unwrap();
 
             let sync_target_hash = sync_target.hash();
diff --git a/crates/net/downloaders/src/file_codec_ovm_receipt.rs b/crates/net/downloaders/src/file_codec_ovm_receipt.rs
new file mode 100644
index 000000000..5b3c81a92
--- /dev/null
+++ b/crates/net/downloaders/src/file_codec_ovm_receipt.rs
@@ -0,0 +1,344 @@
+//! Codec for reading raw receipts from a file.
+
+use alloy_rlp::{Decodable, RlpDecodable};
+use reth_primitives::{
+    bytes::{Buf, BytesMut},
+    Address, Bloom, Bytes, Log, Receipt, TxType, B256,
+};
+use tokio_util::codec::Decoder;
+
+use crate::{file_client::FileClientError, receipt_file_client::ReceiptWithBlockNumber};
+
+/// Codec for reading raw receipts from a file.
+///
+/// If using with [`FramedRead`](tokio_util::codec::FramedRead), the user should make sure the
+/// framed reader has capacity for the entire receipts file. Otherwise, the decoder will return
+/// [`InputTooShort`](alloy_rlp::Error::InputTooShort), because RLP receipts can only be
+/// decoded if the internal buffer is large enough to contain the entire receipt.
+///
+/// Without ensuring the framed reader has capacity for the entire file, a receipt is likely to
+/// fall across two read buffers, and the decoder will not be able to decode the receipt, which
+/// will cause it to fail.
+///
+/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
+/// the capacity of the framed reader to the size of the file.
+#[derive(Debug)]
+pub struct HackReceiptFileCodec;
+
+impl Decoder for HackReceiptFileCodec {
+    type Item = Option<ReceiptWithBlockNumber>;
+    type Error = FileClientError;
+
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+        if src.is_empty() {
+            return Ok(None)
+        }
+
+        let buf_slice = &mut src.as_ref();
+        let receipt = HackReceiptContainer::decode(buf_slice)
+            .map_err(|err| Self::Error::Rlp(err, src.to_vec()))?
+ .0; + src.advance(src.len() - buf_slice.len()); + + Ok(Some( + receipt.map(|receipt| receipt.try_into().map_err(FileClientError::from)).transpose()?, + )) + } +} + +/// See +#[derive(Debug, PartialEq, Eq, RlpDecodable)] +pub struct HackReceipt { + tx_type: u8, + post_state: Bytes, + status: u64, + cumulative_gas_used: u64, + bloom: Bloom, + /// + logs: Vec, + tx_hash: B256, + contract_address: Address, + gas_used: u64, + block_hash: B256, + block_number: u64, + transaction_index: u32, + l1_gas_price: u64, + l1_gas_used: u64, + l1_fee: u64, + fee_scalar: String, +} + +#[derive(Debug, PartialEq, Eq, RlpDecodable)] +#[rlp(trailing)] +struct HackReceiptContainer(Option); + +impl TryFrom for ReceiptWithBlockNumber { + type Error = &'static str; + fn try_from(exported_receipt: HackReceipt) -> Result { + let HackReceipt { + tx_type, status, cumulative_gas_used, logs, block_number: number, .. + } = exported_receipt; + + #[allow(clippy::needless_update)] + let receipt = Receipt { + tx_type: TxType::try_from(tx_type.to_be_bytes()[0])?, + success: status != 0, + cumulative_gas_used, + logs, + ..Default::default() + }; + + Ok(Self { receipt, number }) + } +} + +#[cfg(test)] +pub(super) mod test { + use reth_primitives::{alloy_primitives::LogData, hex}; + + use super::*; + + pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_1: &[u8] = &hex!("f9030ff9030c8080018303183db9010000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000400000000000100000000000000200000000002000000000000001000000000000000000004000000000000000000000000000040000400000100400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000001000000000000000000000100000000000000000000000000000000000000000000000000000000000000088000000080000000000010000000000000000000000000000800008000120000000000000000000000000000000002000f90197f89b948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff863a00109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac60271a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2da000000000000000000000000000000000000000000000000000000000618d8837f89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d0e3ebf0a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d80f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007edc6ca0bb6834800080a05e77a04531c7c107af1882d76cbff9486d0a9aa53701c30888509d4f5f2b003a9400000000000000000000000000000000000000008303183da0bee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e8754530180018212c2821c2383312e35"); + + pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_2: &[u8] = 
&hex!("f90271f9026e8080018301c60db9010000080000000200000000000000000008000000000000000000000100008000000000000000000000000000000000000000000000000000000000400000000000100000000000000000000000020000000000000000000000000000000000004000000000000000000000000000000000400000000400000000000000100000000000000000000000000000020000000000000000000000000000000000000000100000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000008400000000000000000010000000000000000020000000020000000000000000000000000000000000000000000002000f8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d0ea0e40a00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b24080f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007eda7867e0c7d4800080a0af6ed8a6864d44989adc47c84f6fe0aeb1819817505c42cde6cbbcd5e14dd3179400000000000000000000000000000000000000008301c60da045fd6ce41bb8ebb2bccdaa92dd1619e287704cb07722039901a7eba63dea1d130280018212c2821c2383312e35"); + + pub(crate) const HACK_RECEIPT_ENCODED_BLOCK_3: &[u8] = &hex!("f90271f9026e8080018301c60db9010000000000000000000000000000000000000000400000000000000000008000000000000000000000000000000000004000000000000000000000400004000000100000000000000000000000000000000000000000000000000000000000004000000000000000000000040000000000400080000400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000008100000000000000000000000000000000000004000000000000000000000000008000000000000000000010000000000000000000000000000400000000000000001000000000000000000000000002000f8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ca000000000000000000000000000000000000000000000000000000000d101e54ba00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a9980f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aff842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007ed8842f062774800080a08fab01dcec1da547e90a77597999e9153ff788fa6451d1cc942064427bd995019400000000000000000000000000000000000000008301c60da0da4509fe0ca03202ddbe4f68692c132d689ee098433691040ece18c3a45d44c50380018212c2821c2383312e35"); + + fn hack_receipt_1() -> HackReceipt { + let receipt = receipt_block_1(); + + HackReceipt { + tx_type: receipt.receipt.tx_type as u8, + post_state: Bytes::default(), + status: receipt.receipt.success as u64, + cumulative_gas_used: receipt.receipt.cumulative_gas_used, + bloom: Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000400000000000100000000000000200000000002000000000000001000000000000000000004000000000000000000000000000040000400000100400000000000000100000000000000000000000000000020000000000000000000000000000000000000000000000001000000000000000000000100000000000000000000000000000000000000000000000000000000000000088000000080000000000010000000000000000000000000000800008000120000000000000000000000000000000002000")), + logs: receipt.receipt.logs, + tx_hash: B256::from(hex!("5e77a04531c7c107af1882d76cbff9486d0a9aa53701c30888509d4f5f2b003a")), contract_address: 
Address::from(hex!("0000000000000000000000000000000000000000")), gas_used: 202813, + block_hash: B256::from(hex!("bee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453")), + block_number: receipt.number, + transaction_index: 0, + l1_gas_price: 1, + l1_gas_used: 4802, + l1_fee: 7203, + fee_scalar: String::from("1.5") + } + } + + pub(crate) fn receipt_block_1() -> ReceiptWithBlockNumber { + let log_1 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "0109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac60271" + )), + B256::from(hex!( + "0000000000000000000000000000000000000000000000000000000000014218" + )), + B256::from(hex!( + "00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d" + )), + ], + Bytes::from(hex!( + "00000000000000000000000000000000000000000000000000000000618d8837" + )), + ) + .unwrap(), + }; + + let log_2 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000000000000000d0e3ebf0" + )), + B256::from(hex!( + "0000000000000000000000000000000000000000000000000000000000014218" + )), + B256::from(hex!( + "00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let log_3 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000007edc6ca0bb68348000" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let mut receipt = Receipt { + tx_type: TxType::Legacy, + success: true, + cumulative_gas_used: 202813, + ..Default::default() + }; + // #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism + // feature must not be brought into scope + receipt.logs = vec![log_1, log_2, log_3]; + + ReceiptWithBlockNumber { receipt, number: 1 } + } + + pub(crate) fn receipt_block_2() -> ReceiptWithBlockNumber { + let log_1 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000000000000000d0ea0e40" + )), + B256::from(hex!( + "0000000000000000000000000000000000000000000000000000000000014218" + )), + B256::from(hex!( + "000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b240" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let log_2 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000007eda7867e0c7d48000" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let mut receipt = Receipt { + tx_type: TxType::Legacy, + success: true, + cumulative_gas_used: 116237, + ..Default::default() + }; + // #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism + // feature must not be brought into scope + receipt.logs = vec![log_1, log_2]; + + ReceiptWithBlockNumber { receipt, number: 2 } 
+ } + + pub(crate) fn receipt_block_3() -> ReceiptWithBlockNumber { + let log_1 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68c" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000000000000000d101e54b" + )), + B256::from(hex!( + "0000000000000000000000000000000000000000000000000000000000014218" + )), + B256::from(hex!( + "000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a99" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let log_2 = Log { + address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850af")), + data: LogData::new( + vec![ + B256::from(hex!( + "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f" + )), + B256::from(hex!( + "00000000000000000000000000000000000000000000007ed8842f0627748000" + )), + ], + Bytes::default(), + ) + .unwrap(), + }; + + let mut receipt = Receipt { + tx_type: TxType::Legacy, + success: true, + cumulative_gas_used: 116237, + ..Default::default() + }; + // #[allow(clippy::needless_update)] not recognised, ..Default::default() needed so optimism + // feature must not be brought into scope + receipt.logs = vec![log_1, log_2]; + + ReceiptWithBlockNumber { receipt, number: 3 } + } + + #[test] + fn decode_hack_receipt() { + let receipt = hack_receipt_1(); + + let decoded = HackReceiptContainer::decode(&mut &HACK_RECEIPT_ENCODED_BLOCK_1[..]) + .unwrap() + .0 + .unwrap(); + + assert_eq!(receipt, decoded); + } + + #[test] + #[allow(clippy::needless_update)] + fn receipts_codec() { + // rig + + let mut receipt_1_to_3 = HACK_RECEIPT_ENCODED_BLOCK_1.to_vec(); + receipt_1_to_3.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2); + receipt_1_to_3.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3); + + let encoded = &mut BytesMut::from(&receipt_1_to_3[..]); + + let mut codec = HackReceiptFileCodec; + + // test + + let first_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap(); + + assert_eq!(receipt_block_1(), first_decoded_receipt); + + let second_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap(); + + assert_eq!(receipt_block_2(), second_decoded_receipt); + + let third_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap(); + + assert_eq!(receipt_block_3(), third_decoded_receipt); + } +} diff --git a/crates/net/downloaders/src/lib.rs b/crates/net/downloaders/src/lib.rs index 37c4a95e3..81e669d88 100644 --- a/crates/net/downloaders/src/lib.rs +++ b/crates/net/downloaders/src/lib.rs @@ -27,10 +27,29 @@ pub mod metrics; /// efficiently buffering headers and bodies for retrieval. pub mod file_client; +/// Module managing file-based data retrieval and buffering of receipts. +/// +/// Contains [ReceiptFileClient](receipt_file_client::ReceiptFileClient) to read receipt data from +/// files, efficiently buffering receipts for retrieval. +/// +/// Currently configured to use codec [`HackReceipt`](file_codec_ovm_receipt::HackReceipt) based on +/// export of below Bedrock data using . Codec can +/// be replaced with regular encoding of receipts for export. +/// +/// NOTE: receipts can be exported using regular op-geth encoding for `Receipt` type, to fit +/// reth's needs for importing. However, this would require patching the diff in to export the `Receipt` and not `HackReceipt` type (originally +/// made for op-erigon's import needs). 
+pub mod receipt_file_client; + /// Module with a codec for reading and encoding block bodies in files. /// /// Enables decoding and encoding `Block` types within file contexts. pub mod file_codec; +/// Module with a codec for reading and encoding receipts in files. +/// +/// Enables decoding and encoding `HackReceipt` type. See . +pub mod file_codec_ovm_receipt; + #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/crates/net/downloaders/src/receipt_file_client.rs b/crates/net/downloaders/src/receipt_file_client.rs new file mode 100644 index 000000000..b6291d0a3 --- /dev/null +++ b/crates/net/downloaders/src/receipt_file_client.rs @@ -0,0 +1,268 @@ +use futures::Future; +use reth_primitives::{Receipt, Receipts}; +use tokio::io::AsyncReadExt; +use tokio_stream::StreamExt; +use tokio_util::codec::FramedRead; +use tracing::trace; + +use crate::{ + file_client::{FileClientError, FromReader}, + file_codec_ovm_receipt::HackReceiptFileCodec, +}; + +/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential +/// order w.r.t. block number. +#[derive(Debug)] +pub struct ReceiptFileClient { + /// The buffered receipts, read from file, as nested lists. One list per block number. + pub receipts: Receipts, + /// First (lowest) block number read from file. + pub first_block: u64, + /// Total number of receipts. Count of elements in [`Receipts`] flattened. + pub total_receipts: usize, +} + +impl FromReader for ReceiptFileClient { + type Error = FileClientError; + + /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If + /// first block has no transactions, it's assumed to be the genesis block. + fn from_reader( + reader: B, + num_bytes: u64, + ) -> impl Future), Self::Error>> + where + B: AsyncReadExt + Unpin, + { + let mut receipts = Receipts::new(); + + // use with_capacity to make sure the internal buffer contains the entire chunk + let mut stream = + FramedRead::with_capacity(reader, HackReceiptFileCodec, num_bytes as usize); + + trace!(target: "downloaders::file", + target_num_bytes=num_bytes, + capacity=stream.read_buffer().capacity(), + coded=?HackReceiptFileCodec, + "init decode stream" + ); + + let mut remaining_bytes = vec![]; + + let mut log_interval = 0; + let mut log_interval_start_block = 0; + + let mut block_number = 0; + let mut total_receipts = 0; + let mut receipts_for_block = vec![]; + let mut first_block = None; + + async move { + while let Some(receipt_res) = stream.next().await { + let receipt = match receipt_res { + Ok(receipt) => receipt, + Err(FileClientError::Rlp(err, bytes)) => { + trace!(target: "downloaders::file", + %err, + bytes_len=bytes.len(), + "partial receipt returned from decoding chunk" + ); + + remaining_bytes = bytes; + + break + } + Err(err) => return Err(err), + }; + + total_receipts += 1; + + match receipt { + Some(ReceiptWithBlockNumber { receipt, number }) => { + if first_block.is_none() { + first_block = Some(number); + block_number = number; + } + + if block_number == number { + receipts_for_block.push(Some(receipt)); + } else { + receipts.push(receipts_for_block); + + // next block + block_number = number; + receipts_for_block = vec![Some(receipt)]; + } + } + None => { + match first_block { + Some(num) => { + // if there was a block number before this, push receipts for that + // block + receipts.push(receipts_for_block); + // block with no txns + block_number = num + receipts.len() as u64; + } + None => { + // this is the first block and it's empty, 
assume it's the genesis + // block + first_block = Some(0); + block_number = 0; + } + } + + receipts_for_block = vec![]; + } + } + + if log_interval == 0 { + trace!(target: "downloaders::file", + block_number, + total_receipts, + "read first receipt" + ); + log_interval_start_block = block_number; + } else if log_interval % 100_000 == 0 { + trace!(target: "downloaders::file", + blocks=?log_interval_start_block..=block_number, + total_receipts, + "read receipts from file" + ); + log_interval_start_block = block_number + 1; + } + log_interval += 1; + } + + trace!(target: "downloaders::file", + blocks=?log_interval_start_block..=block_number, + total_receipts, + "read receipts from file" + ); + + // we need to push the last receipts + receipts.push(receipts_for_block); + + trace!(target: "downloaders::file", + blocks = receipts.len(), + total_receipts, + "Initialized receipt file client" + ); + + Ok(( + Self { receipts, first_block: first_block.unwrap_or_default(), total_receipts }, + remaining_bytes, + )) + } + } +} + +/// [`Receipt`] with block number. +#[derive(Debug, PartialEq, Eq)] +pub struct ReceiptWithBlockNumber { + /// Receipt. + pub receipt: Receipt, + /// Block number. + pub number: u64, +} + +#[cfg(test)] +mod test { + use reth_primitives::hex; + use reth_tracing::init_test_tracing; + + use crate::file_codec_ovm_receipt::test::{ + receipt_block_1 as op_mainnet_receipt_block_1, + receipt_block_2 as op_mainnet_receipt_block_2, + receipt_block_3 as op_mainnet_receipt_block_3, + HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET, + HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET, + HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET, + }; + + use super::*; + + /// No receipts for genesis block + const HACK_RECEIPT_BLOCK_NO_TRANSACTIONS: &[u8] = &hex!("c0"); + + #[tokio::test] + async fn receipt_file_client_ovm_codec() { + init_test_tracing(); + + // genesis block has no hack receipts + let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec(); + // one receipt each for block 1 and 2 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET); + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET); + // no receipt for block 4 + encoded_receipts.extend_from_slice(HACK_RECEIPT_BLOCK_NO_TRANSACTIONS); + + let encoded_byte_len = encoded_receipts.len() as u64; + let reader = &mut &encoded_receipts[..]; + + let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = + ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + + assert_eq!(4, total_receipts); + assert_eq!(0, first_block); + assert!(receipts[0].is_empty()); + assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap()); + assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][0].clone().unwrap()); + assert!(receipts[3].is_empty()); + } + + #[tokio::test] + async fn no_receipts_middle_block() { + init_test_tracing(); + + // genesis block has no hack receipts + let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec(); + // one receipt each for block 1 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET); + // no receipt for block 2 + encoded_receipts.extend_from_slice(HACK_RECEIPT_BLOCK_NO_TRANSACTIONS); + // one receipt for block 3 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET); + + let encoded_byte_len = encoded_receipts.len() as u64; + let reader = &mut 
&encoded_receipts[..]; + + let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = + ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + + assert_eq!(4, total_receipts); + assert_eq!(0, first_block); + assert!(receipts[0].is_empty()); + assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap()); + assert!(receipts[2].is_empty()); + assert_eq!(op_mainnet_receipt_block_3().receipt, receipts[3][0].clone().unwrap()); + } + + #[tokio::test] + async fn two_receipts_same_block() { + init_test_tracing(); + + // genesis block has no hack receipts + let mut encoded_receipts = HACK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec(); + // one receipt each for block 1 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET); + // two receipts for block 2 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET); + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET); + // one receipt for block 3 + encoded_receipts.extend_from_slice(HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET); + + let encoded_byte_len = encoded_receipts.len() as u64; + let reader = &mut &encoded_receipts[..]; + + let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = + ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + + assert_eq!(5, total_receipts); + assert_eq!(0, first_block); + assert!(receipts[0].is_empty()); + assert_eq!(op_mainnet_receipt_block_1().receipt, receipts[1][0].clone().unwrap()); + assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][0].clone().unwrap()); + assert_eq!(op_mainnet_receipt_block_2().receipt, receipts[2][1].clone().unwrap()); + assert_eq!(op_mainnet_receipt_block_3().receipt, receipts[3][0].clone().unwrap()); + } +} diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index f4be57f9c..8d31358d9 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -66,8 +66,6 @@ revm-primitives = { workspace = true, features = ["arbitrary"] } nybbles = { workspace = true, features = ["arbitrary"] } alloy-trie = { workspace = true, features = ["arbitrary"] } alloy-eips = { workspace = true, features = ["arbitrary"] } - -arbitrary = { workspace = true, features = ["derive"] } assert_matches.workspace = true proptest.workspace = true proptest-derive.workspace = true @@ -109,7 +107,6 @@ zstd-codec = ["dep:zstd"] clap = ["dep:clap"] optimism = [ "reth-codecs/optimism", - "revm-primitives/optimism", "reth-ethereum-forks/optimism", "revm/optimism", ] diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 838edd620..2b146245e 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -21,6 +21,7 @@ pub mod providers; pub use providers::{ DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider, HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory, + StaticFileWriter, }; #[cfg(any(test, feature = "test-utils"))]
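
A note on driving the generic chunked reader added in `file_client.rs`: any type implementing the new `FromReader` trait can be decoded chunk by chunk via `ChunkedFileReader::next_chunk::<T>()`, which is how both `reth import` (blocks, via `FileClient`) and the new `reth import-receipts <IMPORT_PATH>` command (receipts, via `ReceiptFileClient`) consume their input files. The sketch below is illustrative only, assuming the crate paths introduced in this patch; `count_receipts` is a hypothetical helper and, unlike `ImportReceiptsCommand::execute`, it does not write anything to storage.

use std::path::Path;

use reth_downloaders::{
    file_client::ChunkedFileReader, receipt_file_client::ReceiptFileClient,
};

/// Counts receipts in an export file by decoding it chunk by chunk.
async fn count_receipts(path: &Path) -> eyre::Result<usize> {
    // `None` falls back to `DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`, i.e. one static file per chunk.
    let mut reader = ChunkedFileReader::new(path, None).await?;
    let mut total = 0;

    // `next_chunk::<T>()` accepts any `T: FromReader`; each chunk is decoded into a
    // `ReceiptFileClient` holding receipts grouped per block, starting at `first_block`.
    while let Some(chunk) = reader.next_chunk::<ReceiptFileClient>().await? {
        total += chunk.total_receipts;
    }

    Ok(total)
}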