feat(cli): spawn task downloaders (#1055)

This commit is contained in:
Roman Krasiuk
2023-01-26 19:20:13 +02:00
committed by GitHub
parent a9c75d2fc7
commit b5dab614df
2 changed files with 31 additions and 27 deletions

View File

@ -155,41 +155,47 @@ impl Command {
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
let fetch_client = Arc::new(network.fetch_client().await?);
// Spawn headers downloader
let headers_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
);
// Spawn bodies downloader
let bodies_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);
let mut pipeline = reth_stages::Pipeline::default()
.with_sync_state_updater(network.clone())
.with_channel(sender)
.push(HeaderStage {
downloader: headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
downloader: headers_downloader,
consensus: consensus.clone(),
sync_status_updates: network.clone(),
})
.push(TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
})
.push(BodyStage {
downloader: bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
config.stages.bodies.downloader_max_buffered_responses,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
consensus: consensus.clone(),
})
.push(BodyStage { downloader: bodies_downloader, consensus: consensus.clone() })
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,

View File

@ -348,9 +348,7 @@ where
loop {
// Poll requests
while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
println!("RESPONSE LEN >> {}", response.len());
let response = OrderedBodiesResponse(response);
println!("RESPONSE RANGE >> {:?}", response.block_range());
this.buffered_responses.push(response);
}