feat: implement headers for FileClient (#1113)

This commit is contained in:
Dan Cline
2023-02-02 17:43:43 -05:00
committed by GitHub
parent e09b3e098a
commit 9d4bd5e004
9 changed files with 308 additions and 36 deletions

View File

@ -28,6 +28,9 @@ metrics = "0.20.1"
thiserror = { version = "1", optional = true }
reth-rlp = { path = "../../rlp", optional = true }
tokio-util = { version = "0.7", features = ["codec"], optional = true }
bytes = { version = "1", optional = true }
tempfile = { version = "3.3", optional = true }
[dev-dependencies]
reth-db = { path = "../../storage/db", features = ["test-utils"] }
@ -36,10 +39,12 @@ reth-tracing = { path = "../../tracing" }
assert_matches = "1.5.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", features = ["codec"] }
reth-rlp = { path = "../../rlp" }
bytes = "1"
thiserror = "1"
tempfile = "3.3"
[features]
test-utils = ["dep:reth-rlp", "dep:thiserror"]
test-utils = ["dep:reth-rlp", "dep:thiserror", "dep:tokio-util", "dep:tempfile"]

View File

@ -8,5 +8,5 @@ pub mod task;
mod queue;
mod request;
#[cfg(test)]
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -1,10 +1,12 @@
#![allow(unused)]
//! Test helper impls for generating bodies
use reth_db::{
database::Database,
mdbx::{Env, WriteMap},
tables,
transaction::DbTxMut,
};
use reth_eth_wire::BlockBody;
use reth_eth_wire::{BlockBody, RawBlockBody};
use reth_interfaces::{db, p2p::bodies::response::BlockResponse};
use reth_primitives::{SealedBlock, SealedHeader, H256};
use std::collections::HashMap;
@ -30,6 +32,19 @@ pub(crate) fn zip_blocks<'a>(
.collect()
}
pub(crate) fn create_raw_bodies<'a>(
headers: impl Iterator<Item = &'a SealedHeader>,
bodies: &mut HashMap<H256, BlockBody>,
) -> Vec<RawBlockBody> {
headers
.into_iter()
.map(|header| {
let body = bodies.remove(&header.hash()).expect("body exists");
body.create_block(header)
})
.collect()
}
#[inline]
pub(crate) fn insert_headers(db: &Env<WriteMap>, headers: &[SealedHeader]) {
db.update(|tx| -> Result<(), db::Error> {

View File

@ -4,5 +4,5 @@ pub mod reverse_headers;
/// A downloader implementation that spawns a downloader to a task
pub mod task;
#[cfg(test)]
mod test_utils;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -1,3 +1,5 @@
#![allow(unused)]
//! Test helper impls for generating bodies
use reth_primitives::SealedHeader;
/// Returns a new [SealedHeader] that's the child header of the given `parent`.

View File

@ -1,25 +1,35 @@
use super::file_codec::BlockFileCodec;
use reth_eth_wire::{BlockBody, RawBlockBody};
use reth_interfaces::{
p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::RequestError,
headers::client::{HeadersClient, HeadersFut, HeadersRequest},
priority::Priority,
},
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_primitives::{
Block, BlockHash, BlockHashOrNumber, BlockNumber, Header, HeadersDirection, PeerId, H256,
};
use reth_rlp::{Decodable, Header as RlpHeader};
use std::{
collections::HashMap,
iter::zip,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::RequestError,
priority::Priority,
},
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256};
use thiserror::Error;
use tokio::{fs::File, io::BufReader};
use tokio::{
fs::File,
io::{AsyncReadExt, BufReader},
};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::warn;
/// Front-end API for fetching chain data from a file.
@ -31,14 +41,16 @@ use tracing::warn;
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient {
/// The open reader for the file.
reader: BufReader<File>,
/// The buffered headers retrieved when fetching new bodies.
headers: HashMap<BlockNumber, Header>,
/// A mapping between block hash and number.
hash_to_number: HashMap<BlockHash, BlockNumber>,
/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, BlockBody>,
@ -62,19 +74,44 @@ impl FileClient {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
FileClient::from_file(file)
FileClient::from_file(file).await
}
/// Initialize the [`FileClient`](FileClient) with a file directly.
pub(crate) fn from_file(file: File) -> Result<Self, FileClientError> {
let reader = BufReader::new(file);
pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
let file_len = metadata.len();
Ok(Self {
reader,
headers: HashMap::new(),
bodies: HashMap::new(),
is_syncing: Arc::new(Default::default()),
})
// read the entire file into memory
let mut reader = vec![];
file.read_to_end(&mut reader).await.unwrap();
let mut headers = HashMap::new();
let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new();
// use with_capacity to make sure the internal buffer contains the entire file
let mut stream = FramedRead::with_capacity(&reader[..], BlockFileCodec, file_len as usize);
let mut block_num = 0;
while let Some(block_res) = stream.next().await {
let block = block_res?;
let block_hash = block.header.hash_slow();
// add to the internal maps
headers.insert(block_num, block.header.clone());
hash_to_number.insert(block_hash, block_num);
bodies.insert(
block_hash,
BlockBody { transactions: block.transactions, ommers: block.ommers },
);
// update block num
block_num += 1;
}
Ok(Self { headers, hash_to_number, bodies, is_syncing: Arc::new(Default::default()) })
}
/// Use the provided bodies as the file client's block body buffer.
@ -86,10 +123,48 @@ impl FileClient {
/// Use the provided headers as the file client's block body buffer.
pub(crate) fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
self.headers = headers;
for (number, header) in &self.headers {
self.hash_to_number.insert(header.hash_slow(), *number);
}
self
}
}
impl HeadersClient for FileClient {
type Output = HeadersFut;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> Self::Output {
// this just searches the buffer, and fails if it can't find the header
let mut headers = Vec::new();
let start_num = match request.start {
BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
Some(num) => *num,
None => return Box::pin(async move { Err(RequestError::BadResponse) }),
},
BlockHashOrNumber::Number(num) => num,
};
let range = match request.direction {
HeadersDirection::Rising => start_num..=start_num + 1 - request.limit,
HeadersDirection::Falling => start_num + 1 - request.limit..=start_num,
};
for block_number in range {
match self.headers.get(&block_number).cloned() {
Some(header) => headers.push(header),
None => return Box::pin(async move { Err(RequestError::BadResponse) }),
}
}
Box::pin(async move { Ok((PeerId::default(), headers).into()) })
}
}
impl BodiesClient for FileClient {
type Output = BodiesFut;
@ -145,15 +220,30 @@ mod tests {
use crate::{
bodies::{
bodies::BodiesDownloaderBuilder,
test_utils::{insert_headers, zip_blocks},
test_utils::{create_raw_bodies, insert_headers, zip_blocks},
},
test_utils::generate_bodies,
headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
test_utils::{generate_bodies, generate_bodies_file},
};
use assert_matches::assert_matches;
use futures::SinkExt;
use futures_util::stream::StreamExt;
use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
use reth_interfaces::{p2p::bodies::downloader::BodyDownloader, test_utils::TestConsensus};
use std::sync::Arc;
use reth_interfaces::{
p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
},
test_utils::TestConsensus,
};
use reth_primitives::SealedHeader;
use reth_rlp::Encodable;
use std::{
io::{Read, Seek, SeekFrom, Write},
sync::Arc,
};
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio_util::codec::FramedWrite;
#[tokio::test]
async fn streams_bodies_from_buffer() {
@ -167,7 +257,89 @@ mod tests {
let file = tempfile::tempfile().unwrap();
let client =
Arc::new(FileClient::from_file(file.into()).unwrap().with_bodies(bodies.clone()));
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
);
downloader.set_download_range(0..20).expect("failed to set download range");
assert_matches!(
downloader.next().await,
Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
);
}
#[tokio::test]
async fn download_headers_at_fork_head() {
reth_tracing::init_test_tracing();
let p3 = SealedHeader::default();
let p2 = child_header(&p3);
let p1 = child_header(&p2);
let p0 = child_header(&p1);
let file = tempfile::tempfile().unwrap();
let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers(
HashMap::from([
(0u64, p0.clone().unseal()),
(1, p1.clone().unseal()),
(2, p2.clone().unseal()),
(3, p3.clone().unseal()),
]),
));
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
.request_limit(3)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(p3.clone());
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0, p1, p2,]);
assert!(downloader.next().await.is_none());
assert!(downloader.next().await.is_none());
}
#[tokio::test]
async fn test_download_headers_from_file() {
// Generate some random blocks
let db = create_test_db::<WriteMap>(EnvKind::RW);
let (file, headers, mut bodies) = generate_bodies_file(0..20).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
header_downloader.update_local_head(headers.first().unwrap().clone());
header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));
// get headers first
let mut downloaded_headers = header_downloader.next().await.unwrap();
// reverse to make sure it's in the right order before comparing
downloaded_headers.reverse();
// the first header is not included in the response
assert_eq!(downloaded_headers, headers[1..]);
}
#[tokio::test]
async fn test_download_bodies_from_file() {
// Generate some random blocks
let db = create_test_db::<WriteMap>(EnvKind::RW);
let (file, headers, mut bodies) = generate_bodies_file(0..20).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
// insert headers in db for the bodies downloader
insert_headers(&db, &headers);
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),

