Make ReceiptFileClient generic (#8955)

This commit is contained in:
Omid Chenane
2024-06-20 14:30:53 +03:30
committed by GitHub
parent abbc0e6338
commit 93b82469b0
3 changed files with 105 additions and 30 deletions

View File

@ -7,6 +7,7 @@ use reth_db::tables;
use reth_db_api::{database::Database, transaction::DbTx}; use reth_db_api::{database::Database, transaction::DbTx};
use reth_downloaders::{ use reth_downloaders::{
file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, file_client::{ChunkedFileReader, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
file_codec_ovm_receipt::HackReceiptFileCodec,
receipt_file_client::ReceiptFileClient, receipt_file_client::ReceiptFileClient,
}; };
use reth_execution_types::ExecutionOutcome; use reth_execution_types::ExecutionOutcome;
@ -115,10 +116,16 @@ where
// open file // open file
let mut reader = ChunkedFileReader::new(path, chunk_len).await?; let mut reader = ChunkedFileReader::new(path, chunk_len).await?;
while let Some(file_client) = reader.next_chunk::<ReceiptFileClient>().await? { while let Some(file_client) =
reader.next_chunk::<ReceiptFileClient<HackReceiptFileCodec>>().await?
{
// create a new file client from chunk read from file // create a new file client from chunk read from file
let ReceiptFileClient { mut receipts, first_block, total_receipts: total_receipts_chunk } = let ReceiptFileClient {
file_client; mut receipts,
first_block,
total_receipts: total_receipts_chunk,
..
} = file_client;
// mark these as decoded // mark these as decoded
total_decoded_receipts += total_receipts_chunk; total_decoded_receipts += total_receipts_chunk;

View File

@ -22,7 +22,7 @@ use crate::{file_client::FileClientError, receipt_file_client::ReceiptWithBlockN
/// ///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set /// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file. /// the capacity of the framed reader to the size of the file.
#[derive(Debug)] #[derive(Debug, Default)]
pub struct HackReceiptFileCodec; pub struct HackReceiptFileCodec;
impl Decoder for HackReceiptFileCodec { impl Decoder for HackReceiptFileCodec {

View File

@ -1,34 +1,83 @@
use std::marker::PhantomData;
use futures::Future; use futures::Future;
use reth_primitives::{Receipt, Receipts}; use reth_primitives::{Receipt, Receipts};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead; use tokio_util::codec::{Decoder, FramedRead};
use tracing::trace; use tracing::trace;
use crate::{ use crate::file_client::{FileClientError, FromReader};
file_client::{FileClientError, FromReader},
file_codec_ovm_receipt::HackReceiptFileCodec,
};
/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential /// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential
/// order w.r.t. block number. /// order w.r.t. block number.
#[derive(Debug)] #[derive(Debug)]
pub struct ReceiptFileClient { pub struct ReceiptFileClient<D> {
/// The buffered receipts, read from file, as nested lists. One list per block number. /// The buffered receipts, read from file, as nested lists. One list per block number.
pub receipts: Receipts, pub receipts: Receipts,
/// First (lowest) block number read from file. /// First (lowest) block number read from file.
pub first_block: u64, pub first_block: u64,
/// Total number of receipts. Count of elements in [`Receipts`] flattened. /// Total number of receipts. Count of elements in [`Receipts`] flattened.
pub total_receipts: usize, pub total_receipts: usize,
/// marker
_marker: PhantomData<D>,
} }
impl FromReader for ReceiptFileClient { /// Constructs a file client from a reader and decoder.
type Error = FileClientError; pub trait FromReceiptReader<D> {
/// Error returned by file client type.
type Error: From<std::io::Error>;
/// Returns a decoder instance
fn decoder() -> D;
/// Returns a file client
fn from_receipt_reader<B>(
reader: B,
decoder: D,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}
impl<D> FromReader for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ Default,
{
type Error = D::Error;
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
B: AsyncReadExt + Unpin,
{
Self::from_receipt_reader(reader, Self::decoder(), num_bytes)
}
}
impl<D> FromReceiptReader<D> for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ Default,
{
type Error = D::Error;
fn decoder() -> D {
Default::default()
}
/// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If
/// first block has no transactions, it's assumed to be the genesis block. /// first block has no transactions, it's assumed to be the genesis block.
fn from_reader<B>( fn from_receipt_reader<B>(
reader: B, reader: B,
decoder: D,
num_bytes: u64, num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>> ) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where where
@ -37,13 +86,12 @@ impl FromReader for ReceiptFileClient {
let mut receipts = Receipts::default(); let mut receipts = Receipts::default();
// use with_capacity to make sure the internal buffer contains the entire chunk // use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = let mut stream = FramedRead::with_capacity(reader, decoder, num_bytes as usize);
FramedRead::with_capacity(reader, HackReceiptFileCodec, num_bytes as usize);
trace!(target: "downloaders::file", trace!(target: "downloaders::file",
target_num_bytes=num_bytes, target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(), capacity=stream.read_buffer().capacity(),
codec=?HackReceiptFileCodec, codec=?Self::decoder(),
"init decode stream" "init decode stream"
); );
@ -149,7 +197,12 @@ impl FromReader for ReceiptFileClient {
); );
Ok(( Ok((
Self { receipts, first_block: first_block.unwrap_or_default(), total_receipts }, Self {
receipts,
first_block: first_block.unwrap_or_default(),
total_receipts,
_marker: Default::default(),
},
remaining_bytes, remaining_bytes,
)) ))
} }
@ -170,13 +223,16 @@ mod test {
use reth_primitives::hex; use reth_primitives::hex;
use reth_tracing::init_test_tracing; use reth_tracing::init_test_tracing;
use crate::file_codec_ovm_receipt::test::{ use crate::file_codec_ovm_receipt::{
receipt_block_1 as op_mainnet_receipt_block_1, test::{
receipt_block_2 as op_mainnet_receipt_block_2, receipt_block_1 as op_mainnet_receipt_block_1,
receipt_block_3 as op_mainnet_receipt_block_3, receipt_block_2 as op_mainnet_receipt_block_2,
HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET, receipt_block_3 as op_mainnet_receipt_block_3,
HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET, HACK_RECEIPT_ENCODED_BLOCK_1 as HACK_RECEIPT_ENCODED_BLOCK_1_OP_MAINNET,
HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET, HACK_RECEIPT_ENCODED_BLOCK_2 as HACK_RECEIPT_ENCODED_BLOCK_2_OP_MAINNET,
HACK_RECEIPT_ENCODED_BLOCK_3 as HACK_RECEIPT_ENCODED_BLOCK_3_OP_MAINNET,
},
HackReceiptFileCodec,
}; };
use super::*; use super::*;
@ -199,8 +255,12 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64; let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..]; let reader = &mut &encoded_receipts[..];
let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = let (
ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<HackReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
// 2 non-empty receipt objects // 2 non-empty receipt objects
assert_eq!(2, total_receipts); assert_eq!(2, total_receipts);
@ -227,8 +287,12 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64; let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..]; let reader = &mut &encoded_receipts[..];
let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = let (
ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<HackReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
// 2 non-empty receipt objects // 2 non-empty receipt objects
assert_eq!(2, total_receipts); assert_eq!(2, total_receipts);
@ -256,8 +320,12 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64; let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..]; let reader = &mut &encoded_receipts[..];
let (ReceiptFileClient { receipts, first_block, total_receipts }, _remaining_bytes) = let (
ReceiptFileClient::from_reader(reader, encoded_byte_len).await.unwrap(); ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<HackReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
// 4 non-empty receipt objects // 4 non-empty receipt objects
assert_eq!(4, total_receipts); assert_eq!(4, total_receipts);