feat: make ReceiptFileClient generic over receipt (#13523)

This commit is contained in:
Arsenii Kulikov
2024-12-23 21:33:12 +04:00
committed by GitHub
parent ec21e895c1
commit 855029b3ca
4 changed files with 81 additions and 36 deletions

View File

@ -464,9 +464,9 @@ impl ChunkedFileReader {
} }
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error> pub async fn next_receipts_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where where
T: FromReceiptReader<D>, T: FromReceiptReader,
{ {
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) }; let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

View File

@ -1,4 +1,4 @@
use std::{fmt, io, marker::PhantomData}; use std::{fmt, io};
use futures::Future; use futures::Future;
use reth_primitives::{Receipt, Receipts}; use reth_primitives::{Receipt, Receipts};
@ -9,28 +9,36 @@ use tracing::{trace, warn};
use crate::{DecodedFileChunk, FileClientError}; use crate::{DecodedFileChunk, FileClientError};
/// Helper trait implemented for [`Decoder`] that decodes the receipt type.
pub trait ReceiptDecoder: Decoder<Item = Option<ReceiptWithBlockNumber<Self::Receipt>>> {
/// The receipt type being decoded.
type Receipt;
}
impl<T, R> ReceiptDecoder for T
where
T: Decoder<Item = Option<ReceiptWithBlockNumber<R>>>,
{
type Receipt = R;
}
/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential /// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential
/// order w.r.t. block number. /// order w.r.t. block number.
#[derive(Debug)] #[derive(Debug)]
pub struct ReceiptFileClient<D> { pub struct ReceiptFileClient<D: ReceiptDecoder> {
/// The buffered receipts, read from file, as nested lists. One list per block number. /// The buffered receipts, read from file, as nested lists. One list per block number.
pub receipts: Receipts, pub receipts: Receipts<D::Receipt>,
/// First (lowest) block number read from file. /// First (lowest) block number read from file.
pub first_block: u64, pub first_block: u64,
/// Total number of receipts. Count of elements in [`Receipts`] flattened. /// Total number of receipts. Count of elements in [`Receipts`] flattened.
pub total_receipts: usize, pub total_receipts: usize,
/// marker
_marker: PhantomData<D>,
} }
/// Constructs a file client from a reader and decoder. /// Constructs a file client from a reader and decoder.
pub trait FromReceiptReader<D> { pub trait FromReceiptReader {
/// Error returned by file client type. /// Error returned by file client type.
type Error: From<io::Error>; type Error: From<io::Error>;
/// Returns a decoder instance
fn decoder() -> D;
/// Returns a file client /// Returns a file client
fn from_receipt_reader<B>( fn from_receipt_reader<B>(
reader: B, reader: B,
@ -42,18 +50,12 @@ pub trait FromReceiptReader<D> {
B: AsyncReadExt + Unpin; B: AsyncReadExt + Unpin;
} }
impl<D> FromReceiptReader<D> for ReceiptFileClient<D> impl<D> FromReceiptReader for ReceiptFileClient<D>
where where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError> D: ReceiptDecoder<Error = FileClientError> + fmt::Debug + Default,
+ fmt::Debug
+ Default,
{ {
type Error = D::Error; type Error = D::Error;
fn decoder() -> D {
D::default()
}
/// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If
/// first block has no transactions, it's assumed to be the genesis block. /// first block has no transactions, it's assumed to be the genesis block.
fn from_receipt_reader<B>( fn from_receipt_reader<B>(
@ -67,12 +69,12 @@ where
let mut receipts = Receipts::default(); let mut receipts = Receipts::default();
// use with_capacity to make sure the internal buffer contains the entire chunk // use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, Self::decoder(), num_bytes as usize); let mut stream = FramedRead::with_capacity(reader, D::default(), num_bytes as usize);
trace!(target: "downloaders::file", trace!(target: "downloaders::file",
target_num_bytes=num_bytes, target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(), capacity=stream.read_buffer().capacity(),
codec=?Self::decoder(), codec=?D::default(),
"init decode stream" "init decode stream"
); );
@ -193,7 +195,6 @@ where
receipts, receipts,
first_block: first_block.unwrap_or_default(), first_block: first_block.unwrap_or_default(),
total_receipts, total_receipts,
_marker: Default::default(),
}, },
remaining_bytes, remaining_bytes,
highest_block: Some(block_number), highest_block: Some(block_number),
@ -204,9 +205,9 @@ where
/// [`Receipt`] with block number. /// [`Receipt`] with block number.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub struct ReceiptWithBlockNumber { pub struct ReceiptWithBlockNumber<R = Receipt> {
/// Receipt. /// Receipt.
pub receipt: Receipt, pub receipt: R,
/// Block number. /// Block number.
pub number: u64, pub number: u64,
} }

View File

@ -161,7 +161,7 @@ where
.expect("transaction static files must exist before importing receipts"); .expect("transaction static files must exist before importing receipts");
while let Some(file_client) = while let Some(file_client) =
reader.next_receipts_chunk::<ReceiptFileClient<_>, HackReceiptFileCodec>().await? reader.next_receipts_chunk::<ReceiptFileClient<HackReceiptFileCodec>>().await?
{ {
if highest_block_receipts == highest_block_transactions { if highest_block_receipts == highest_block_transactions {
warn!(target: "reth::cli", highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height"); warn!(target: "reth::cli", highest_block_receipts, highest_block_transactions, "Ignoring all other blocks in the file since we have reached the desired height");

View File

@ -5,6 +5,8 @@ use alloy_primitives::{
Address, Bloom, Bytes, B256, Address, Bloom, Bytes, B256,
}; };
use alloy_rlp::{Decodable, RlpDecodable}; use alloy_rlp::{Decodable, RlpDecodable};
use op_alloy_consensus::OpDepositReceipt;
use reth_optimism_primitives::OpReceipt;
use reth_primitives::{Log, Receipt, TxType}; use reth_primitives::{Log, Receipt, TxType};
use tokio_util::codec::Decoder; use tokio_util::codec::Decoder;
@ -24,10 +26,13 @@ use reth_downloaders::{file_client::FileClientError, receipt_file_client::Receip
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set /// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file. /// the capacity of the framed reader to the size of the file.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct HackReceiptFileCodec; pub struct HackReceiptFileCodec<R = Receipt>(core::marker::PhantomData<R>);
impl Decoder for HackReceiptFileCodec { impl<R> Decoder for HackReceiptFileCodec<R>
type Item = Option<ReceiptWithBlockNumber>; where
R: TryFrom<HackReceipt, Error: Into<FileClientError>>,
{
type Item = Option<ReceiptWithBlockNumber<R>>;
type Error = FileClientError; type Error = FileClientError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@ -42,7 +47,15 @@ impl Decoder for HackReceiptFileCodec {
src.advance(src.len() - buf_slice.len()); src.advance(src.len() - buf_slice.len());
Ok(Some( Ok(Some(
receipt.map(|receipt| receipt.try_into().map_err(FileClientError::from)).transpose()?, receipt
.map(|receipt| {
let number = receipt.block_number;
receipt
.try_into()
.map_err(Into::into)
.map(|receipt| ReceiptWithBlockNumber { receipt, number })
})
.transpose()?,
)) ))
} }
} }
@ -73,23 +86,54 @@ pub struct HackReceipt {
#[rlp(trailing)] #[rlp(trailing)]
struct HackReceiptContainer(Option<HackReceipt>); struct HackReceiptContainer(Option<HackReceipt>);
impl TryFrom<HackReceipt> for ReceiptWithBlockNumber { impl TryFrom<HackReceipt> for Receipt {
type Error = &'static str; type Error = &'static str;
fn try_from(exported_receipt: HackReceipt) -> Result<Self, Self::Error> { fn try_from(exported_receipt: HackReceipt) -> Result<Self, Self::Error> {
let HackReceipt { let HackReceipt { tx_type, status, cumulative_gas_used, logs, .. } = exported_receipt;
tx_type, status, cumulative_gas_used, logs, block_number: number, ..
} = exported_receipt;
#[allow(clippy::needless_update)] #[allow(clippy::needless_update)]
let receipt = Receipt { Ok(Self {
tx_type: TxType::try_from(tx_type.to_be_bytes()[0])?, tx_type: TxType::try_from(tx_type.to_be_bytes()[0])?,
success: status != 0, success: status != 0,
cumulative_gas_used, cumulative_gas_used,
logs, logs,
..Default::default() ..Default::default()
})
}
}
impl TryFrom<HackReceipt> for OpReceipt {
type Error = &'static str;
fn try_from(exported_receipt: HackReceipt) -> Result<Self, Self::Error> {
let Receipt {
tx_type,
success,
cumulative_gas_used,
logs,
deposit_nonce,
deposit_receipt_version,
} = exported_receipt.try_into()?;
let receipt = alloy_consensus::Receipt {
status: success.into(),
cumulative_gas_used: cumulative_gas_used as u128,
logs,
}; };
Ok(Self { receipt, number }) match tx_type {
TxType::Legacy => Ok(Self::Legacy(receipt)),
TxType::Eip2930 => Ok(Self::Eip2930(receipt)),
TxType::Eip1559 => Ok(Self::Eip1559(receipt)),
TxType::Eip7702 => Ok(Self::Eip7702(receipt)),
TxType::Eip4844 => Err("EIP-4844 receipts are not supported for OP"),
TxType::Deposit => Ok(Self::Deposit(OpDepositReceipt {
inner: receipt,
deposit_nonce,
deposit_receipt_version,
})),
}
} }
} }
@ -326,7 +370,7 @@ pub(crate) mod test {
let encoded = &mut BytesMut::from(&receipt_1_to_3[..]); let encoded = &mut BytesMut::from(&receipt_1_to_3[..]);
let mut codec = HackReceiptFileCodec; let mut codec = HackReceiptFileCodec::default();
// test // test