refactor/perf: use rayon spawn for recovery jobs (#1226)

This commit is contained in:
Matthias Seitz
2023-02-09 23:21:33 +01:00
committed by GitHub
parent d82553cd66
commit eb2f5e4ea0
10 changed files with 202 additions and 49 deletions

2
Cargo.lock generated
View File

@ -4738,7 +4738,9 @@ dependencies = [
"hasher",
"itertools 0.10.5",
"metrics",
"num-traits",
"paste",
"pin-project",
"proptest",
"rand 0.8.5",
"rayon",

View File

@ -286,7 +286,6 @@ impl Command {
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage {
batch_size: stage_conf.sender_recovery.batch_size,
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage {

View File

@ -170,10 +170,7 @@ impl Command {
stage.execute(&mut tx, input).await?;
}
StageEnum::Senders => {
let mut stage = SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: num_blocks,
};
let mut stage = SenderRecoveryStage { commit_threshold: num_blocks };
// Unwind first
if !self.skip_unwind {

View File

@ -152,13 +152,11 @@ impl From<BodiesConfig> for BodiesDownloaderBuilder {
pub struct SenderRecoveryConfig {
/// The maximum number of blocks to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of transactions to recover senders for concurrently.
pub batch_size: usize,
}
impl Default for SenderRecoveryConfig {
fn default() -> Self {
Self { commit_threshold: 5_000, batch_size: 1000 }
Self { commit_threshold: 5_000 }
}
}

View File

@ -28,6 +28,7 @@ tokio = { version = "1.21.2", features = ["sync"] }
tokio-stream = "0.1.10"
async-trait = "0.1.57"
futures-util = "0.3.25"
pin-project = "1.0.12"
# observability
tracing = "0.1.36"
@ -39,6 +40,7 @@ thiserror = "1.0.37"
aquamarine = "0.2.1"
itertools = "0.10.5"
rayon = "1.6.0"
num-traits = "0.2.15"
# trie
cita_trie = "4.0.0"

View File

@ -23,9 +23,8 @@ fn senders(c: &mut Criterion) {
for batch in [1000usize, 10_000, 100_000, 250_000] {
let num_blocks = 10_000;
let mut stage = SenderRecoveryStage::default();
stage.batch_size = batch;
stage.commit_threshold = num_blocks;
let label = format!("SendersRecovery-batch-{}", batch);
let label = format!("SendersRecovery-batch-{batch}");
measure_stage(&mut group, stage, num_blocks, label);
}
}
@ -82,7 +81,7 @@ fn measure_stage<S: Clone + Default + Stage<Env<WriteMap>>>(
|_| async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
stage.execute(&mut db_tx, input.clone()).await.unwrap();
stage.execute(&mut db_tx, input).await.unwrap();
db_tx.commit().unwrap();
},
)
@ -116,8 +115,7 @@ fn txs_testdata(num_blocks: usize) -> PathBuf {
transaction::{DbTx, DbTxMut},
};
tx.commit(|tx| {
let (head, _) =
tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default().into();
let (head, _) = tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default();
tx.put::<tables::HeaderTD>(head, reth_primitives::U256::from(0).into())
})
.unwrap();

View File

@ -16,6 +16,8 @@ mod index_storage_history;
mod merkle;
/// The sender recovery stage.
mod sender_recovery;
/// Helper types for working with streams.
mod stream;
/// The total difficulty stage
mod total_difficulty;
/// The transaction lookup stage

View File

@ -2,18 +2,20 @@ use crate::{
db::Transaction, exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use itertools::Itertools;
use rayon::prelude::*;
use futures_util::StreamExt;
use crate::stages::stream::SequentialPairStream;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
Error as DbError,
};
use reth_primitives::TxNumber;
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
@ -23,8 +25,6 @@ const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
/// in [`TxSenders`][reth_db::tables::TxSenders] table.
#[derive(Clone, Debug)]
pub struct SenderRecoveryStage {
/// The size of the chunk for parallel sender recovery
pub batch_size: usize,
/// The size of inserted items after which the control
/// flow will be returned to the pipeline for commit
pub commit_threshold: u64,
@ -32,20 +32,7 @@ pub struct SenderRecoveryStage {
impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { batch_size: 250000, commit_threshold: 10000 }
}
}
// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
enum SenderRecoveryStageError {
#[error("Sender recovery failed for transaction {tx}.")]
SenderRecovery { tx: TxNumber },
}
impl From<SenderRecoveryStageError> for StageError {
fn from(error: SenderRecoveryStageError) -> Self {
StageError::Fatal(Box::new(error))
Self { commit_threshold: 10000 }
}
}
@ -91,22 +78,34 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
for chunk in &entries.chunks(self.batch_size) {
let transactions = chunk.collect::<Result<Vec<_>, DbError>>()?;
// Recover signers for the chunk in parallel
let recovered = transactions
.into_par_iter()
.map(|(tx_id, transaction)| {
// a channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
// spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel
for entry in entries {
let (tx_id, transaction) = entry?;
let tx = tx.clone();
rayon::spawn_fifo(move || {
trace!(target: "sync::stages::sender_recovery", tx_id, hash = ?transaction.hash(), "Recovering sender");
let signer =
transaction.recover_signer().ok_or_else::<StageError, _>(|| {
SenderRecoveryStageError::SenderRecovery { tx: tx_id }.into()
})?;
let res = if let Some(signer) = transaction.recover_signer() {
Ok((tx_id, signer))
})
.collect::<Result<Vec<_>, StageError>>()?;
// Append the signers to the table
recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?;
} else {
Err(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }))
};
// send the result back
let _ = tx.send(res);
});
}
drop(tx);
let mut recovered_senders =
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));
while let Some(recovered) = recovered_senders.next().await {
let (id, sender) = recovered?;
senders_cursor.append(id, sender)?;
}
let done = !capped;
@ -128,6 +127,19 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
// TODO(onbjerg): Should unwind
/// Errors raised by the sender recovery stage.
///
/// Converted into a fatal [`StageError`] through the `From` impl that follows this enum.
#[derive(Error, Debug)]
enum SenderRecoveryStageError {
/// The signer of the transaction with number `tx` could not be recovered, i.e.
/// `recover_signer` returned `None` for it.
#[error("Sender recovery failed for transaction {tx}.")]
SenderRecovery { tx: TxNumber },
}
impl From<SenderRecoveryStageError> for StageError {
fn from(error: SenderRecoveryStageError) -> Self {
StageError::Fatal(Box::new(error))
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
@ -264,7 +276,7 @@ mod tests {
}
fn stage(&self) -> Self::S {
SenderRecoveryStage { batch_size: 100, commit_threshold: self.threshold }
SenderRecoveryStage { commit_threshold: self.threshold }
}
}

View File

@ -0,0 +1,144 @@
use futures_util::stream::Stream;
use num_traits::One;
use std::{
cmp::Ordering,
collections::{binary_heap::PeekMut, BinaryHeap},
ops::Add,
pin::Pin,
task::{Context, Poll},
};
/// A Stream type that emits key-value pairs in sequential order of the key.
///
/// Pairs coming out of the wrapped stream may arrive in any order. A pair whose key is not the
/// one currently expected is parked in `pending` and released once the expected key has caught
/// up with it; parked pairs whose key is already behind the expected one are discarded. After
/// the wrapped stream has ended, whatever is still parked is emitted in ascending key order,
/// even if that leaves gaps in the sequence.
#[pin_project::pin_project]
#[must_use = "stream does nothing unless polled"]
pub(crate) struct SequentialPairStream<Key: Ord, Value, St> {
/// The key of the next item we expect from the stream; advanced by one each time the pair
/// carrying it is emitted.
next: Key,
/// Buffered entries that were received before their turn, ordered so that the smallest key is
/// at the top of the heap.
pending: BinaryHeap<OrderedItem<Key, Value>>,
/// The wrapped stream that produces the (possibly unordered) pairs.
#[pin]
stream: St,
/// Set to `true` once the wrapped stream has yielded `None`; it is not polled again afterwards.
done: bool,
}
// === impl SequentialPairStream ===
impl<Key: Ord, Value, St> SequentialPairStream<Key, Value, St> {
/// Returns a new [SequentialPairStream] that emits the items of the given stream in order
/// starting at the given start point.
pub(crate) fn new(start: Key, stream: St) -> Self {
Self { next: start, pending: Default::default(), stream, done: false }
}
}
/// Implements [Stream] on top of any wrapped stream that yields `Result<(Key, Value), Err>`.
impl<Key, Value, St, Err> Stream for SequentialPairStream<Key, Value, St>
where
    Key: Ord + Copy + Add<Output = Key> + One,
    St: Stream<Item = Result<(Key, Value), Err>>,
{
    type Item = Result<(Key, Value), Err>;

    /// Yields the pair carrying the expected key as soon as it is available.
    ///
    /// Errors of the wrapped stream are forwarded right away. Once the wrapped stream has ended,
    /// the remaining buffered pairs are flushed smallest key first, followed by `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        loop {
            // Step 1: try to serve the caller from the buffer. The heap keeps the smallest key on
            // top, so only the head has to be inspected.
            while let Some(head) = this.pending.peek_mut() {
                match head.0.cmp(&*this.next) {
                    // already behind the expected key: discard and look at the new head
                    Ordering::Less => {
                        PeekMut::pop(head);
                    }
                    // exactly the key we are waiting for
                    Ordering::Equal => {
                        let item = PeekMut::pop(head);
                        *this.next = *this.next + Key::one();
                        return Poll::Ready(Some(Ok(item.into())));
                    }
                    // nothing else will arrive anymore, so flush despite the gap
                    Ordering::Greater if *this.done => {
                        let item = PeekMut::pop(head);
                        return Poll::Ready(Some(Ok(item.into())));
                    }
                    // the expected key may still arrive from the wrapped stream
                    Ordering::Greater => break,
                }
            }

            // the wrapped stream has ended and the buffer has been drained completely
            if *this.done {
                return Poll::Ready(None);
            }

            // Step 2: pull from the wrapped stream until it produces the expected key, fails,
            // ends, or has nothing ready.
            loop {
                match this.stream.as_mut().poll_next(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(None) => {
                        // remember the end of the stream and go back to flushing the buffer
                        *this.done = true;
                        break;
                    }
                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
                    Poll::Ready(Some(Ok((key, value)))) if key == *this.next => {
                        *this.next = *this.next + Key::one();
                        return Poll::Ready(Some(Ok((key, value))));
                    }
                    // not its turn yet: park it
                    Poll::Ready(Some(Ok((key, value)))) => {
                        this.pending.push(OrderedItem(key, value));
                    }
                }
            }
        }
    }
}
/// The key-value pair a [SequentialPairStream] buffers and emits.
///
/// Equality and ordering look at the key only, and the ordering is inverted so that a
/// `BinaryHeap`, which is a max-heap, hands out the smallest key first.
struct OrderedItem<Key, Value>(Key, Value);

impl<Key, Value> From<OrderedItem<Key, Value>> for (Key, Value) {
    /// Unpacks the item into a plain `(key, value)` tuple.
    fn from(value: OrderedItem<Key, Value>) -> Self {
        let OrderedItem(key, val) = value;
        (key, val)
    }
}

impl<Key: Ord, Value> PartialEq for OrderedItem<Key, Value> {
    /// Two items are equal when their keys are equal; the values are ignored.
    fn eq(&self, other: &Self) -> bool {
        let (OrderedItem(lhs, _), OrderedItem(rhs, _)) = (self, other);
        lhs == rhs
    }
}

impl<Key: Ord, Value> Eq for OrderedItem<Key, Value> {}

impl<Key: Ord, Value> Ord for OrderedItem<Key, Value> {
    /// Compares by key in reverse, because the binary heap is a max-heap.
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cmp(&other.0).reverse()
    }
}

impl<Key: Ord, Value> PartialOrd for OrderedItem<Key, Value> {
    /// Always agrees with [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{stream, TryStreamExt};
    use rand::{seq::SliceRandom, thread_rng};

    /// Feeds the keys `0..10` in random order and expects them back in ascending order.
    #[tokio::test]
    async fn test_ordered_stream() {
        let expected = (0..10).map(|i| (i, i)).collect::<Vec<_>>();
        let start = expected[0].0;

        // same pairs, but in a random arrival order
        let mut shuffled = expected.clone();
        shuffled.shuffle(&mut thread_rng());
        let source = stream::iter(shuffled.into_iter().map(Ok::<_, ()>));

        let received: Vec<_> =
            SequentialPairStream::new(start, source).try_collect().await.unwrap();
        assert_eq!(received, expected);
    }
}

View File

@ -64,7 +64,6 @@ let mut pipeline = reth_stages::Pipeline::new()
commit_threshold: config.stages.bodies.commit_threshold,
})
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.push(ExecutionStage { config: ExecutorConfig::new_ethereum() });