perf: dynamically batch tx sender recovery (#1834)

This commit is contained in:
Bjerg
2023-03-18 14:35:45 +01:00
committed by GitHub
parent 075544e889
commit a05b3ffed6

View File

@ -3,6 +3,7 @@ use crate::{
StageError, StageId, UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@ -75,36 +76,49 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Walk the transactions from start to end index (inclusive)
let entries = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;
let tx_walker = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
// an _unordered_ channel to receive results from a rayon job
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();
// spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel
for entry in entries {
let (tx_id, transaction) = entry?;
// Spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel.
//
// We try to evenly divide the transactions to recover across all threads in the threadpool.
// Chunks are submitted instead of individual transactions to reduce the overhead of work
// stealing in the threadpool workers.
for chunk in
&tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads())
{
let tx = tx.clone();
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let mut chunk: Vec<_> = chunk.collect();
// Spawn the sender recovery task onto the global rayon pool
// This task will send the result through the channel after it recovered the sender.
// This task will send the results through the channel after it recovered the senders.
rayon::spawn(move || {
let res = if let Some(signer) = transaction.recover_signer() {
Ok((tx_id, signer))
} else {
Err(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }))
};
// send the result back
let _ = tx.send(res);
chunk
.drain(..)
.map(|entry| {
let (tx_id, transaction) = entry?;
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;
Ok((tx_id, sender))
})
.for_each(|result: Result<_, StageError>| {
let _ = tx.send(result);
});
});
}
drop(tx);
// we need sorted results so we wrap the _unordered_ receiver stream into the sequential
// stream which yields the next result (increasing transaction id)
// We need sorted results, so we wrap the _unordered_ receiver stream into a sequential
// stream, which yields the results by ascending transaction ID.
let mut recovered_senders =
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));