View File

@ -0,0 +1,45 @@
//! Codec for reading raw block bodies from a file.
use super::FileClientError;
use bytes::{Buf, BytesMut};
use reth_eth_wire::RawBlockBody;
use reth_rlp::{Decodable, Encodable};
use tokio_util::codec::{Decoder, Encoder};
/// Codec for reading raw block bodies from a file.
///
/// If using with [`FramedRead`](tokio_util::codec::FramedRead), the user should make sure the
/// framed reader has capacity for the entire block file. Otherwise, the decoder will return
/// [`InputTooShort`](reth_rlp::DecodeError::InputTooShort), because RLP headers can only be
/// decoded if the internal buffer is large enough to contain the entire block body.
///
/// Without ensuring the framed reader has capacity for the entire file, a block body is likely to
/// fall across two read buffers, the decoder will not be able to decode the header, which will
/// cause it to fail.
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
pub(crate) struct BlockFileCodec;
impl Decoder for BlockFileCodec {
type Item = RawBlockBody;
type Error = FileClientError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None)
}
let mut buf_slice = &mut src.as_ref();
let body = RawBlockBody::decode(buf_slice)?;
src.advance(src.len() - buf_slice.len());
Ok(Some(body))
}
}
impl Encoder<RawBlockBody> for BlockFileCodec {
type Error = FileClientError;
fn encode(&mut self, item: RawBlockBody, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.encode(dst);
Ok(())
}
}

