From 93b82469b01ee5b6116106e0cf213d1ae2ac6732 Mon Sep 17 00:00:00 2001 From: Omid Chenane <155813094+ochenane@users.noreply.github.com> Date: Thu, 20 Jun 2024 14:30:53 +0330 Subject: [PATCH] Make ReceiptFileClient generic (#8955) --- bin/reth/src/commands/import_receipts_op.rs | 13 +- .../downloaders/src/file_codec_ovm_receipt.rs | 2 +- .../downloaders/src/receipt_file_client.rs | 120 ++++++++++++++---- 3 files changed, 105 insertions(+), 30 deletions(-) diff --git a/bin/reth/src/commands/import_receipts_op.rs b/bin/reth/src/commands/import_receipts_op.rs index 62cff7017..d77332f86 100644 --- a/bin/reth/src/commands/import_receipts_op.rs +++ b/bin/reth/src/commands/import_receipts_op.rs @@ -7,6 +7,7 @@ use reth_db::tables; use reth_db_api::{database::Database, transaction::DbTx}; use reth_downloaders::{ file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, + file_codec_ovm_receipt::HackReceiptFileCodec, receipt_file_client::ReceiptFileClient, }; use reth_execution_types::ExecutionOutcome; @@ -115,10 +116,16 @@ where // open file let mut reader = ChunkedFileReader::new(path, chunk_len).await?; - while let Some(file_client) = reader.next_chunk::().await? { + while let Some(file_client) = + reader.next_chunk::>().await? + { // create a new file client from chunk read from file - let ReceiptFileClient { mut receipts, first_block, total_receipts: total_receipts_chunk } = - file_client; + let ReceiptFileClient { + mut receipts, + first_block, + total_receipts: total_receipts_chunk, + .. + } = file_client; // mark these as decoded total_decoded_receipts += total_receipts_chunk; diff --git a/crates/net/downloaders/src/file_codec_ovm_receipt.rs b/crates/net/downloaders/src/file_codec_ovm_receipt.rs index 5b3c81a92..74911c7e3 100644 --- a/crates/net/downloaders/src/file_codec_ovm_receipt.rs +++ b/crates/net/downloaders/src/file_codec_ovm_receipt.rs @@ -22,7 +22,7 @@ use crate::{file_client::FileClientError, receipt_file_client::ReceiptWithBlockN /// /// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set /// the capacity of the framed reader to the size of the file. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct HackReceiptFileCodec; impl Decoder for HackReceiptFileCodec { diff --git a/crates/net/downloaders/src/receipt_file_client.rs b/crates/net/downloaders/src/receipt_file_client.rs index 7b0554389..9dd23489d 100644 --- a/crates/net/downloaders/src/receipt_file_client.rs +++ b/crates/net/downloaders/src/receipt_file_client.rs @@ -1,34 +1,83 @@ +use std::marker::PhantomData; + use futures::Future; use reth_primitives::{Receipt, Receipts}; use tokio::io::AsyncReadExt; use tokio_stream::StreamExt; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{Decoder, FramedRead}; use tracing::trace; -use crate::{ - file_client::{FileClientError, FromReader}, - file_codec_ovm_receipt::HackReceiptFileCodec, -}; +use crate::file_client::{FileClientError, FromReader}; /// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential /// order w.r.t. block number. #[derive(Debug)] -pub struct ReceiptFileClient { +pub struct ReceiptFileClient { /// The buffered receipts, read from file, as nested lists. One list per block number. pub receipts: Receipts, /// First (lowest) block number read from file. pub first_block: u64, /// Total number of receipts. Count of elements in [`Receipts`] flattened. pub total_receipts: usize, + /// marker + _marker: PhantomData, } -impl FromReader for ReceiptFileClient { - type Error = FileClientError; +/// Constructs a file client from a reader and decoder. +pub trait FromReceiptReader { + /// Error returned by file client type. + type Error: From; + + /// Returns a decoder instance + fn decoder() -> D; + + /// Returns a file client + fn from_receipt_reader( + reader: B, + decoder: D, + num_bytes: u64, + ) -> impl Future), Self::Error>> + where + Self: Sized, + B: AsyncReadExt + Unpin; +} + +impl FromReader for ReceiptFileClient +where + D: Decoder, Error = FileClientError> + + std::fmt::Debug + + Default, +{ + type Error = D::Error; + + fn from_reader( + reader: B, + num_bytes: u64, + ) -> impl Future), Self::Error>> + where + B: AsyncReadExt + Unpin, + { + Self::from_receipt_reader(reader, Self::decoder(), num_bytes) + } +} + +impl FromReceiptReader for ReceiptFileClient +where + D: Decoder, Error = FileClientError> + + std::fmt::Debug + + Default, +{ + type Error = D::Error; + + fn decoder() -> D { + Default::default() + } /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If /// first block has no transactions, it's assumed to be the genesis block. - fn from_reader( + fn from_receipt_reader( reader: B, + decoder: D, num_bytes: u64, ) -> impl Future), Self::Error>> where @@ -37,13 +86,12 @@ impl FromReader for ReceiptFileClient { let mut receipts = Receipts::default(); // use with_capacity to make sure the internal buffer contains the entire chunk - let mut stream = - FramedRead::with_capacity(reader, HackReceiptFileCodec, num_bytes as usize); + let mut stream = FramedRead::with_capacity(reader, decoder, num_bytes as usize); trace!(target: "downloaders::file", target_num_bytes=num_bytes, capacity=stream.read_buffer().capacity(), - codec=?HackReceiptFileCodec, + codec=?Self::decoder(), "init decode stream" ); @@ -149,7 +197,12 @@ impl FromReader for ReceiptFileClient { ); Ok(( - Self { receipts, first_block: first_block.unwrap_or_default(), total_receipts }, + Self { + receipts, + first_block: first_block.unwrap_or_default(), + total_receipts, + _marker: Default::default(), + }, remaining_bytes, )) } @@ -170,13 +223,16 @@ mod test { use reth_primitives::hex; use reth_tracing::init_test_tracing; - use crate::file_codec_ovm_receipt::test::{ - receipt_block_1 as op_mainnet_receipt_block_1, - receipt_block_2 as op_mainnet_receipt_block_2, - receipt_block_3 as op_mainnet_receipt_block_3, - HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET, - HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET, - HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET, + use crate::file_codec_ovm_receipt::{ + test::{ + receipt_block_1 as op_mainnet_receipt_block_1, + receipt_block_2 as op_mainnet_receipt_block_2, + receipt_block_3 as op_mainnet_receipt_block_3, + HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET, + HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET, + HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET, + }, + HackReceiptFileCodec, }; use super::*; @@ -199,8 +255,12 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = - ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + let ( + ReceiptFileClient { receipts, first_block, total_receipts, _marker }, + _remaining_bytes, + ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) + .await + .unwrap(); // 2 non-empty receipt objects assert_eq!(2, total_receipts); @@ -227,8 +287,12 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = - ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + let ( + ReceiptFileClient { receipts, first_block, total_receipts, _marker }, + _remaining_bytes, + ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) + .await + .unwrap(); // 2 non-empty receipt objects assert_eq!(2, total_receipts); @@ -256,8 +320,12 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = - ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); + let ( + ReceiptFileClient { receipts, first_block, total_receipts, _marker }, + _remaining_bytes, + ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) + .await + .unwrap(); // 4 non-empty receipt objects assert_eq!(4, total_receipts);