fix: ensure that SendersRecoveryStage either recovers all senders or fails (#10810)

This commit is contained in:
joshieDo
2024-09-11 13:20:04 +01:00
committed by GitHub
parent bc14ad1f53
commit 9aea661c94

View File

@ -6,7 +6,7 @@ use reth_db_api::{
database::Database,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Address, StaticFileSegment, TransactionSignedNoHash, TxNumber};
use reth_primitives::{Address, GotExpected, StaticFileSegment, TransactionSignedNoHash, TxNumber};
use reth_provider::{
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
StatsReader,
@ -153,23 +153,37 @@ where
.unzip();
let static_file_provider = provider.static_file_provider().clone();
tokio::task::spawn_blocking(move || {
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
// there will be a timeout grace period in which Tokio does not allow spawning
// additional blocking tasks. This would cause this function to return
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
//
// However, using `std::thread::spawn` allows us to utilize the timeout grace
// period to complete some work without throwing errors during the shutdown.
std::thread::spawn(move || {
for (chunk_range, recovered_senders_tx) in chunks {
// Read the raw value, and let the rayon worker to decompress & decode.
let chunk = static_file_provider
.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
)
.expect("failed to fetch range");
let chunk = match static_file_provider.fetch_range_with_predicate(
StaticFileSegment::Transactions,
chunk_range.clone(),
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
number.into(),
)?
.map(|tx| (number, tx)))
},
|_| true,
) {
Ok(chunk) => chunk,
Err(err) => {
// We exit early since we could not process this chunk.
let _ = recovered_senders_tx
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
break
}
};
// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
@ -178,14 +192,28 @@ where
let mut rlp_buf = Vec::with_capacity(128);
for (number, tx) in chunk {
rlp_buf.clear();
let tx = tx.value().expect("decode error");
let _ = recovered_senders_tx.send(recover_sender((number, tx), &mut rlp_buf));
let res = tx
.value()
.map_err(|err| Box::new(SenderRecoveryStageError::StageError(err.into())))
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));
let is_err = res.is_err();
let _ = recovered_senders_tx.send(res);
// Finish early
if is_err {
break
}
}
});
}
});
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database");
let mut processed_transactions = 0;
for channel in receivers {
while let Ok(recovered) = channel.recv() {
let (tx_id, sender) = match recovered {
@ -212,14 +240,33 @@ where
})
}
SenderRecoveryStageError::StageError(err) => Err(err),
SenderRecoveryStageError::RecoveredSendersMismatch(expectation) => {
Err(StageError::Fatal(
SenderRecoveryStageError::RecoveredSendersMismatch(expectation)
.into(),
))
}
}
}
};
senders_cursor.append(tx_id, sender)?;
processed_transactions += 1;
}
}
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Finished recovering senders batch");
// Fail safe to ensure that we do not proceed without having recovered all senders.
let expected = tx_range.end - tx_range.start;
if processed_transactions != expected {
return Err(StageError::Fatal(
SenderRecoveryStageError::RecoveredSendersMismatch(GotExpected {
got: processed_transactions,
expected,
})
.into(),
));
}
Ok(())
}
@ -266,6 +313,10 @@ enum SenderRecoveryStageError {
#[error(transparent)]
FailedRecovery(#[from] FailedSenderRecoveryError),
/// Number of recovered senders does not match
#[error("failed to recover all senders from the batch: {_0}")]
RecoveredSendersMismatch(GotExpected<u64>),
/// A different type of stage error occurred
#[error(transparent)]
StageError(#[from] StageError),