mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
chore(downloader): replace database with header provider (#5472)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -5890,6 +5890,7 @@ dependencies = [
|
|||||||
"reth-interfaces",
|
"reth-interfaces",
|
||||||
"reth-metrics",
|
"reth-metrics",
|
||||||
"reth-primitives",
|
"reth-primitives",
|
||||||
|
"reth-provider",
|
||||||
"reth-tasks",
|
"reth-tasks",
|
||||||
"reth-tracing",
|
"reth-tracing",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
|||||||
@ -147,7 +147,11 @@ impl ImportCommand {
|
|||||||
.into_task();
|
.into_task();
|
||||||
|
|
||||||
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
||||||
.build(file_client.clone(), consensus.clone(), db.clone())
|
.build(
|
||||||
|
file_client.clone(),
|
||||||
|
consensus.clone(),
|
||||||
|
ProviderFactory::new(db.clone(), self.chain.clone()),
|
||||||
|
)
|
||||||
.into_task();
|
.into_task();
|
||||||
|
|
||||||
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
||||||
|
|||||||
@ -102,7 +102,11 @@ impl Command {
|
|||||||
.into_task_with(task_executor);
|
.into_task_with(task_executor);
|
||||||
|
|
||||||
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
||||||
.build(client, Arc::clone(&consensus), db.clone())
|
.build(
|
||||||
|
client,
|
||||||
|
Arc::clone(&consensus),
|
||||||
|
ProviderFactory::new(db.clone(), self.chain.clone()),
|
||||||
|
)
|
||||||
.into_task_with(task_executor);
|
.into_task_with(task_executor);
|
||||||
|
|
||||||
let stage_conf = &config.stages;
|
let stage_conf = &config.stages;
|
||||||
|
|||||||
@ -617,7 +617,11 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
|||||||
.into_task_with(task_executor);
|
.into_task_with(task_executor);
|
||||||
|
|
||||||
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
|
||||||
.build(client, Arc::clone(&consensus), db.clone())
|
.build(
|
||||||
|
client,
|
||||||
|
Arc::clone(&consensus),
|
||||||
|
ProviderFactory::new(db.clone(), self.chain.clone()),
|
||||||
|
)
|
||||||
.into_task_with(task_executor);
|
.into_task_with(task_executor);
|
||||||
|
|
||||||
let pipeline = self
|
let pipeline = self
|
||||||
|
|||||||
@ -163,6 +163,9 @@ impl Command {
|
|||||||
|
|
||||||
let default_peers_path = data_dir.known_peers_path();
|
let default_peers_path = data_dir.known_peers_path();
|
||||||
|
|
||||||
|
let provider_factory =
|
||||||
|
Arc::new(ProviderFactory::new(db.clone(), self.chain.clone()));
|
||||||
|
|
||||||
let network = self
|
let network = self
|
||||||
.network
|
.network
|
||||||
.network_config(
|
.network_config(
|
||||||
@ -171,7 +174,7 @@ impl Command {
|
|||||||
p2p_secret_key,
|
p2p_secret_key,
|
||||||
default_peers_path,
|
default_peers_path,
|
||||||
)
|
)
|
||||||
.build(Arc::new(ProviderFactory::new(db.clone(), self.chain.clone())))
|
.build(provider_factory.clone())
|
||||||
.start_network()
|
.start_network()
|
||||||
.await?;
|
.await?;
|
||||||
let fetch_client = Arc::new(network.fetch_client().await?);
|
let fetch_client = Arc::new(network.fetch_client().await?);
|
||||||
@ -187,9 +190,8 @@ impl Command {
|
|||||||
config.stages.bodies.downloader_min_concurrent_requests..=
|
config.stages.bodies.downloader_min_concurrent_requests..=
|
||||||
config.stages.bodies.downloader_max_concurrent_requests,
|
config.stages.bodies.downloader_max_concurrent_requests,
|
||||||
)
|
)
|
||||||
.build(fetch_client, consensus.clone(), db.clone()),
|
.build(fetch_client, consensus.clone(), provider_factory),
|
||||||
);
|
);
|
||||||
|
|
||||||
(Box::new(stage), None)
|
(Box::new(stage), None)
|
||||||
}
|
}
|
||||||
StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None),
|
StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None),
|
||||||
|
|||||||
@ -455,6 +455,8 @@ where
|
|||||||
pub fn build(self) -> (TestBeaconConsensusEngine<Client>, TestEnv<Arc<DatabaseEnv>>) {
|
pub fn build(self) -> (TestBeaconConsensusEngine<Client>, TestEnv<Arc<DatabaseEnv>>) {
|
||||||
reth_tracing::init_test_tracing();
|
reth_tracing::init_test_tracing();
|
||||||
let db = create_test_rw_db();
|
let db = create_test_rw_db();
|
||||||
|
let provider_factory =
|
||||||
|
ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone());
|
||||||
|
|
||||||
let consensus: Arc<dyn Consensus> = match self.base_config.consensus {
|
let consensus: Arc<dyn Consensus> = match self.base_config.consensus {
|
||||||
TestConsensusConfig::Real => {
|
TestConsensusConfig::Real => {
|
||||||
@ -496,7 +498,7 @@ where
|
|||||||
.into_task();
|
.into_task();
|
||||||
|
|
||||||
let body_downloader = BodiesDownloaderBuilder::default()
|
let body_downloader = BodiesDownloaderBuilder::default()
|
||||||
.build(client.clone(), consensus.clone(), db.clone())
|
.build(client.clone(), consensus.clone(), provider_factory.clone())
|
||||||
.into_task();
|
.into_task();
|
||||||
|
|
||||||
Pipeline::builder().add_stages(DefaultStages::new(
|
Pipeline::builder().add_stages(DefaultStages::new(
|
||||||
@ -527,9 +529,8 @@ where
|
|||||||
let tree = ShareableBlockchainTree::new(
|
let tree = ShareableBlockchainTree::new(
|
||||||
BlockchainTree::new(externals, config, None).expect("failed to create tree"),
|
BlockchainTree::new(externals, config, None).expect("failed to create tree"),
|
||||||
);
|
);
|
||||||
let shareable_db = ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone());
|
|
||||||
let latest = self.base_config.chain_spec.genesis_header().seal_slow();
|
let latest = self.base_config.chain_spec.genesis_header().seal_slow();
|
||||||
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
|
let blockchain_provider = BlockchainProvider::with_latest(provider_factory, tree, latest);
|
||||||
|
|
||||||
let pruner = Pruner::new(
|
let pruner = Pruner::new(
|
||||||
db.clone(),
|
db.clone(),
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use super::headers::client::HeadersRequest;
|
use super::headers::client::HeadersRequest;
|
||||||
use crate::{consensus::ConsensusError, db};
|
use crate::{consensus::ConsensusError, provider::ProviderError};
|
||||||
use reth_network_api::ReputationChangeKind;
|
use reth_network_api::ReputationChangeKind;
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
BlockHashOrNumber, BlockNumber, GotExpected, GotExpectedBoxed, Header, WithPeerId, B256,
|
BlockHashOrNumber, BlockNumber, GotExpected, GotExpectedBoxed, Header, WithPeerId, B256,
|
||||||
@ -177,9 +177,9 @@ pub enum DownloadError {
|
|||||||
/// Error while executing the request.
|
/// Error while executing the request.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
RequestError(#[from] RequestError),
|
RequestError(#[from] RequestError),
|
||||||
/// Error while reading data from database.
|
/// Provider error.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
DatabaseError(#[from] db::DatabaseError),
|
Provider(#[from] ProviderError),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@ -12,8 +12,8 @@ description = "Implementations of various block downloaders"
|
|||||||
# reth
|
# reth
|
||||||
reth-interfaces.workspace = true
|
reth-interfaces.workspace = true
|
||||||
reth-primitives.workspace = true
|
reth-primitives.workspace = true
|
||||||
reth-db.workspace = true
|
|
||||||
reth-tasks.workspace = true
|
reth-tasks.workspace = true
|
||||||
|
reth-provider.workspace = true
|
||||||
|
|
||||||
# async
|
# async
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
@ -33,6 +33,7 @@ rayon.workspace = true
|
|||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
|
|
||||||
# optional deps for the test-utils feature
|
# optional deps for the test-utils feature
|
||||||
|
reth-db = { workspace = true, optional = true }
|
||||||
alloy-rlp = { workspace = true, optional = true }
|
alloy-rlp = { workspace = true, optional = true }
|
||||||
tempfile = { workspace = true, optional = true }
|
tempfile = { workspace = true, optional = true }
|
||||||
itertools = { workspace = true, optional = true }
|
itertools = { workspace = true, optional = true }
|
||||||
@ -50,4 +51,4 @@ itertools.workspace = true
|
|||||||
tempfile.workspace = true
|
tempfile.workspace = true
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
test-utils = ["dep:alloy-rlp", "dep:tempfile", "dep:itertools", "reth-interfaces/test-utils"]
|
test-utils = ["dep:alloy-rlp", "dep:tempfile", "dep:itertools", "reth-db/test-utils", "reth-interfaces/test-utils"]
|
||||||
|
|||||||
@ -2,7 +2,6 @@ use super::queue::BodiesRequestQueue;
|
|||||||
use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
|
use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
|
|
||||||
use reth_interfaces::{
|
use reth_interfaces::{
|
||||||
consensus::Consensus,
|
consensus::Consensus,
|
||||||
p2p::{
|
p2p::{
|
||||||
@ -15,6 +14,7 @@ use reth_interfaces::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
use reth_primitives::{BlockNumber, SealedHeader};
|
use reth_primitives::{BlockNumber, SealedHeader};
|
||||||
|
use reth_provider::HeaderProvider;
|
||||||
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||||
use std::{
|
use std::{
|
||||||
cmp::Ordering,
|
cmp::Ordering,
|
||||||
@ -27,22 +27,18 @@ use std::{
|
|||||||
};
|
};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
/// The scope for headers downloader metrics.
|
|
||||||
pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies";
|
|
||||||
|
|
||||||
/// Downloads bodies in batches.
|
/// Downloads bodies in batches.
|
||||||
///
|
///
|
||||||
/// All blocks in a batch are fetched at the same time.
|
/// All blocks in a batch are fetched at the same time.
|
||||||
#[must_use = "Stream does nothing unless polled"]
|
#[must_use = "Stream does nothing unless polled"]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct BodiesDownloader<B: BodiesClient, DB> {
|
pub struct BodiesDownloader<B: BodiesClient, Provider> {
|
||||||
/// The bodies client
|
/// The bodies client
|
||||||
client: Arc<B>,
|
client: Arc<B>,
|
||||||
/// The consensus client
|
/// The consensus client
|
||||||
consensus: Arc<dyn Consensus>,
|
consensus: Arc<dyn Consensus>,
|
||||||
// TODO: make this a [HeaderProvider]
|
|
||||||
/// The database handle
|
/// The database handle
|
||||||
db: DB,
|
provider: Provider,
|
||||||
/// The maximum number of non-empty blocks per one request
|
/// The maximum number of non-empty blocks per one request
|
||||||
request_limit: u64,
|
request_limit: u64,
|
||||||
/// The maximum number of block bodies returned at once from the stream
|
/// The maximum number of block bodies returned at once from the stream
|
||||||
@ -67,10 +63,10 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
|
|||||||
metrics: BodyDownloaderMetrics,
|
metrics: BodyDownloaderMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, DB> BodiesDownloader<B, DB>
|
impl<B, Provider> BodiesDownloader<B, Provider>
|
||||||
where
|
where
|
||||||
B: BodiesClient + 'static,
|
B: BodiesClient + 'static,
|
||||||
DB: Database + Unpin + 'static,
|
Provider: HeaderProvider + Unpin + 'static,
|
||||||
{
|
{
|
||||||
/// Returns the next contiguous request.
|
/// Returns the next contiguous request.
|
||||||
fn next_headers_request(&mut self) -> DownloadResult<Option<Vec<SealedHeader>>> {
|
fn next_headers_request(&mut self) -> DownloadResult<Option<Vec<SealedHeader>>> {
|
||||||
@ -103,47 +99,29 @@ where
|
|||||||
return Ok(None)
|
return Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collection of results
|
|
||||||
let mut headers = Vec::new();
|
|
||||||
|
|
||||||
// Non empty headers count
|
|
||||||
let mut non_empty_headers = 0;
|
|
||||||
let mut current_block_num = *range.start();
|
|
||||||
|
|
||||||
// Acquire cursors over canonical and header tables
|
|
||||||
let tx = self.db.tx()?;
|
|
||||||
let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
|
|
||||||
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
|
|
||||||
|
|
||||||
// Collect headers while
|
// Collect headers while
|
||||||
// 1. Current block number is in range
|
// 1. Current block number is in range
|
||||||
// 2. The number of non empty headers is less than maximum
|
// 2. The number of non empty headers is less than maximum
|
||||||
// 3. The total number of headers is less than the stream batch size (this is only
|
// 3. The total number of headers is less than the stream batch size (this is only
|
||||||
// relevant if the range consists entirely of empty headers)
|
// relevant if the range consists entirely of empty headers)
|
||||||
while range.contains(¤t_block_num) &&
|
let mut collected = 0;
|
||||||
non_empty_headers < max_non_empty &&
|
let mut non_empty_headers = 0;
|
||||||
headers.len() < self.stream_batch_size
|
let headers = self.provider.sealed_headers_while(range.clone(), |header| {
|
||||||
{
|
let should_take = range.contains(&header.number) &&
|
||||||
// Find the block hash.
|
non_empty_headers < max_non_empty &&
|
||||||
let (number, hash) = canonical_cursor
|
collected < self.stream_batch_size;
|
||||||
.seek_exact(current_block_num)?
|
|
||||||
.ok_or(DownloadError::MissingHeader { block_number: current_block_num })?;
|
|
||||||
// Find the block header.
|
|
||||||
let (_, header) = header_cursor
|
|
||||||
.seek_exact(number)?
|
|
||||||
.ok_or(DownloadError::MissingHeader { block_number: number })?;
|
|
||||||
|
|
||||||
// If the header is not empty, increment the counter
|
if should_take {
|
||||||
if !header.is_empty() {
|
collected += 1;
|
||||||
non_empty_headers += 1;
|
if !header.is_empty() {
|
||||||
|
non_empty_headers += 1;
|
||||||
|
}
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
// Add header to the result collection
|
|
||||||
headers.push(header.seal(hash));
|
|
||||||
|
|
||||||
// Increment current block number
|
|
||||||
current_block_num += 1;
|
|
||||||
}
|
|
||||||
Ok(Some(headers).filter(|h| !h.is_empty()))
|
Ok(Some(headers).filter(|h| !h.is_empty()))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,10 +264,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, DB> BodiesDownloader<B, DB>
|
impl<B, Provider> BodiesDownloader<B, Provider>
|
||||||
where
|
where
|
||||||
B: BodiesClient + 'static,
|
B: BodiesClient + 'static,
|
||||||
DB: Database + Unpin + 'static,
|
Provider: HeaderProvider + Unpin + 'static,
|
||||||
Self: BodyDownloader + 'static,
|
Self: BodyDownloader + 'static,
|
||||||
{
|
{
|
||||||
/// Spawns the downloader task via [tokio::task::spawn]
|
/// Spawns the downloader task via [tokio::task::spawn]
|
||||||
@ -306,10 +284,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
|
impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
|
||||||
where
|
where
|
||||||
B: BodiesClient + 'static,
|
B: BodiesClient + 'static,
|
||||||
DB: Database + Unpin + 'static,
|
Provider: HeaderProvider + Unpin + 'static,
|
||||||
{
|
{
|
||||||
/// Set a new download range (exclusive).
|
/// Set a new download range (exclusive).
|
||||||
///
|
///
|
||||||
@ -354,10 +332,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B, DB> Stream for BodiesDownloader<B, DB>
|
impl<B, Provider> Stream for BodiesDownloader<B, Provider>
|
||||||
where
|
where
|
||||||
B: BodiesClient + 'static,
|
B: BodiesClient + 'static,
|
||||||
DB: Database + Unpin + 'static,
|
Provider: HeaderProvider + Unpin + 'static,
|
||||||
{
|
{
|
||||||
type Item = BodyDownloaderResult;
|
type Item = BodyDownloaderResult;
|
||||||
|
|
||||||
@ -557,15 +535,15 @@ impl BodiesDownloaderBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Consume self and return the concurrent downloader.
|
/// Consume self and return the concurrent downloader.
|
||||||
pub fn build<B, DB>(
|
pub fn build<B, Provider>(
|
||||||
self,
|
self,
|
||||||
client: B,
|
client: B,
|
||||||
consensus: Arc<dyn Consensus>,
|
consensus: Arc<dyn Consensus>,
|
||||||
db: DB,
|
provider: Provider,
|
||||||
) -> BodiesDownloader<B, DB>
|
) -> BodiesDownloader<B, Provider>
|
||||||
where
|
where
|
||||||
B: BodiesClient + 'static,
|
B: BodiesClient + 'static,
|
||||||
DB: Database,
|
Provider: HeaderProvider,
|
||||||
{
|
{
|
||||||
let Self {
|
let Self {
|
||||||
request_limit,
|
request_limit,
|
||||||
@ -578,7 +556,7 @@ impl BodiesDownloaderBuilder {
|
|||||||
BodiesDownloader {
|
BodiesDownloader {
|
||||||
client: Arc::new(client),
|
client: Arc::new(client),
|
||||||
consensus,
|
consensus,
|
||||||
db,
|
provider,
|
||||||
request_limit,
|
request_limit,
|
||||||
stream_batch_size,
|
stream_batch_size,
|
||||||
max_buffered_blocks_size_bytes,
|
max_buffered_blocks_size_bytes,
|
||||||
@ -605,7 +583,8 @@ mod tests {
|
|||||||
use futures_util::stream::StreamExt;
|
use futures_util::stream::StreamExt;
|
||||||
use reth_db::test_utils::create_test_rw_db;
|
use reth_db::test_utils::create_test_rw_db;
|
||||||
use reth_interfaces::test_utils::{generators, generators::random_block_range, TestConsensus};
|
use reth_interfaces::test_utils::{generators, generators::random_block_range, TestConsensus};
|
||||||
use reth_primitives::{BlockBody, B256};
|
use reth_primitives::{BlockBody, B256, MAINNET};
|
||||||
|
use reth_provider::ProviderFactory;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
// Check that the blocks are emitted in order of block number, not in order of
|
// Check that the blocks are emitted in order of block number, not in order of
|
||||||
@ -624,7 +603,7 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default().build(
|
let mut downloader = BodiesDownloaderBuilder::default().build(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
downloader.set_download_range(0..=19).expect("failed to set download range");
|
downloader.set_download_range(0..=19).expect("failed to set download range");
|
||||||
|
|
||||||
@ -659,9 +638,12 @@ mod tests {
|
|||||||
|
|
||||||
let request_limit = 10;
|
let request_limit = 10;
|
||||||
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
|
let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
|
||||||
let mut downloader = BodiesDownloaderBuilder::default()
|
let mut downloader =
|
||||||
.with_request_limit(request_limit)
|
BodiesDownloaderBuilder::default().with_request_limit(request_limit).build(
|
||||||
.build(client.clone(), Arc::new(TestConsensus::default()), db);
|
client.clone(),
|
||||||
|
Arc::new(TestConsensus::default()),
|
||||||
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
|
);
|
||||||
downloader.set_download_range(0..=199).expect("failed to set download range");
|
downloader.set_download_range(0..=199).expect("failed to set download range");
|
||||||
|
|
||||||
let _ = downloader.collect::<Vec<_>>().await;
|
let _ = downloader.collect::<Vec<_>>().await;
|
||||||
@ -686,7 +668,11 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default()
|
let mut downloader = BodiesDownloaderBuilder::default()
|
||||||
.with_stream_batch_size(stream_batch_size)
|
.with_stream_batch_size(stream_batch_size)
|
||||||
.with_request_limit(request_limit)
|
.with_request_limit(request_limit)
|
||||||
.build(client.clone(), Arc::new(TestConsensus::default()), db);
|
.build(
|
||||||
|
client.clone(),
|
||||||
|
Arc::new(TestConsensus::default()),
|
||||||
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
let mut range_start = 0;
|
let mut range_start = 0;
|
||||||
while range_start < 100 {
|
while range_start < 100 {
|
||||||
@ -715,7 +701,7 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build(
|
let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Set and download the first range
|
// Set and download the first range
|
||||||
@ -752,7 +738,11 @@ mod tests {
|
|||||||
.with_stream_batch_size(10)
|
.with_stream_batch_size(10)
|
||||||
.with_request_limit(1)
|
.with_request_limit(1)
|
||||||
.with_max_buffered_blocks_size_bytes(1)
|
.with_max_buffered_blocks_size_bytes(1)
|
||||||
.build(client.clone(), Arc::new(TestConsensus::default()), db);
|
.build(
|
||||||
|
client.clone(),
|
||||||
|
Arc::new(TestConsensus::default()),
|
||||||
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
// Set and download the entire range
|
// Set and download the entire range
|
||||||
downloader.set_download_range(0..=199).expect("failed to set download range");
|
downloader.set_download_range(0..=199).expect("failed to set download range");
|
||||||
@ -779,7 +769,11 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default()
|
let mut downloader = BodiesDownloaderBuilder::default()
|
||||||
.with_request_limit(3)
|
.with_request_limit(3)
|
||||||
.with_stream_batch_size(100)
|
.with_stream_batch_size(100)
|
||||||
.build(client.clone(), Arc::new(TestConsensus::default()), db);
|
.build(
|
||||||
|
client.clone(),
|
||||||
|
Arc::new(TestConsensus::default()),
|
||||||
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
// Download the requested range
|
// Download the requested range
|
||||||
downloader.set_download_range(0..=99).expect("failed to set download range");
|
downloader.set_download_range(0..=99).expect("failed to set download range");
|
||||||
|
|||||||
@ -42,16 +42,17 @@ impl TaskDownloader {
|
|||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// use reth_db::database::Database;
|
|
||||||
/// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader};
|
/// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader};
|
||||||
/// use reth_interfaces::{consensus::Consensus, p2p::bodies::client::BodiesClient};
|
/// use reth_interfaces::{consensus::Consensus, p2p::bodies::client::BodiesClient};
|
||||||
|
/// use reth_provider::HeaderProvider;
|
||||||
/// use std::sync::Arc;
|
/// use std::sync::Arc;
|
||||||
/// fn t<B: BodiesClient + 'static, DB: Database + 'static>(
|
///
|
||||||
|
/// fn t<B: BodiesClient + 'static, Provider: HeaderProvider + Unpin + 'static>(
|
||||||
/// client: Arc<B>,
|
/// client: Arc<B>,
|
||||||
/// consensus: Arc<dyn Consensus>,
|
/// consensus: Arc<dyn Consensus>,
|
||||||
/// db: Arc<DB>,
|
/// provider: Provider,
|
||||||
/// ) {
|
/// ) {
|
||||||
/// let downloader = BodiesDownloaderBuilder::default().build(client, consensus, db);
|
/// let downloader = BodiesDownloaderBuilder::default().build(client, consensus, provider);
|
||||||
/// let downloader = TaskDownloader::spawn(downloader);
|
/// let downloader = TaskDownloader::spawn(downloader);
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
@ -170,6 +171,8 @@ mod tests {
|
|||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use reth_db::test_utils::create_test_rw_db;
|
use reth_db::test_utils::create_test_rw_db;
|
||||||
use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus};
|
use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus};
|
||||||
|
use reth_primitives::MAINNET;
|
||||||
|
use reth_provider::ProviderFactory;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread")]
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
@ -187,7 +190,7 @@ mod tests {
|
|||||||
let downloader = BodiesDownloaderBuilder::default().build(
|
let downloader = BodiesDownloaderBuilder::default().build(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
let mut downloader = TaskDownloader::spawn(downloader);
|
let mut downloader = TaskDownloader::spawn(downloader);
|
||||||
|
|
||||||
@ -209,7 +212,7 @@ mod tests {
|
|||||||
let downloader = BodiesDownloaderBuilder::default().build(
|
let downloader = BodiesDownloaderBuilder::default().build(
|
||||||
Arc::new(TestBodiesClient::default()),
|
Arc::new(TestBodiesClient::default()),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
let mut downloader = TaskDownloader::spawn(downloader);
|
let mut downloader = TaskDownloader::spawn(downloader);
|
||||||
|
|
||||||
|
|||||||
@ -267,7 +267,8 @@ mod tests {
|
|||||||
},
|
},
|
||||||
test_utils::TestConsensus,
|
test_utils::TestConsensus,
|
||||||
};
|
};
|
||||||
use reth_primitives::SealedHeader;
|
use reth_primitives::{SealedHeader, MAINNET};
|
||||||
|
use reth_provider::ProviderFactory;
|
||||||
use std::{
|
use std::{
|
||||||
io::{Read, Seek, SeekFrom, Write},
|
io::{Read, Seek, SeekFrom, Write},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
@ -291,7 +292,7 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default().build(
|
let mut downloader = BodiesDownloaderBuilder::default().build(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
downloader.set_download_range(0..=19).expect("failed to set download range");
|
downloader.set_download_range(0..=19).expect("failed to set download range");
|
||||||
|
|
||||||
@ -373,7 +374,7 @@ mod tests {
|
|||||||
let mut downloader = BodiesDownloaderBuilder::default().build(
|
let mut downloader = BodiesDownloaderBuilder::default().build(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
Arc::new(TestConsensus::default()),
|
Arc::new(TestConsensus::default()),
|
||||||
db,
|
ProviderFactory::new(db, MAINNET.clone()),
|
||||||
);
|
);
|
||||||
downloader.set_download_range(0..=19).expect("failed to set download range");
|
downloader.set_download_range(0..=19).expect("failed to set download range");
|
||||||
|
|
||||||
|
|||||||
@ -36,7 +36,7 @@
|
|||||||
//! # let bodies_downloader = BodiesDownloaderBuilder::default().build(
|
//! # let bodies_downloader = BodiesDownloaderBuilder::default().build(
|
||||||
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::ZERO, vec![]).into()) }),
|
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::ZERO, vec![]).into()) }),
|
||||||
//! # consensus.clone(),
|
//! # consensus.clone(),
|
||||||
//! # db.clone()
|
//! # ProviderFactory::new(db.clone(), MAINNET.clone())
|
||||||
//! # );
|
//! # );
|
||||||
//! # let (tip_tx, tip_rx) = watch::channel(B256::default());
|
//! # let (tip_tx, tip_rx) = watch::channel(B256::default());
|
||||||
//! # let factory = Factory::new(chain_spec.clone());
|
//! # let factory = Factory::new(chain_spec.clone());
|
||||||
|
|||||||
@ -484,7 +484,7 @@ mod tests {
|
|||||||
tables,
|
tables,
|
||||||
test_utils::TempDatabase,
|
test_utils::TempDatabase,
|
||||||
transaction::{DbTx, DbTxMut},
|
transaction::{DbTx, DbTxMut},
|
||||||
DatabaseEnv,
|
DatabaseEnv, DatabaseError,
|
||||||
};
|
};
|
||||||
use reth_interfaces::{
|
use reth_interfaces::{
|
||||||
p2p::{
|
p2p::{
|
||||||
@ -494,7 +494,7 @@ mod tests {
|
|||||||
response::BlockResponse,
|
response::BlockResponse,
|
||||||
},
|
},
|
||||||
download::DownloadClient,
|
download::DownloadClient,
|
||||||
error::DownloadResult,
|
error::{DownloadError, DownloadResult},
|
||||||
priority::Priority,
|
priority::Priority,
|
||||||
},
|
},
|
||||||
test_utils::{
|
test_utils::{
|
||||||
@ -780,22 +780,27 @@ mod tests {
|
|||||||
&mut self,
|
&mut self,
|
||||||
range: RangeInclusive<BlockNumber>,
|
range: RangeInclusive<BlockNumber>,
|
||||||
) -> DownloadResult<()> {
|
) -> DownloadResult<()> {
|
||||||
self.headers =
|
self.headers = VecDeque::from(
|
||||||
VecDeque::from(self.db.view(|tx| -> DownloadResult<Vec<SealedHeader>> {
|
self.db
|
||||||
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
|
.view(|tx| -> Result<Vec<SealedHeader>, DatabaseError> {
|
||||||
|
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
|
||||||
|
|
||||||
let mut canonical_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
|
let mut canonical_cursor =
|
||||||
let walker = canonical_cursor.walk_range(range)?;
|
tx.cursor_read::<tables::CanonicalHeaders>()?;
|
||||||
|
let walker = canonical_cursor.walk_range(range)?;
|
||||||
|
|
||||||
let mut headers = Vec::default();
|
let mut headers = Vec::default();
|
||||||
for entry in walker {
|
for entry in walker {
|
||||||
let (num, hash) = entry?;
|
let (num, hash) = entry?;
|
||||||
let (_, header) =
|
let (_, header) =
|
||||||
header_cursor.seek_exact(num)?.expect("missing header");
|
header_cursor.seek_exact(num)?.expect("missing header");
|
||||||
headers.push(header.seal(hash));
|
headers.push(header.seal(hash));
|
||||||
}
|
}
|
||||||
Ok(headers)
|
Ok(headers)
|
||||||
})??);
|
})
|
||||||
|
.map_err(|err| DownloadError::Provider(err.into()))?
|
||||||
|
.map_err(|err| DownloadError::Provider(err.into()))?,
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user