fix(op): receipts import, fix chunked read of file with optional block data (#10577)

This commit is contained in:
Emilia Hane
2024-08-28 10:03:23 +02:00
committed by GitHub
parent b1d6e27e3f
commit 0b6480c859
4 changed files with 127 additions and 74 deletions

View File

@ -1,4 +1,5 @@
use super::file_codec::BlockFileCodec;
use std::{collections::HashMap, io, path::Path};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
@ -12,13 +13,16 @@ use reth_network_peers::PeerId;
use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, SealedHeader, B256,
};
use std::{collections::HashMap, io, path::Path};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};
use crate::receipt_file_client::FromReceiptReader;
use super::file_codec::BlockFileCodec;
/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
@ -85,7 +89,7 @@ impl FileClient {
let mut reader = vec![];
file.read_to_end(&mut reader).await?;
Ok(Self::from_reader(&reader[..], file_len).await?.0)
Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
}
/// Get the tip hash of the chain.
@ -184,7 +188,7 @@ impl FromReader for FileClient {
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
{
@ -247,7 +251,11 @@ impl FromReader for FileClient {
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
remaining_bytes,
highest_block: None,
})
}
}
}
@ -349,6 +357,9 @@ pub struct ChunkedFileReader {
chunk: Vec<u8>,
/// Max bytes per chunk.
chunk_byte_len: u64,
/// Optionally, tracks the highest decoded block number. Needed when decoding data that maps
/// many-to-one to a block number, like receipts.
highest_block: Option<u64>,
}
impl ChunkedFileReader {
@ -375,7 +386,7 @@ impl ChunkedFileReader {
let metadata = file.metadata().await?;
let file_byte_len = metadata.len();
Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len })
Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
}
/// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
@ -392,11 +403,9 @@ impl ChunkedFileReader {
}
}
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
/// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
/// chunk to read.
async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
if self.file_byte_len == 0 && self.chunk.is_empty() {
dbg!(self.chunk.is_empty());
// eof
@ -431,12 +440,42 @@ impl ChunkedFileReader {
"new bytes were read from file"
);
Ok(Some(next_chunk_byte_len as u64))
}
/// Reads the next chunk from file and decodes it with [`FromReader::from_reader`].
///
/// Returns `Ok(None)` when [`Self::read_next_chunk`] reports that there is nothing left to
/// read. Otherwise returns the file client `T` decoded from the buffered chunk. Bytes that
/// could not be decoded are kept in `self.chunk`, so the next call sees them again.
///
/// # Errors
///
/// Propagates the I/O error from reading the file (converted into `T::Error`) and any error
/// returned by `T::from_reader`.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
// make new file client from the buffered chunk
let (file_client, bytes) =
T::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
let DecodedFileChunk { file_client, remaining_bytes, .. } =
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
// save left over (undecoded) bytes so they are part of the next chunk
self.chunk = bytes;
self.chunk = remaining_bytes;
Ok(Some(file_client))
}
/// Reads the next chunk of receipts from file and decodes it with
/// [`FromReceiptReader::from_receipt_reader`].
///
/// Unlike [`Self::next_chunk`], this passes the highest block number recorded from the
/// previous chunk (`self.highest_block`) to the decoder, and stores the highest block
/// reported for this chunk for use by the next call.
///
/// Returns `Ok(None)` when [`Self::read_next_chunk`] reports that there is nothing left to
/// read. Otherwise returns the receipt file client `T` decoded from the buffered chunk.
///
/// # Errors
///
/// Propagates the I/O error from reading the file (converted into `T::Error`) and any error
/// returned by `T::from_receipt_reader`.
pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReceiptReader<D>,
{
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
// make new file client from the buffered chunk, telling the decoder where the previous
// chunk ended
let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
.await?;
// save left over (undecoded) bytes so they are part of the next chunk
self.chunk = remaining_bytes;
// update highest block, so the next chunk can continue from it
self.highest_block = highest_block;
Ok(Some(file_client))
}
@ -446,16 +485,29 @@ impl ChunkedFileReader {
pub trait FromReader {
/// Error returned by file client type. Must be constructible from [`io::Error`] so that
/// file read failures can be propagated with `?`.
type Error: From<io::Error>;
/// Decodes a file client from `reader`.
///
/// `num_bytes` is presumably the byte length of the data available in `reader` (callers in
/// this file pass the length of the chunk to decode) — confirm against implementations.
///
/// The returned future resolves to the decoded file client together with any bytes that
/// were left undecoded.
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}
/// Output from decoding a file chunk with [`FromReader::from_reader`].
///
/// Bundles the decoded file client with the state a chunked reader needs in order to continue
/// with the next chunk.
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
/// File client, i.e. the decoded part of chunk.
pub file_client: T,
/// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
pub remaining_bytes: Vec<u8>,
/// Highest block of decoded chunk. This is needed when decoding data that maps many-to-one
/// to a block number, like receipts. `None` when the decoder does not track it.
pub highest_block: Option<u64>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -40,3 +40,5 @@ pub mod file_codec;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub use file_client::{DecodedFileChunk, FileClientError};

View File

@ -1,4 +1,4 @@
use std::marker::PhantomData;
use std::{fmt, io, marker::PhantomData};
use futures::Future;
use reth_primitives::{Receipt, Receipts};
@ -7,7 +7,7 @@ use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};
use tracing::trace;
use crate::file_client::{FileClientError, FromReader};
use crate::{DecodedFileChunk, FileClientError};
/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential
/// order w.r.t. block number.
@ -26,7 +26,7 @@ pub struct ReceiptFileClient<D> {
/// Constructs a file client from a reader and decoder.
pub trait FromReceiptReader<D> {
/// Error returned by file client type.
type Error: From<std::io::Error>;
type Error: From<io::Error>;
/// Returns a decoder instance
fn decoder() -> D;
@ -34,59 +34,40 @@ pub trait FromReceiptReader<D> {
/// Returns a file client
fn from_receipt_reader<B>(
reader: B,
decoder: D,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
prev_chunk_highest_block: Option<u64>,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}
/// [`FromReader`] implementation for [`ReceiptFileClient`], for any decoder `D` that yields
/// `Option<ReceiptWithBlockNumber>` items and fails with [`FileClientError`].
impl<D> FromReader for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ Default,
{
// the decoder's error type doubles as the file client's error type
type Error = D::Error;
/// Forwards to `from_receipt_reader`, supplying the decoder returned by `Self::decoder()`.
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
B: AsyncReadExt + Unpin,
{
Self::from_receipt_reader(reader, Self::decoder(), num_bytes)
}
}
impl<D> FromReceiptReader<D> for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ fmt::Debug
+ Default,
{
type Error = D::Error;
fn decoder() -> D {
Default::default()
D::default()
}
/// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If
/// first block has no transactions, it's assumed to be the genesis block.
fn from_receipt_reader<B>(
reader: B,
decoder: D,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
prev_chunk_highest_block: Option<u64>,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
{
let mut receipts = Receipts::default();
// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, decoder, num_bytes as usize);
let mut stream = FramedRead::with_capacity(reader, Self::decoder(), num_bytes as usize);
trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
@ -152,10 +133,16 @@ where
block_number = num + receipts.len() as u64;
}
None => {
// this is the first block and it's empty, assume it's the genesis
// block
first_block = Some(0);
block_number = 0;
// this is the first block and it's empty
if let Some(highest_block) = prev_chunk_highest_block {
// this is a chunked read and this is not the first chunk
block_number = highest_block + 1;
} else {
// this is not a chunked read or this is the first chunk. assume
// it's the genesis block
block_number = 0;
}
first_block = Some(block_number);
}
}
@ -196,15 +183,16 @@ where
"Initialized receipt file client"
);
Ok((
Self {
Ok(DecodedFileChunk {
file_client: Self {
receipts,
first_block: first_block.unwrap_or_default(),
total_receipts,
_marker: Default::default(),
},
remaining_bytes,
))
highest_block: Some(block_number),
})
}
}
}
@ -220,10 +208,6 @@ pub struct ReceiptWithBlockNumber {
#[cfg(test)]
mod test {
use crate::{
file_client::{FileClientError, FromReader},
receipt_file_client::{ReceiptFileClient, ReceiptWithBlockNumber},
};
use alloy_rlp::{Decodable, RlpDecodable};
use reth_primitives::{
hex, Address, Buf, Bytes, BytesMut, Log, LogData, Receipt, TxType, B256,
@ -231,6 +215,9 @@ mod test {
use reth_tracing::init_test_tracing;
use tokio_util::codec::Decoder;
use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber};
use crate::{DecodedFileChunk, FileClientError};
#[derive(Debug, PartialEq, Eq, RlpDecodable)]
struct MockReceipt {
tx_type: u8,
@ -578,12 +565,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];
let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();
// 2 non-empty receipt objects
assert_eq!(2, total_receipts);
@ -610,12 +601,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];
let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();
// 2 non-empty receipt objects
assert_eq!(2, total_receipts);
@ -643,12 +638,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];
let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();
// 4 non-empty receipt objects
assert_eq!(4, total_receipts);