From eb2f5e4ea0fecd9d91d96de3dc24af533a9794e0 Mon Sep 17 00:00:00 2001
From: Matthias Seitz
Date: Thu, 9 Feb 2023 23:21:33 +0100
Subject: [PATCH] refactor/perf: use rayon spawn for recovery jobs (#1226)

---
 Cargo.lock                                  |   2 +
 bin/reth/src/node/mod.rs                    |   1 -
 bin/reth/src/stage/mod.rs                   |   5 +-
 crates/staged-sync/src/config.rs            |   4 +-
 crates/stages/Cargo.toml                    |   2 +
 crates/stages/benches/criterion.rs          |   8 +-
 crates/stages/src/stages/mod.rs             |   2 +
 crates/stages/src/stages/sender_recovery.rs |  82 ++++++-----
 crates/stages/src/stages/stream.rs          | 144 ++++++++++++++++++++
 docs/repo/crates/network.md                 |   1 -
 10 files changed, 202 insertions(+), 49 deletions(-)
 create mode 100644 crates/stages/src/stages/stream.rs

diff --git a/Cargo.lock b/Cargo.lock
index b6f7c1bdc..3cc1f93cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4738,7 +4738,9 @@ dependencies = [
  "hasher",
  "itertools 0.10.5",
  "metrics",
+ "num-traits",
  "paste",
+ "pin-project",
  "proptest",
  "rand 0.8.5",
  "rayon",
diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs
index bf9a559d4..f9a414595 100644
--- a/bin/reth/src/node/mod.rs
+++ b/bin/reth/src/node/mod.rs
@@ -286,7 +286,6 @@ impl Command {
             .add_stages(
                 OfflineStages::default()
                     .set(SenderRecoveryStage {
-                        batch_size: stage_conf.sender_recovery.batch_size,
                         commit_threshold: stage_conf.sender_recovery.commit_threshold,
                     })
                     .set(ExecutionStage {
diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs
index 465f71fa0..864602106 100644
--- a/bin/reth/src/stage/mod.rs
+++ b/bin/reth/src/stage/mod.rs
@@ -170,10 +170,7 @@ impl Command {
                 stage.execute(&mut tx, input).await?;
             }
             StageEnum::Senders => {
-                let mut stage = SenderRecoveryStage {
-                    batch_size: config.stages.sender_recovery.batch_size,
-                    commit_threshold: num_blocks,
-                };
+                let mut stage = SenderRecoveryStage { commit_threshold: num_blocks };

                 // Unwind first
                 if !self.skip_unwind {
diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs
index 7d3e32699..7c683f331 100644
--- a/crates/staged-sync/src/config.rs
+++ b/crates/staged-sync/src/config.rs
@@ -152,13 +152,11 @@ impl From<BodiesConfig> for BodiesDownloaderBuilder {
 pub struct SenderRecoveryConfig {
     /// The maximum number of blocks to process before committing progress to the database.
     pub commit_threshold: u64,
-    /// The maximum number of transactions to recover senders for concurrently.
-    pub batch_size: usize,
 }

 impl Default for SenderRecoveryConfig {
     fn default() -> Self {
-        Self { commit_threshold: 5_000, batch_size: 1000 }
+        Self { commit_threshold: 5_000 }
     }
 }

diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml
index c9f62ac1a..a5fb4078d 100644
--- a/crates/stages/Cargo.toml
+++ b/crates/stages/Cargo.toml
@@ -28,6 +28,7 @@ tokio = { version = "1.21.2", features = ["sync"] }
 tokio-stream = "0.1.10"
 async-trait = "0.1.57"
 futures-util = "0.3.25"
+pin-project = "1.0.12"

 # observability
 tracing = "0.1.36"
@@ -39,6 +40,7 @@ thiserror = "1.0.37"
 aquamarine = "0.2.1"
 itertools = "0.10.5"
 rayon = "1.6.0"
+num-traits = "0.2.15"

 # trie
 cita_trie = "4.0.0"
diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs
index 126bc4328..4556f18e5 100644
--- a/crates/stages/benches/criterion.rs
+++ b/crates/stages/benches/criterion.rs
@@ -23,9 +23,8 @@ fn senders(c: &mut Criterion) {
     for batch in [1000usize, 10_000, 100_000, 250_000] {
         let num_blocks = 10_000;
         let mut stage = SenderRecoveryStage::default();
-        stage.batch_size = batch;
         stage.commit_threshold = num_blocks;
-        let label = format!("SendersRecovery-batch-{}", batch);
+        let label = format!("SendersRecovery-batch-{batch}");
         measure_stage(&mut group, stage, num_blocks, label);
     }
 }
@@ -82,7 +81,7 @@ fn measure_stage<S: Clone + Stage<Env<WriteMap>>>(
             |_| async {
                 let mut stage = stage.clone();
                 let mut db_tx = tx.inner();
-                stage.execute(&mut db_tx, input.clone()).await.unwrap();
+                stage.execute(&mut db_tx, input).await.unwrap();
                 db_tx.commit().unwrap();
             },
         )
@@ -116,8 +115,7 @@ fn txs_testdata(num_blocks: usize) -> PathBuf {
         transaction::{DbTx, DbTxMut},
     };
     tx.commit(|tx| {
-        let (head, _) =
-            tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default().into();
+        let (head, _) = tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default();
         tx.put::<tables::HeaderTD>(head, reth_primitives::U256::from(0).into())
     })
     .unwrap();
diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs
index 2e7faf5c5..4b344a8db 100644
--- a/crates/stages/src/stages/mod.rs
+++ b/crates/stages/src/stages/mod.rs
@@ -16,6 +16,8 @@ mod index_storage_history;
 mod merkle;
 /// The sender recovery stage.
 mod sender_recovery;
+/// Helper types for working with streams.
+mod stream;
 /// The total difficulty stage
 mod total_difficulty;
 /// The transaction lookup stage
diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs
index 8f8264877..38b87f082 100644
--- a/crates/stages/src/stages/sender_recovery.rs
+++ b/crates/stages/src/stages/sender_recovery.rs
@@ -2,18 +2,20 @@ use crate::{
     db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError,
     StageId, UnwindInput, UnwindOutput,
 };
-use itertools::Itertools;
-use rayon::prelude::*;
+use futures_util::StreamExt;
+
+use crate::stages::stream::SequentialPairStream;
 use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
     tables,
     transaction::{DbTx, DbTxMut},
-    Error as DbError,
 };
 use reth_primitives::TxNumber;
 use std::fmt::Debug;
 use thiserror::Error;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::*;

 const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
@@ -23,8 +25,6 @@ const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
 /// in [`TxSenders`][reth_db::tables::TxSenders] table.
 #[derive(Clone, Debug)]
 pub struct SenderRecoveryStage {
-    /// The size of the chunk for parallel sender recovery
-    pub batch_size: usize,
     /// The size of inserted items after which the control
     /// flow will be returned to the pipeline for commit
     pub commit_threshold: u64,
@@ -32,20 +32,7 @@
 }

 impl Default for SenderRecoveryStage {
     fn default() -> Self {
-        Self { batch_size: 250000, commit_threshold: 10000 }
-    }
-}
-
-// TODO(onbjerg): Should unwind
-#[derive(Error, Debug)]
-enum SenderRecoveryStageError {
-    #[error("Sender recovery failed for transaction {tx}.")]
-    SenderRecovery { tx: TxNumber },
-}
-
-impl From<SenderRecoveryStageError> for StageError {
-    fn from(error: SenderRecoveryStageError) -> Self {
-        StageError::Fatal(Box::new(error))
+        Self { commit_threshold: 10000 }
     }
 }
@@ -91,22 +78,34 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {

         // Iterate over transactions in chunks
         info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
-        for chunk in &entries.chunks(self.batch_size) {
-            let transactions = chunk.collect::<Result<Vec<_>, DbError>>()?;
-            // Recover signers for the chunk in parallel
-            let recovered = transactions
-                .into_par_iter()
-                .map(|(tx_id, transaction)| {
-                    trace!(target: "sync::stages::sender_recovery", tx_id, hash = ?transaction.hash(), "Recovering sender");
-                    let signer =
-                        transaction.recover_signer().ok_or_else::<StageError, _>(|| {
-                            SenderRecoveryStageError::SenderRecovery { tx: tx_id }.into()
-                        })?;
+
+        // a channel to receive results from a rayon job
+        let (tx, rx) = mpsc::unbounded_channel();
+
+        // spawn recovery jobs onto the default rayon threadpool and send the result through the
+        // channel
+        for entry in entries {
+            let (tx_id, transaction) = entry?;
+            let tx = tx.clone();
+            rayon::spawn_fifo(move || {
+                trace!(target: "sync::stages::sender_recovery", tx_id, hash = ?transaction.hash(), "Recovering sender");
+                let res = if let Some(signer) = transaction.recover_signer() {
                     Ok((tx_id, signer))
-                })
-                .collect::<Result<Vec<_>, StageError>>()?;
-            // Append the signers to the table
-            recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?;
+                } else {
+                    Err(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }))
+                };
+                // send the result back
+                let _ = tx.send(res);
+            });
+        }
+        drop(tx);
+
+        let mut recovered_senders =
+            SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));
+
+        while let Some(recovered) = recovered_senders.next().await {
+            let (id, sender) = recovered?;
+            senders_cursor.append(id, sender)?;
         }

         let done = !capped;
@@ -128,6 +127,19 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
     }
 }

+// TODO(onbjerg): Should unwind
+#[derive(Error, Debug)]
+enum SenderRecoveryStageError {
+    #[error("Sender recovery failed for transaction {tx}.")]
+    SenderRecovery { tx: TxNumber },
+}
+
+impl From<SenderRecoveryStageError> for StageError {
+    fn from(error: SenderRecoveryStageError) -> Self {
+        StageError::Fatal(Box::new(error))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use assert_matches::assert_matches;
@@ -264,7 +276,7 @@
         }

         fn stage(&self) -> Self::S {
-            SenderRecoveryStage { batch_size: 100, commit_threshold: self.threshold }
+            SenderRecoveryStage { commit_threshold: self.threshold }
         }
     }
diff --git a/crates/stages/src/stages/stream.rs b/crates/stages/src/stages/stream.rs
new file mode 100644
index 000000000..5e8f41e3c
--- /dev/null
+++ b/crates/stages/src/stages/stream.rs
@@ -0,0 +1,144 @@
+use futures_util::stream::Stream;
+use num_traits::One;
+use std::{
+    cmp::Ordering,
+    collections::{binary_heap::PeekMut, BinaryHeap},
+    ops::Add,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// A Stream type that emits key-value pairs in sequential order of the key.
+#[pin_project::pin_project]
+#[must_use = "stream does nothing unless polled"]
+pub(crate) struct SequentialPairStream<Key, Value, St> {
+    /// The next item we expect from the stream
+    next: Key,
+    /// buffered entries
+    pending: BinaryHeap<OrderedItem<Key, Value>>,
+    #[pin]
+    stream: St,
+    done: bool,
+}
+
+// === impl SequentialPairStream ===
+
+impl<Key, Value, St> SequentialPairStream<Key, Value, St> {
+    /// Returns a new [SequentialPairStream] that emits the items of the given stream in order
+    /// starting at the given start point.
+    pub(crate) fn new(start: Key, stream: St) -> Self {
+        Self { next: start, pending: Default::default(), stream, done: false }
+    }
+}
+
+/// implements Stream for any underlying Stream that returns a result
+impl<Key, Value, Err, St> Stream for SequentialPairStream<Key, Value, St>
+where
+    Key: Ord + Copy + Add<Output = Key> + One,
+    St: Stream<Item = Result<(Key, Value), Err>>,
+{
+    type Item = Result<(Key, Value), Err>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
+        'outer: loop {
+            // try to drain buffered items
+            while let Some(maybe_next) = this.pending.peek_mut() {
+                match (maybe_next.0).cmp(&*this.next) {
+                    Ordering::Less => {
+                        PeekMut::pop(maybe_next);
+                        continue
+                    }
+                    Ordering::Equal => {
+                        let next = PeekMut::pop(maybe_next);
+                        *this.next = *this.next + Key::one();
+                        return Poll::Ready(Some(Ok(next.into())))
+                    }
+                    Ordering::Greater => {
+                        if *this.done {
+                            let next = PeekMut::pop(maybe_next);
+                            return Poll::Ready(Some(Ok(next.into())))
+                        }
+                        break
+                    }
+                }
+            }
+
+            if *this.done {
+                return Poll::Ready(None)
+            }
+
+            loop {
+                match this.stream.as_mut().poll_next(cx) {
+                    Poll::Pending => break,
+                    Poll::Ready(item) => match item {
+                        Some(Ok((k, v))) => {
+                            if k == *this.next {
+                                *this.next = *this.next + Key::one();
+                                return Poll::Ready(Some(Ok((k, v))))
+                            }
+                            this.pending.push(OrderedItem(k, v));
+                        }
+                        Some(err @ Err(_)) => return Poll::Ready(Some(err)),
+                        None => {
+                            *this.done = true;
+                            continue 'outer
+                        }
+                    },
+                }
+            }
+
+            return Poll::Pending
+        }
+    }
+}
+
+/// The item a [SequentialPairStream] emits
+struct OrderedItem<Key, Value>(Key, Value);
+
+impl<Key, Value> From<OrderedItem<Key, Value>> for (Key, Value) {
+    fn from(value: OrderedItem<Key, Value>) -> Self {
+        (value.0, value.1)
+    }
+}
+
+impl<Key: Eq, Value> Eq for OrderedItem<Key, Value> {}
+
+impl<Key: PartialEq, Value> PartialEq for OrderedItem<Key, Value> {
+    fn eq(&self, other: &Self) -> bool {
+        self.0 == other.0
+    }
+}
+
+impl<Key: Ord, Value> PartialOrd for OrderedItem<Key, Value> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(other.0.cmp(&self.0))
+    }
+}
+
+impl<Key: Ord, Value> Ord for OrderedItem<Key, Value> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // binary heap is max-heap
+        other.0.cmp(&self.0)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures_util::{stream, TryStreamExt};
+    use rand::{seq::SliceRandom, thread_rng};
+
+    #[tokio::test]
+    async fn test_ordered_stream() {
+        let values: Vec<_> = (0..10).map(|i| (i, i)).collect();
+
+        let mut input = values.clone();
+        input.shuffle(&mut thread_rng());
+        let stream = stream::iter(input.into_iter().map(Ok::<_, ()>));
+        let ordered = SequentialPairStream::new(values[0].0, stream);
+        let received = ordered.try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(received, values);
+    }
+}
diff --git a/docs/repo/crates/network.md b/docs/repo/crates/network.md
index 52ab5441f..c67b7b6ee 100644
--- a/docs/repo/crates/network.md
+++ b/docs/repo/crates/network.md
@@ -64,7 +64,6 @@ let mut pipeline = reth_stages::Pipeline::new()
         commit_threshold: config.stages.bodies.commit_threshold,
     })
     .push(SenderRecoveryStage {
-        batch_size: config.stages.sender_recovery.batch_size,
         commit_threshold: config.stages.sender_recovery.commit_threshold,
     })
     .push(ExecutionStage { config: ExecutorConfig::new_ethereum() });
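
The core pattern in this patch -- fan per-transaction recovery jobs out to rayon's
global pool, stream the results back over an unbounded tokio channel, and
re-sequence them by transaction id before appending to the table -- can be
exercised standalone. The sketch below is illustrative only: it inlines the
re-sequencing with a plain `BinaryHeap` of `Reverse` keys instead of the new
`SequentialPairStream`, and the doubling "work" is a stand-in for
`recover_signer`; all names in it are made up for the example.

    use std::{cmp::Reverse, collections::BinaryHeap};
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        // fan out: one rayon job per item; jobs finish in arbitrary order
        for id in 0u64..100 {
            let tx = tx.clone();
            rayon::spawn_fifo(move || {
                let result = id * 2; // stand-in for the expensive recovery work
                let _ = tx.send((id, result));
            });
        }
        // drop the last sender so the channel closes once every job has sent
        drop(tx);

        // re-sequence: buffer out-of-order results in a min-heap and release
        // them strictly in key order, mirroring what SequentialPairStream does
        let mut next = 0u64;
        let mut pending = BinaryHeap::new();
        while let Some(pair) = rx.recv().await {
            pending.push(Reverse(pair));
            while pending.peek().map_or(false, |Reverse((id, _))| *id == next) {
                let Reverse((id, result)) = pending.pop().unwrap();
                assert_eq!(result, id * 2);
                next += 1;
            }
        }
        assert_eq!(next, 100);
    }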