fix: spawn batch ranges from a tokio thread instead on SenderRecovery stage (#7333)

This commit is contained in:
joshieDo
2024-03-26 18:05:36 +01:00
committed by GitHub
parent e3d8ceb4be
commit 8e1d78123c
2 changed files with 109 additions and 76 deletions

View File

@ -5,6 +5,7 @@ use reth_db::{
static_file::TransactionMask,
tables,
transaction::{DbTx, DbTxMut},
RawValue,
};
use reth_interfaces::consensus;
use reth_primitives::{
@ -20,6 +21,14 @@ use std::{fmt::Debug, ops::Range, sync::mpsc};
use thiserror::Error;
use tracing::*;
/// Maximum amount of transactions to read from disk at one time before we flush their senders to
/// disk. Since each rayon worker will hold at most 100 transactions (WORKER_CHUNK_SIZE), we
/// effectively max limit each batch to 1000 channels in memory.
const BATCH_SIZE: usize = 100_000;
/// Maximum number of senders to recover per rayon worker job.
const WORKER_CHUNK_SIZE: usize = 100;
/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
/// in [`TransactionSenders`][reth_db::tables::TransactionSenders] table.
@ -84,83 +93,13 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders");
// Spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel.
//
// Transactions are different size, so chunks will not all take the same processing time. If
// chunks are too big, there will be idle threads waiting for work. Choosing an
// arbitrary smaller value to make sure it doesn't happen.
let chunk_size = 100;
let chunks = (tx_range.start..tx_range.end)
.step_by(chunk_size as usize)
.map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end))
let batch = (tx_range.start..tx_range.end)
.step_by(BATCH_SIZE)
.map(|start| start..std::cmp::min(start + BATCH_SIZE as u64, tx_range.end))
.collect::<Vec<Range<u64>>>();
let mut channels = Vec::with_capacity(chunks.len());
for chunk_range in chunks {
// An _unordered_ channel to receive results from a rayon job
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push((chunk_range.clone(), recovered_senders_rx));
let static_file_provider = provider.static_file_provider().clone();
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
// and calculated the sender.
rayon::spawn(move || {
debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Recovering senders batch");
let mut rlp_buf = Vec::with_capacity(128);
let _ = static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(number.into())?
.map(|tx| {
rlp_buf.clear();
let _ = recovered_senders_tx
.send(recover_sender((number, tx), &mut rlp_buf));
}))
},
|_| true,
);
debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Finished recovering senders batch");
});
}
// Iterate over channels and append the sender in the order that they are received.
for (chunk_range, channel) in channels {
debug!(target: "sync::stages::sender_recovery", ?chunk_range, "Appending recovered senders to the database");
while let Ok(recovered) = channel.recv() {
let (tx_id, sender) = match recovered {
Ok(result) => result,
Err(error) => {
return match *error {
SenderRecoveryStageError::FailedRecovery(err) => {
// get the block number for the bad transaction
let block_number = tx
.get::<tables::TransactionBlocks>(err.tx)?
.ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?;
// fetch the sealed header so we can use it in the sender recovery
// unwind
let sealed_header = provider
.sealed_header(block_number)?
.ok_or(ProviderError::HeaderNotFound(block_number.into()))?;
Err(StageError::Block {
block: Box::new(sealed_header),
error: BlockErrorKind::Validation(
consensus::ConsensusError::TransactionSignerRecoveryError,
),
})
}
SenderRecoveryStageError::StageError(err) => Err(err),
}
}
};
senders_cursor.append(tx_id, sender)?;
}
for range in batch {
recover_range(range, provider, tx, &mut senders_cursor)?;
}
Ok(ExecOutput {
@ -192,6 +131,99 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
fn recover_range<DB: Database>(
tx_range: Range<u64>,
provider: &DatabaseProviderRW<DB>,
tx: &<DB as Database>::TXMut,
senders_cursor: &mut <<DB as Database>::TXMut as DbTxMut>::CursorMut<
tables::TransactionSenders,
>,
) -> Result<(), StageError> {
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");
// Preallocate channels
let (chunks, receivers): (Vec<_>, Vec<_>) = (tx_range.start..tx_range.end)
.step_by(WORKER_CHUNK_SIZE)
.map(|start| {
let range = start..std::cmp::min(start + WORKER_CHUNK_SIZE as u64, tx_range.end);
let (tx, rx) = mpsc::channel();
// Range and channel sender will be sent to rayon worker
((range, tx), rx)
})
.unzip();
let static_file_provider = provider.static_file_provider().clone();
tokio::task::spawn_blocking(move || {
for (chunk_range, recovered_senders_tx) in chunks {
let static_file_provider = static_file_provider.clone();
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = static_file_provider
.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
)
.expect("failed to fetch range");
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
// and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
rlp_buf.clear();
let tx = tx.value().expect("decode error");
let _ = recovered_senders_tx.send(recover_sender((number, tx), &mut rlp_buf));
}
});
}
});
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database");
for channel in receivers {
while let Ok(recovered) = channel.recv() {
let (tx_id, sender) = match recovered {
Ok(result) => result,
Err(error) => {
return match *error {
SenderRecoveryStageError::FailedRecovery(err) => {
// get the block number for the bad transaction
let block_number = tx
.get::<tables::TransactionBlocks>(err.tx)?
.ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?;
// fetch the sealed header so we can use it in the sender recovery
// unwind
let sealed_header = provider
.sealed_header(block_number)?
.ok_or(ProviderError::HeaderNotFound(block_number.into()))?;
Err(StageError::Block {
block: Box::new(sealed_header),
error: BlockErrorKind::Validation(
consensus::ConsensusError::TransactionSignerRecoveryError,
),
})
}
SenderRecoveryStageError::StageError(err) => Err(err),
}
}
};
senders_cursor.append(tx_id, sender)?;
}
}
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Finished recovering senders batch");
Ok(())
}
#[inline]
fn recover_sender(
(tx_id, tx): (TxNumber, TransactionSignedNoHash),

View File

@ -3,7 +3,7 @@ use crate::{
add_static_file_mask,
static_file::mask::{ColumnSelectorOne, ColumnSelectorTwo, HeaderMask},
table::Table,
HeaderTerminalDifficulties, Receipts, Transactions,
HeaderTerminalDifficulties, RawValue, Receipts, Transactions,
};
use reth_primitives::{BlockHash, Header};
@ -19,3 +19,4 @@ add_static_file_mask!(ReceiptMask, <Receipts as Table>::Value, 0b1);
// TRANSACTION MASKS
add_static_file_mask!(TransactionMask, <Transactions as Table>::Value, 0b1);
add_static_file_mask!(TransactionMask, RawValue<<Transactions as Table>::Value>, 0b1);