feat: add task header downloader (#958)

This commit is contained in:
Matthias Seitz
2023-01-25 12:32:28 +01:00
committed by GitHub
parent d08aa94f5d
commit 6c37b0aa1b
8 changed files with 480 additions and 229 deletions

View File

@ -18,6 +18,9 @@ reth-metrics-derive = { path = "../../metrics/metrics-derive" }
# async
futures = "0.3"
futures-util = "0.3.25"
pin-project = "1.0"
tokio = { version = "1.0", features = ["sync"] }
tokio-stream = "0.1"
# misc
tracing = "0.1.37"
@ -30,5 +33,4 @@ reth-tracing = { path = "../../tracing" }
async-trait = "0.1.58"
assert_matches = "1.5.0"
once_cell = "1.17.0"
tokio = { version = "1.21.2", features = ["full"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

View File

@ -92,6 +92,11 @@ impl<H> LinearDownloader<H>
where
H: HeadersClient + 'static,
{
/// Convenience method to create a [LinearDownloadBuilder] without importing it
pub fn builder() -> LinearDownloadBuilder {
LinearDownloadBuilder::default()
}
/// Returns the block number the local node is at.
#[inline]
fn local_block_number(&self) -> u64 {
@ -446,6 +451,11 @@ where
}
}
/// Validate whether the header is valid in relation to it's parent
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(&self.consensus, header, parent)
}
/// Clears all requests/responses.
fn clear(&mut self) {
self.lowest_validated_header.take();
@ -527,11 +537,6 @@ where
fn set_batch_size(&mut self, batch_size: usize) {
self.stream_batch_size = batch_size;
}
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
validate_header_download(&self.consensus, header, parent)?;
Ok(())
}
}
impl<H> Stream for LinearDownloader<H>
@ -875,20 +880,11 @@ fn calc_next_request(
#[cfg(test)]
mod tests {
use super::*;
use once_cell::sync::Lazy;
use crate::headers::test_utils::child_header;
use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient};
use reth_primitives::SealedHeader;
static CONSENSUS: Lazy<Arc<dyn Consensus>> = Lazy::new(|| Arc::new(TestConsensus::default()));
fn child_header(parent: &SealedHeader) -> SealedHeader {
let mut child = parent.as_ref().clone();
child.number += 1;
child.parent_hash = parent.hash_slow();
let hash = child.hash_slow();
SealedHeader::new(child, hash)
}
/// Tests that request calc works
#[test]
fn test_sync_target_update() {
@ -897,7 +893,7 @@ mod tests {
let genesis = SealedHeader::default();
let mut downloader = LinearDownloadBuilder::default().build(
CONSENSUS.clone(),
Arc::new(TestConsensus::default()),
Arc::clone(&client),
genesis,
H256::random(),
@ -924,7 +920,7 @@ mod tests {
let header = SealedHeader::default();
let mut downloader = LinearDownloadBuilder::default().build(
CONSENSUS.clone(),
Arc::new(TestConsensus::default()),
Arc::clone(&client),
header.clone(),
H256::random(),
@ -966,7 +962,7 @@ mod tests {
let batch_size = 99;
let start = 1000;
let mut downloader = LinearDownloadBuilder::default().request_limit(batch_size).build(
CONSENSUS.clone(),
Arc::new(TestConsensus::default()),
Arc::clone(&client),
genesis,
H256::random(),
@ -1017,7 +1013,7 @@ mod tests {
let mut downloader = LinearDownloadBuilder::default()
.stream_batch_size(3)
.request_limit(3)
.build(CONSENSUS.clone(), Arc::clone(&client), p3.clone(), p0.hash());
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash());
client
.extend(vec![
@ -1047,7 +1043,7 @@ mod tests {
let mut downloader = LinearDownloadBuilder::default()
.stream_batch_size(1)
.request_limit(1)
.build(CONSENSUS.clone(), Arc::clone(&client), p3.clone(), p0.hash());
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash());
client
.extend(vec![

View File

@ -1,2 +1,8 @@
/// A Linear downloader implementation.
pub mod linear;
/// A downloader implementation that spawns a downloader to a task
pub mod task;
#[cfg(test)]
mod test_utils;

View File

@ -0,0 +1,196 @@
use futures::Stream;
use futures_util::StreamExt;
use pin_project::pin_project;
use reth_interfaces::p2p::headers::downloader::{HeaderDownloader, SyncTarget};
use reth_primitives::SealedHeader;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::{
sync::{mpsc, mpsc::UnboundedSender},
task::JoinSet,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [HeaderDownloader] that drives a spawned [HeaderDownloader] on a spawned task.
#[derive(Debug)]
#[pin_project]
pub struct TaskDownloader {
#[pin]
from_downloader: UnboundedReceiverStream<Vec<SealedHeader>>,
to_downloader: UnboundedSender<DownloaderUpdates>,
/// The spawned downloader tasks.
///
/// Note: If this type is dropped, the downloader task gets dropped as well.
_task: JoinSet<()>,
}
// === impl TaskDownloader ===
impl TaskDownloader {
/// Spawns the given `downloader` and returns a [TaskDownloader] that's connected to that task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime
///
/// # Example
///
/// ```
/// # use std::sync::Arc;
/// # use reth_downloaders::headers::linear::LinearDownloader;
/// # use reth_downloaders::headers::task::TaskDownloader;
/// # use reth_interfaces::consensus::Consensus;
/// # use reth_interfaces::p2p::headers::client::HeadersClient;
/// # fn t<H: HeadersClient + 'static>(consensus:Arc<dyn Consensus>, client: Arc<H>) {
/// let downloader = LinearDownloader::builder().build(
/// consensus,
/// client,
/// Default::default(),
/// Default::default(),
/// );
/// let downloader = TaskDownloader::spawn(downloader);
/// # }
pub fn spawn<T>(downloader: T) -> Self
where
T: HeaderDownloader + 'static,
{
let (headers_tx, headers_rx) = mpsc::unbounded_channel();
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
let downloader = SpawnedDownloader {
headers_tx,
updates: UnboundedReceiverStream::new(updates_rx),
downloader,
};
let mut task = JoinSet::<()>::new();
task.spawn(downloader);
Self {
from_downloader: UnboundedReceiverStream::new(headers_rx),
to_downloader,
_task: task,
}
}
}
impl HeaderDownloader for TaskDownloader {
fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) {
let _ = self.to_downloader.send(DownloaderUpdates::UpdateSyncGap(head, target));
}
fn update_local_head(&mut self, head: SealedHeader) {
let _ = self.to_downloader.send(DownloaderUpdates::UpdateLocalHead(head));
}
fn update_sync_target(&mut self, target: SyncTarget) {
let _ = self.to_downloader.send(DownloaderUpdates::UpdateSyncTarget(target));
}
fn set_batch_size(&mut self, limit: usize) {
let _ = self.to_downloader.send(DownloaderUpdates::SetBatchSize(limit));
}
}
impl Stream for TaskDownloader {
type Item = Vec<SealedHeader>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().from_downloader.poll_next(cx)
}
}
/// A [HeaderDownloader] that runs on its own task
struct SpawnedDownloader<T> {
updates: UnboundedReceiverStream<DownloaderUpdates>,
headers_tx: UnboundedSender<Vec<SealedHeader>>,
downloader: T,
}
impl<T: HeaderDownloader> Future for SpawnedDownloader<T> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
while let Poll::Ready(Some(update)) = this.updates.poll_next_unpin(cx) {
match update {
DownloaderUpdates::UpdateSyncGap(head, target) => {
this.downloader.update_sync_gap(head, target);
}
DownloaderUpdates::UpdateLocalHead(head) => {
this.downloader.update_local_head(head);
}
DownloaderUpdates::UpdateSyncTarget(target) => {
this.downloader.update_sync_target(target);
}
DownloaderUpdates::SetBatchSize(limit) => {
this.downloader.set_batch_size(limit);
}
}
}
match ready!(this.downloader.poll_next_unpin(cx)) {
Some(headers) => {
let _ = this.headers_tx.send(headers);
}
None => return Poll::Pending,
}
}
}
}
/// Commands delegated tot the spawned [HeaderDownloader]
enum DownloaderUpdates {
UpdateSyncGap(SealedHeader, SyncTarget),
UpdateLocalHead(SealedHeader),
UpdateSyncTarget(SyncTarget),
SetBatchSize(usize),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::headers::{linear::LinearDownloadBuilder, test_utils::child_header};
use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient};
use std::sync::Arc;
#[tokio::test(flavor = "multi_thread")]
async fn download_one_by_one_on_task() {
reth_tracing::init_test_tracing();
let p3 = SealedHeader::default();
let p2 = child_header(&p3);
let p1 = child_header(&p2);
let p0 = child_header(&p1);
let client = Arc::new(TestHeadersClient::default());
let downloader = LinearDownloadBuilder::default()
.stream_batch_size(1)
.request_limit(1)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash());
let mut downloader = TaskDownloader::spawn(downloader);
client
.extend(vec![
p0.as_ref().clone(),
p1.as_ref().clone(),
p2.as_ref().clone(),
p3.as_ref().clone(),
])
.await;
let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p0]);
let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p1]);
let headers = downloader.next().await.unwrap();
assert_eq!(headers, vec![p2]);
}
}

View File

@ -0,0 +1,10 @@
use reth_primitives::SealedHeader;
/// Returns a new [SealedHeader] that's the child header of the given `parent`.
pub(crate) fn child_header(parent: &SealedHeader) -> SealedHeader {
let mut child = parent.as_ref().clone();
child.number += 1;
child.parent_hash = parent.hash_slow();
let hash = child.hash_slow();
SealedHeader::new(child, hash)
}