fix(net): handle closed channel in Spawned downloader (#1346)

This commit is contained in:
Matthias Seitz
2023-02-14 18:21:14 +01:00
committed by GitHub
parent f63c8d7e36
commit 6e89af9e8e
8 changed files with 117 additions and 65 deletions

1
Cargo.lock generated
View File

@ -4220,6 +4220,7 @@ dependencies = [
"reth-metrics-derive",
"reth-primitives",
"reth-rlp",
"reth-tasks",
"reth-tracing",
"tempfile",
"thiserror",

View File

@ -134,11 +134,11 @@ impl ImportCommand {
{
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(file_client.clone(), consensus.clone())
.as_task();
.into_task();
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), db)
.as_task();
.into_task();
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(file_client)

View File

@ -161,7 +161,13 @@ impl Command {
info!(target: "reth::cli", "Started RPC server");
let (mut pipeline, events) = self
.build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone())
.build_networked_pipeline(
&mut config,
network.clone(),
&consensus,
db.clone(),
&ctx.task_executor,
)
.await?;
ctx.task_executor.spawn(handle_events(events));
@ -186,6 +192,7 @@ impl Command {
network: NetworkHandle,
consensus: &Arc<dyn Consensus>,
db: Arc<Env<WriteMap>>,
task_executor: &TaskExecutor,
) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
{
// building network downloaders using the fetch client
@ -193,11 +200,11 @@ impl Command {
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(fetch_client.clone(), consensus.clone())
.as_task();
.into_task_with(task_executor);
let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies)
.build(fetch_client.clone(), consensus.clone(), db.clone())
.as_task();
.into_task_with(task_executor);
let mut pipeline = self
.build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus)

View File

@ -13,6 +13,7 @@ reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-eth-wire = { path = "../eth-wire" }
reth-db = { path = "../../storage/db" }
reth-tasks = { path = "../../tasks" }
reth-metrics-derive = { path = "../../metrics/metrics-derive" }
# async

View File

@ -16,6 +16,7 @@ use reth_interfaces::{
},
};
use reth_primitives::{BlockNumber, SealedHeader};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
@ -246,10 +247,18 @@ where
DB: Database,
Self: BodyDownloader + 'static,
{
/// Spawns the downloader task via [tokio::task::spawn]
pub fn into_task(self) -> TaskDownloader {
self.into_task_with(&TokioTaskExecutor::default())
}
/// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning
/// it.
pub fn as_task(self) -> TaskDownloader {
TaskDownloader::spawn(self)
/// it via the given spawner.
pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader
where
S: TaskSpawner,
{
TaskDownloader::spawn_with(self, spawner)
}
}

View File

@ -1,21 +1,19 @@
use futures::Stream;
use futures_util::StreamExt;
use futures_util::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_interfaces::p2p::{
bodies::downloader::{BodyDownloader, BodyDownloaderResult},
error::DownloadResult,
};
use reth_primitives::BlockNumber;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
future::Future,
ops::Range,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::{
sync::{mpsc, mpsc::UnboundedSender},
task::JoinSet,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [BodyDownloader] that drives a spawned [BodyDownloader] on a spawned task.
@ -25,16 +23,13 @@ pub struct TaskDownloader {
#[pin]
from_downloader: UnboundedReceiverStream<BodyDownloaderResult>,
to_downloader: UnboundedSender<Range<BlockNumber>>,
/// The spawned downloader tasks.
///
/// Note: If this type is dropped, the downloader task gets dropped as well.
_task: JoinSet<()>,
}
// === impl TaskDownloader ===
impl TaskDownloader {
/// Spawns the given `downloader` and returns a [TaskDownloader] that's connected to that task.
/// Spawns the given `downloader` via [tokio::task::spawn] returns a [TaskDownloader] that's
/// connected to that task.
///
/// # Panics
///
@ -61,6 +56,16 @@ impl TaskDownloader {
pub fn spawn<T>(downloader: T) -> Self
where
T: BodyDownloader + 'static,
{
Self::spawn_with(downloader, &TokioTaskExecutor::default())
}
/// Spawns the given `downloader` via the given [TaskSpawner] returns a [TaskDownloader] that's
/// connected to that task.
pub fn spawn_with<T, S>(downloader: T, spawner: &S) -> Self
where
T: BodyDownloader + 'static,
S: TaskSpawner,
{
let (bodies_tx, bodies_rx) = mpsc::unbounded_channel();
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
@ -71,14 +76,9 @@ impl TaskDownloader {
downloader,
};
let mut task = JoinSet::<()>::new();
task.spawn(downloader);
spawner.spawn(async move { downloader.await }.boxed());
Self {
from_downloader: UnboundedReceiverStream::new(bodies_rx),
to_downloader,
_task: task,
}
Self { from_downloader: UnboundedReceiverStream::new(bodies_rx), to_downloader }
}
}
@ -111,16 +111,30 @@ impl<T: BodyDownloader> Future for SpawnedDownloader<T> {
let this = self.get_mut();
loop {
while let Poll::Ready(Some(range)) = this.updates.poll_next_unpin(cx) {
if let Err(err) = this.downloader.set_download_range(range) {
tracing::error!(target: "downloaders::bodies", ?err, "Failed to set download range");
let _ = this.bodies_tx.send(Err(err));
loop {
match this.updates.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// channel closed, this means [TaskDownloader] was dropped, so we can also
// exit
return Poll::Ready(())
}
Poll::Ready(Some(range)) => {
if let Err(err) = this.downloader.set_download_range(range) {
tracing::error!(target: "downloaders::bodies", ?err, "Failed to set download range");
let _ = this.bodies_tx.send(Err(err));
}
}
}
}
match ready!(this.downloader.poll_next_unpin(cx)) {
Some(bodies) => {
let _ = this.bodies_tx.send(bodies);
if this.bodies_tx.send(bodies).is_err() {
// channel closed, this means [TaskDownloader] was dropped, so we can also
// exit
return Poll::Ready(())
}
}
None => return Poll::Pending,
}

View File

@ -16,6 +16,7 @@ use reth_interfaces::{
},
};
use reth_primitives::{BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
@ -508,10 +509,18 @@ where
H: HeadersClient,
Self: HeaderDownloader + 'static,
{
/// Spawns the downloader task via [tokio::task::spawn]
pub fn into_task(self) -> TaskDownloader {
self.into_task_with(&TokioTaskExecutor::default())
}
/// Convert the downloader into a [`TaskDownloader`](super::task::TaskDownloader) by spawning
/// it.
pub fn as_task(self) -> TaskDownloader {
TaskDownloader::spawn(self)
/// it via the given `spawner`.
pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader
where
S: TaskSpawner,
{
TaskDownloader::spawn_with(self, spawner)
}
}

View File

@ -1,17 +1,15 @@
use futures::Stream;
use futures::{FutureExt, Stream};
use futures_util::StreamExt;
use pin_project::pin_project;
use reth_interfaces::p2p::headers::downloader::{HeaderDownloader, SyncTarget};
use reth_primitives::SealedHeader;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::{
sync::{mpsc, mpsc::UnboundedSender},
task::JoinSet,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [HeaderDownloader] that drives a spawned [HeaderDownloader] on a spawned task.
@ -21,16 +19,13 @@ pub struct TaskDownloader {
#[pin]
from_downloader: UnboundedReceiverStream<Vec<SealedHeader>>,
to_downloader: UnboundedSender<DownloaderUpdates>,
/// The spawned downloader tasks.
///
/// Note: If this type is dropped, the downloader task gets dropped as well.
_task: JoinSet<()>,
}
// === impl TaskDownloader ===
impl TaskDownloader {
/// Spawns the given `downloader` and returns a [TaskDownloader] that's connected to that task.
/// Spawns the given `downloader` via [tokio::task::spawn] and returns a [TaskDownloader] that's
/// connected to that task.
///
/// # Panics
///
@ -54,6 +49,16 @@ impl TaskDownloader {
pub fn spawn<T>(downloader: T) -> Self
where
T: HeaderDownloader + 'static,
{
Self::spawn_with(downloader, &TokioTaskExecutor::default())
}
/// Spawns the given `downloader` via the given [TaskSpawner] returns a [TaskDownloader] that's
/// connected to that task.
pub fn spawn_with<T, S>(downloader: T, spawner: &S) -> Self
where
T: HeaderDownloader + 'static,
S: TaskSpawner,
{
let (headers_tx, headers_rx) = mpsc::unbounded_channel();
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
@ -63,15 +68,9 @@ impl TaskDownloader {
updates: UnboundedReceiverStream::new(updates_rx),
downloader,
};
spawner.spawn(async move { downloader.await }.boxed());
let mut task = JoinSet::<()>::new();
task.spawn(downloader);
Self {
from_downloader: UnboundedReceiverStream::new(headers_rx),
to_downloader,
_task: task,
}
Self { from_downloader: UnboundedReceiverStream::new(headers_rx), to_downloader }
}
}
@ -115,26 +114,38 @@ impl<T: HeaderDownloader> Future for SpawnedDownloader<T> {
let this = self.get_mut();
loop {
while let Poll::Ready(Some(update)) = this.updates.poll_next_unpin(cx) {
match update {
DownloaderUpdates::UpdateSyncGap(head, target) => {
this.downloader.update_sync_gap(head, target);
}
DownloaderUpdates::UpdateLocalHead(head) => {
this.downloader.update_local_head(head);
}
DownloaderUpdates::UpdateSyncTarget(target) => {
this.downloader.update_sync_target(target);
}
DownloaderUpdates::SetBatchSize(limit) => {
this.downloader.set_batch_size(limit);
loop {
match this.updates.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// channel closed, this means [TaskDownloader] was dropped, so we can also
// exit
return Poll::Ready(())
}
Poll::Ready(Some(update)) => match update {
DownloaderUpdates::UpdateSyncGap(head, target) => {
this.downloader.update_sync_gap(head, target);
}
DownloaderUpdates::UpdateLocalHead(head) => {
this.downloader.update_local_head(head);
}
DownloaderUpdates::UpdateSyncTarget(target) => {
this.downloader.update_sync_target(target);
}
DownloaderUpdates::SetBatchSize(limit) => {
this.downloader.set_batch_size(limit);
}
},
}
}
match ready!(this.downloader.poll_next_unpin(cx)) {
Some(headers) => {
let _ = this.headers_tx.send(headers);
if this.headers_tx.send(headers).is_err() {
// channel closed, this means [TaskDownloader] was dropped, so we can also
// exit
return Poll::Ready(())
}
}
None => return Poll::Pending,
}