mirror of https://github.com/hl-archive-node/nanoreth.git
feat(download): bodies task downloader and header selection fix (#1049)
@@ -1,16 +1,17 @@
 use super::response::BlockResponse;
-use crate::db;
+use crate::p2p::error::DownloadResult;
 use futures::Stream;
 use reth_primitives::BlockNumber;
 use std::ops::Range;
 
+/// Body downloader return type.
+pub type BodyDownloaderResult = DownloadResult<Vec<BlockResponse>>;
+
 /// A downloader capable of fetching and yielding block bodies from block headers.
 ///
 /// A downloader represents a distinct strategy for submitting requests to download block bodies,
 /// while a [BodiesClient] represents a client capable of fulfilling these requests.
-pub trait BodyDownloader:
-    Send + Sync + Stream<Item = Result<Vec<BlockResponse>, db::Error>> + Unpin
-{
+pub trait BodyDownloader: Send + Sync + Stream<Item = BodyDownloaderResult> + Unpin {
     /// Method for setting the download range.
-    fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error>;
+    fn set_download_range(&mut self, range: Range<BlockNumber>) -> DownloadResult<()>;
 }
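For orientation, a minimal consumer-side sketch of the reworked trait above (not part of the commit; it uses only names introduced by this change and assumes `futures_util::StreamExt` is in scope):

```rust
use futures_util::StreamExt;
use reth_interfaces::p2p::{
    bodies::{downloader::BodyDownloader, response::BlockResponse},
    error::DownloadResult,
};

// Drive any `BodyDownloader`: the caller owns the download range, failures
// surface as `DownloadResult` errors, and the stream yields batches of
// `BlockResponse`.
async fn drive<D: BodyDownloader>(mut downloader: D) -> DownloadResult<()> {
    downloader.set_download_range(0..100)?;
    while let Some(batch) = downloader.next().await {
        let bodies: Vec<BlockResponse> = batch?;
        // hand the bodies off to the next pipeline stage here
        let _ = bodies;
    }
    Ok(())
}
```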
@@ -1,6 +1,6 @@
 use super::headers::client::HeadersRequest;
-use crate::consensus;
-use reth_primitives::{rpc::BlockNumber, BlockHashOrNumber, Header, WithPeerId, H256};
+use crate::{consensus, db};
+use reth_primitives::{BlockHashOrNumber, BlockNumber, Header, WithPeerId, H256};
 use thiserror::Error;
 use tokio::sync::{mpsc, oneshot};
 
@@ -154,6 +154,12 @@ pub enum DownloadError {
         /// How many bodies we expected.
         expected: usize,
     },
+    /// Headers missing from the database.
+    #[error("Header missing from the database: {block_number}")]
+    MissingHeader {
+        /// Missing header block number.
+        block_number: BlockNumber,
+    },
     /* ==================== COMMON ERRORS ==================== */
     /// Timed out while waiting for request id response.
     #[error("Timed out while waiting for response.")]
@@ -164,4 +170,7 @@
     /// Error while executing the request.
     #[error(transparent)]
     RequestError(#[from] RequestError),
+    /// Error while reading data from database.
+    #[error(transparent)]
+    DatabaseError(#[from] db::Error),
 }
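The `DatabaseError(#[from] db::Error)` variant added above is what lets the downloader signatures move from `Result<_, db::Error>` to `DownloadResult<_>`. A minimal sketch of the conversion, assuming `DownloadResult<T>` is `Result<T, DownloadError>` as defined in this module:

```rust
use reth_interfaces::{db, p2p::error::DownloadResult};

// `?` converts the db::Error into DownloadError::DatabaseError through the
// thiserror-generated `From` impl, so no explicit map_err is needed.
fn forward(res: Result<u64, db::Error>) -> DownloadResult<u64> {
    Ok(res?)
}
```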
@@ -83,8 +83,8 @@ pub fn validate_header_download(
 pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
     if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
         return Err(DownloadError::MismatchedHeaders {
-            header_number: header.number.into(),
-            parent_number: parent.number.into(),
+            header_number: header.number,
+            parent_number: parent.number,
             header_hash: header.hash(),
             parent_hash: parent.hash(),
         })
@@ -6,8 +6,14 @@ use futures_util::StreamExt;
 use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
 use reth_interfaces::{
     consensus::Consensus,
-    db,
-    p2p::bodies::{client::BodiesClient, downloader::BodyDownloader, response::BlockResponse},
+    p2p::{
+        bodies::{
+            client::BodiesClient,
+            downloader::{BodyDownloader, BodyDownloaderResult},
+            response::BlockResponse,
+        },
+        error::{DownloadError, DownloadResult},
+    },
 };
 use reth_primitives::{BlockNumber, SealedHeader};
 use std::{
@@ -72,7 +78,7 @@ where
     DB: Database,
 {
     /// Returns the next contiguous request.
-    fn next_headers_request(&mut self) -> Result<Option<Vec<SealedHeader>>, db::Error> {
+    fn next_headers_request(&mut self) -> DownloadResult<Option<Vec<SealedHeader>>> {
         let start_at = match self.in_progress_queue.last_requested_block_number {
             Some(num) => num + 1,
             None => self.download_range.start,
@@ -83,12 +89,7 @@ where
             return Ok(None)
         }
 
-        let request = self.query_headers(start_at, limit)?;
-        if request.is_empty() {
-            return Ok(None)
-        }
-
-        Ok(Some(request))
+        self.query_headers(start_at..self.download_range.end, limit)
     }
 
     /// Retrieve a batch of headers from the database starting from provided block number.
@@ -98,37 +99,61 @@ where
     /// 1. The number of non-empty headers in the batch equals requested.
     /// 2. The total number of headers in the batch (both empty and non-empty)
     ///    is greater than or equal to the stream batch size.
-    /// 3. There are no more headers in the database.
+    /// 3. Downloader reached the end of the range
     ///
     /// NOTE: The batches returned have a variable length.
     fn query_headers(
         &self,
-        start: BlockNumber,
+        range: Range<BlockNumber>,
         max_non_empty: u64,
-    ) -> Result<Vec<SealedHeader>, db::Error> {
-        let tx = self.db.tx()?;
+    ) -> DownloadResult<Option<Vec<SealedHeader>>> {
+        if range.start >= self.download_range.end {
+            return Ok(None)
+        }
+
         // Acquire cursors over canonical and header tables
+        let tx = self.db.tx()?;
         let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
         let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
 
+        // Non empty headers count
         let mut non_empty_headers = 0;
 
+        // Collection of results
         let mut headers = Vec::<SealedHeader>::default();
-        let mut canonical_entry = canonical_cursor.seek_exact(start)?;
-        while let Some((number, hash)) = canonical_entry {
-            let (_, header) =
-                header_cursor.seek_exact((number, hash).into())?.expect("database corrupted");
+        let mut current_block_num = range.start;
+        // Collect headers while
+        // 1. Current block number is in range
+        // 2. The number of non empty headers is less than maximum
+        // 3. The total number of headers is less than the stream batch size
+        while current_block_num < range.end &&
+            non_empty_headers < max_non_empty &&
+            headers.len() < self.stream_batch_size
+        {
+            // Find the block hash
+            let (number, hash) = canonical_cursor
+                .seek_exact(current_block_num)?
+                .ok_or(DownloadError::MissingHeader { block_number: current_block_num })?;
+            // Find the block number
+            let (_, header) = header_cursor
+                .seek_exact((number, hash).into())?
+                .ok_or(DownloadError::MissingHeader { block_number: number })?;
+
+            // If the header is not empty, increment the counter
             if !header.is_empty() {
                 non_empty_headers += 1;
             }
+
+            // Add header to the result collection
             headers.push(SealedHeader::new(header, hash));
-            if non_empty_headers >= max_non_empty || headers.len() >= self.stream_batch_size {
-                break
-            }
-            canonical_entry = canonical_cursor.next()?;
+            // Increment current block number
+            current_block_num += 1;
         }
 
-        Ok(headers)
+        Ok(Some(headers).filter(|h| !h.is_empty()))
     }
 
     /// Get the next expected block number for queueing.
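Note the return idiom at the end of `query_headers`: `Ok(Some(headers).filter(|h| !h.is_empty()))` maps an empty batch to `None` instead of returning an empty vector. A tiny standard-library illustration of that `Option::filter` behaviour:

```rust
fn main() {
    let empty: Vec<u64> = Vec::new();
    let full = vec![1u64, 2, 3];

    // An empty collection is filtered out entirely...
    assert_eq!(Some(empty).filter(|v| !v.is_empty()), None);
    // ...while a non-empty one passes through as `Some`.
    assert_eq!(Some(full.clone()).filter(|v| !v.is_empty()), Some(full));
}
```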
@@ -230,13 +255,13 @@ where
    /// back into the buffer.
    /// If there are any bodies between the range start and last queued body that have not been
    /// downloaded or are not in progress, they will be re-requested.
-    fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error> {
+    fn set_download_range(&mut self, range: Range<BlockNumber>) -> DownloadResult<()> {
         if range.is_empty() {
-            tracing::warn!(target: "downloaders::bodies", "New header range is empty");
+            tracing::warn!(target: "downloaders::bodies", "New download range is empty");
             return Ok(())
         }
 
-        tracing::trace!(target: "downloaders::bodies", ?range, "Setting new header range");
+        tracing::trace!(target: "downloaders::bodies", ?range, "Setting new download range");
 
         // Drain queued bodies.
         let queued_bodies = std::mem::take(&mut self.queued_bodies)
@@ -271,11 +296,20 @@ where
             } else if request_range.end + 1 == num {
                 request_range.end = num;
             } else {
+                let headers = self
+                    .query_headers(
+                        request_range.start..request_range.end + 1, // exclusive
+                        request_range.clone().count() as u64,
+                    )?
+                    .ok_or(DownloadError::MissingHeader {
+                        block_number: request_range.start,
+                    })?;
+
                 // Dispatch contiguous request.
                 self.in_progress_queue.push_new_request(
                     Arc::clone(&self.client),
                     Arc::clone(&self.consensus),
-                    self.query_headers(request_range.start, request_range.count() as u64)?,
+                    headers,
                 );
                 // Clear the current request range
                 request_range = Range::default();
@@ -291,7 +325,7 @@ where
 
         self.download_range = range;
         self.latest_queued_block_number = None;
-        tracing::trace!(target: "downloaders::bodies", range = ?self.download_range, "New header range set");
+        tracing::trace!(target: "downloaders::bodies", range = ?self.download_range, "New download range set");
         Ok(())
     }
 }
@@ -301,7 +335,7 @@ where
     B: BodiesClient + 'static,
     DB: Database,
 {
-    type Item = Result<Vec<BlockResponse>, db::Error>;
+    type Item = BodyDownloaderResult;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
@@ -314,7 +348,9 @@ where
         loop {
             // Poll requests
             while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
+                println!("RESPONSE LEN >> {}", response.len());
                 let response = OrderedBodiesResponse(response);
+                println!("RESPONSE RANGE >> {:?}", response.block_range());
                 this.buffered_responses.push(response);
             }
 
@@ -521,31 +557,15 @@ impl ConcurrentDownloaderBuilder {
 mod tests {
     use super::*;
     use crate::{
-        bodies::test_utils::zip_blocks,
+        bodies::test_utils::{insert_headers, zip_blocks},
         test_utils::{generate_bodies, TestBodiesClient},
     };
     use assert_matches::assert_matches;
     use futures_util::stream::StreamExt;
-    use reth_db::{
-        mdbx::{test_utils::create_test_db, Env, EnvKind, WriteMap},
-        transaction::DbTxMut,
-    };
+    use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
     use reth_interfaces::test_utils::TestConsensus;
     use std::sync::Arc;
 
-    #[inline]
-    fn insert_headers(db: &Env<WriteMap>, headers: &[SealedHeader]) {
-        db.update(|tx| -> Result<(), db::Error> {
-            for header in headers {
-                tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
-                tx.put::<tables::Headers>(header.num_hash().into(), header.clone().unseal())?;
-            }
-            Ok(())
-        })
-        .expect("failed to commit")
-        .expect("failed to insert headers");
-    }
-
     // Check that the blocks are emitted in order of block number, not in order of
     // first-downloaded
     #[tokio::test]
@@ -564,7 +584,7 @@ mod tests {
             Arc::new(TestConsensus::default()),
             db,
         );
-        downloader.set_download_range(0..20).expect("failed to set header range");
+        downloader.set_download_range(0..20).expect("failed to set download range");
 
         assert_matches!(
             downloader.next().await,
@@ -595,7 +615,7 @@ mod tests {
 
         let mut range_start = 0;
         while range_start < 100 {
-            downloader.set_download_range(range_start..100).expect("failed to set header range");
+            downloader.set_download_range(range_start..100).expect("failed to set download range");
             assert_eq!(downloader.latest_queued_block_number, None);
 
             assert_matches!(
@@ -629,7 +649,7 @@ mod tests {
             .build(client.clone(), Arc::new(TestConsensus::default()), db);
 
         // Set and download the first range
-        downloader.set_download_range(0..100).expect("failed to set header range");
+        downloader.set_download_range(0..100).expect("failed to set download range");
         assert_matches!(
             downloader.next().await,
             Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
@@ -639,7 +659,7 @@ mod tests {
         assert!(downloader.next().await.is_none());
 
         // Set and download the second range
-        downloader.set_download_range(100..200).expect("failed to set header range");
+        downloader.set_download_range(100..200).expect("failed to set download range");
         assert_matches!(
             downloader.next().await,
             Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies))
@@ -1,6 +1,9 @@
 /// A naive concurrent downloader.
 pub mod concurrent;
 
+/// TODO:
+pub mod task;
+
 mod queue;
 mod request;
 
crates/net/downloaders/src/bodies/task.rs (new file, 206 lines)
@@ -0,0 +1,206 @@
+use futures::Stream;
+use futures_util::StreamExt;
+use pin_project::pin_project;
+use reth_interfaces::p2p::{
+    bodies::downloader::{BodyDownloader, BodyDownloaderResult},
+    error::DownloadResult,
+};
+use reth_primitives::BlockNumber;
+use std::{
+    future::Future,
+    ops::Range,
+    pin::Pin,
+    task::{ready, Context, Poll},
+};
+use tokio::{
+    sync::{mpsc, mpsc::UnboundedSender},
+    task::JoinSet,
+};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
+/// A [BodyDownloader] that drives a spawned [BodyDownloader] on a spawned task.
+#[derive(Debug)]
+#[pin_project]
+pub struct TaskDownloader {
+    #[pin]
+    from_downloader: UnboundedReceiverStream<BodyDownloaderResult>,
+    to_downloader: UnboundedSender<Range<BlockNumber>>,
+    /// The spawned downloader tasks.
+    ///
+    /// Note: If this type is dropped, the downloader task gets dropped as well.
+    _task: JoinSet<()>,
+}
+
+// === impl TaskDownloader ===
+
+impl TaskDownloader {
+    /// Spawns the given `downloader` and returns a [TaskDownloader] that's connected to that task.
+    ///
+    /// # Panics
+    ///
+    /// This method panics if called outside of a Tokio runtime
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
+    /// use reth_downloaders::bodies::task::TaskDownloader;
+    /// use reth_interfaces::consensus::Consensus;
+    /// use reth_interfaces::p2p::bodies::client::BodiesClient;
+    /// use reth_db::database::Database;
+    /// fn t<B: BodiesClient + 'static, DB: Database + 'static>(client: Arc<B>, consensus:Arc<dyn Consensus>, db: Arc<DB>) {
+    ///     let downloader = ConcurrentDownloaderBuilder::default().build(
+    ///         client,
+    ///         consensus,
+    ///         db
+    ///     );
+    ///     let downloader = TaskDownloader::spawn(downloader);
+    /// }
+    /// ```
+    pub fn spawn<T>(downloader: T) -> Self
+    where
+        T: BodyDownloader + 'static,
+    {
+        let (bodies_tx, bodies_rx) = mpsc::unbounded_channel();
+        let (to_downloader, updates_rx) = mpsc::unbounded_channel();
+
+        let downloader = SpawnedDownloader {
+            bodies_tx,
+            updates: UnboundedReceiverStream::new(updates_rx),
+            downloader,
+        };
+
+        let mut task = JoinSet::<()>::new();
+        task.spawn(downloader);
+
+        Self {
+            from_downloader: UnboundedReceiverStream::new(bodies_rx),
+            to_downloader,
+            _task: task,
+        }
+    }
+}
+
+impl BodyDownloader for TaskDownloader {
+    fn set_download_range(&mut self, range: Range<BlockNumber>) -> DownloadResult<()> {
+        let _ = self.to_downloader.send(range);
+        Ok(())
+    }
+}
+
+impl Stream for TaskDownloader {
+    type Item = BodyDownloaderResult;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project().from_downloader.poll_next(cx)
+    }
+}
+
+/// A [BodyDownloader] that runs on its own task
+struct SpawnedDownloader<T> {
+    updates: UnboundedReceiverStream<Range<BlockNumber>>,
+    bodies_tx: UnboundedSender<BodyDownloaderResult>,
+    downloader: T,
+}
+
+impl<T: BodyDownloader> Future for SpawnedDownloader<T> {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        loop {
+            while let Poll::Ready(Some(range)) = this.updates.poll_next_unpin(cx) {
+                if let Err(err) = this.downloader.set_download_range(range) {
+                    tracing::error!(target: "downloaders::bodies", ?err, "Failed to set download range");
+                    let _ = this.bodies_tx.send(Err(err));
+                }
+            }
+
+            match ready!(this.downloader.poll_next_unpin(cx)) {
+                Some(bodies) => {
+                    let _ = this.bodies_tx.send(bodies);
+                }
+                None => return Poll::Pending,
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        bodies::{
+            concurrent::ConcurrentDownloaderBuilder,
+            test_utils::{insert_headers, zip_blocks},
+        },
+        test_utils::{generate_bodies, TestBodiesClient},
+    };
+    use assert_matches::assert_matches;
+    use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
+    use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus};
+    use std::sync::Arc;
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn download_one_by_one_on_task() {
+        reth_tracing::init_test_tracing();
+
+        let db = create_test_db::<WriteMap>(EnvKind::RW);
+        let (headers, mut bodies) = generate_bodies(0..20);
+
+        insert_headers(&db, &headers);
+
+        let client = Arc::new(
+            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
+        );
+        let downloader = ConcurrentDownloaderBuilder::default().build(
+            client.clone(),
+            Arc::new(TestConsensus::default()),
+            db,
+        );
+        let mut downloader = TaskDownloader::spawn(downloader);
+
+        downloader.set_download_range(0..20).expect("failed to set download range");
+
+        assert_matches!(
+            downloader.next().await,
+            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
+        );
+        assert_eq!(client.times_requested(), 1);
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn set_download_range_error_returned() {
+        reth_tracing::init_test_tracing();
+
+        let db = create_test_db::<WriteMap>(EnvKind::RW);
+        let (headers, mut bodies) = generate_bodies(0..20);
+
+        // Insert a subset of headers to the database
+        insert_headers(&db, &headers[10..]);
+
+        let client = Arc::new(
+            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
+        );
+        let downloader = ConcurrentDownloaderBuilder::default().build(
+            client.clone(),
+            Arc::new(TestConsensus::default()),
+            db,
+        );
+        let mut downloader = TaskDownloader::spawn(downloader);
+
+        downloader.set_download_range(10..20).expect("failed to set download range");
+        assert_matches!(
+            downloader.next().await,
+            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(10), &mut bodies))
+        );
+
+        downloader.set_download_range(0..20).expect("failed to set download range");
+        assert_matches!(
+            downloader.next().await,
+            Some(Err(DownloadError::MissingHeader { block_number: 0 }))
+        );
+    }
+}
@@ -1,5 +1,11 @@
+use reth_db::{
+    database::Database,
+    mdbx::{Env, WriteMap},
+    tables,
+    transaction::DbTxMut,
+};
 use reth_eth_wire::BlockBody;
-use reth_interfaces::p2p::bodies::response::BlockResponse;
+use reth_interfaces::{db, p2p::bodies::response::BlockResponse};
 use reth_primitives::{SealedBlock, SealedHeader, H256};
 use std::collections::HashMap;
 
@@ -23,3 +29,16 @@ pub(crate) fn zip_blocks<'a>(
         })
         .collect()
 }
+
+#[inline]
+pub(crate) fn insert_headers(db: &Env<WriteMap>, headers: &[SealedHeader]) {
+    db.update(|tx| -> Result<(), db::Error> {
+        for header in headers {
+            tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
+            tx.put::<tables::Headers>(header.num_hash().into(), header.clone().unseal())?;
+        }
+        Ok(())
+    })
+    .expect("failed to commit")
+    .expect("failed to insert headers");
+}
@@ -401,13 +401,14 @@ mod tests {
     use reth_eth_wire::BlockBody;
     use reth_interfaces::{
         consensus::Consensus,
-        db,
         p2p::{
             bodies::{
-                client::BodiesClient, downloader::BodyDownloader, response::BlockResponse,
+                client::BodiesClient,
+                downloader::{BodyDownloader, BodyDownloaderResult},
+                response::BlockResponse,
             },
             download::DownloadClient,
-            error::PeerRequestResult,
+            error::{DownloadResult, PeerRequestResult},
             priority::Priority,
         },
         test_utils::{
@@ -703,30 +704,32 @@ mod tests {
     }
 
     impl BodyDownloader for TestBodyDownloader {
-        fn set_download_range(&mut self, range: Range<BlockNumber>) -> Result<(), db::Error> {
-            self.headers = VecDeque::from(self.db.view(|tx| {
-                let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
+        fn set_download_range(&mut self, range: Range<BlockNumber>) -> DownloadResult<()> {
+            self.headers =
+                VecDeque::from(self.db.view(|tx| -> DownloadResult<Vec<SealedHeader>> {
+                    let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
 
                 let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
                 let walker = canonical_cursor.walk(range.start)?.take_while(|entry| {
                     entry.as_ref().map(|(num, _)| *num < range.end).unwrap_or_default()
                 });
 
                 let mut headers = Vec::default();
                 for entry in walker {
                     let (num, hash) = entry?;
-                    let (_, header) =
-                        header_cursor.seek_exact((num, hash).into())?.expect("missing header");
-                    headers.push(SealedHeader::new(header, hash));
-                }
-                Ok(headers)
-            })??);
+                    let (_, header) = header_cursor
+                        .seek_exact((num, hash).into())?
+                        .expect("missing header");
+                    headers.push(SealedHeader::new(header, hash));
+                }
+                Ok(headers)
+            })??);
             Ok(())
         }
     }
 
     impl Stream for TestBodyDownloader {
-        type Item = Result<Vec<BlockResponse>, db::Error>;
+        type Item = BodyDownloaderResult;
         fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
             let this = self.get_mut();
 