View File

@ -1,9 +1,16 @@
#![allow(unused)]
//! Test helper impls
use crate::bodies::test_utils::create_raw_bodies;
use futures::SinkExt;
use reth_eth_wire::BlockBody;
use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::{SealedHeader, H256};
use std::collections::HashMap;
use std::{collections::HashMap, io::SeekFrom};
use tokio::{
fs::File,
io::{AsyncSeekExt, AsyncWriteExt, BufWriter},
};
use tokio_util::codec::FramedWrite;
/// Metrics scope used for testing.
pub(crate) const TEST_SCOPE: &str = "downloaders.test";
@ -31,8 +38,32 @@ pub(crate) fn generate_bodies(
(headers, bodies)
}
/// Generate a set of bodies, write them to a temporary file, and return the file along with the
/// bodies and corresponding block hashes
pub(crate) async fn generate_bodies_file(
rng: std::ops::Range<u64>,
) -> (tokio::fs::File, Vec<SealedHeader>, HashMap<H256, BlockBody>) {
let (headers, mut bodies) = generate_bodies(0..20);
let raw_block_bodies = create_raw_bodies(headers.clone().iter(), &mut bodies.clone());
let mut file: File = tempfile::tempfile().unwrap().into();
let mut writer = FramedWrite::new(file, BlockFileCodec);
// rlp encode one after the other
for block in raw_block_bodies {
writer.send(block).await.unwrap();
}
// get the file back
let mut file: File = writer.into_inner();
file.seek(SeekFrom::Start(0)).await.unwrap();
(file, headers, bodies)
}
mod file_client;
mod file_codec;
mod test_client;
pub use file_client::{FileClient, FileClientError};
pub(crate) use file_codec::BlockFileCodec;
pub use test_client::TestBodiesClient;