mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
fix: unwind on execution and senders errors (#2938)
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput};
|
||||
use futures_util::Future;
|
||||
use reth_db::database::Database;
|
||||
use reth_interfaces::executor::BlockExecutionError;
|
||||
use reth_primitives::{listener::EventListeners, stage::StageId, BlockNumber, H256};
|
||||
use reth_provider::{providers::get_stage_checkpoint, Transaction};
|
||||
use std::pin::Pin;
|
||||
@ -390,6 +391,25 @@ where
|
||||
target: prev_checkpoint.unwrap_or_default().block_number,
|
||||
bad_block: block,
|
||||
})
|
||||
} else if let StageError::ExecutionError {
|
||||
block,
|
||||
error: BlockExecutionError::Validation(error),
|
||||
} = err
|
||||
{
|
||||
warn!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
bad_block = %block.number,
|
||||
"Stage encountered an execution error: {error}"
|
||||
);
|
||||
|
||||
// We unwind because of an execution error. If the unwind itself fails, we
|
||||
// bail entirely, otherwise we restart the execution loop from the
|
||||
// beginning.
|
||||
Ok(ControlFlow::Unwind {
|
||||
target: prev_checkpoint.unwrap_or_default().block_number,
|
||||
bad_block: block,
|
||||
})
|
||||
} else if err.is_fatal() {
|
||||
error!(
|
||||
target: "sync::pipeline",
|
||||
|
||||
@ -7,12 +7,13 @@ use reth_db::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
DatabaseError, RawKey, RawTable, RawValue,
|
||||
};
|
||||
use reth_interfaces::consensus;
|
||||
use reth_primitives::{
|
||||
keccak256,
|
||||
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
|
||||
TransactionSignedNoHash, TxNumber, H160,
|
||||
};
|
||||
use reth_provider::Transaction;
|
||||
use reth_provider::{ProviderError, Transaction};
|
||||
use std::{fmt::Debug, ops::Deref};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc;
|
||||
@ -116,15 +117,18 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
DatabaseError,
|
||||
>,
|
||||
rlp_buf: &mut Vec<u8>|
|
||||
-> Result<(u64, H160), Box<StageError>> {
|
||||
let (tx_id, transaction) = entry.map_err(|e| Box::new(e.into()))?;
|
||||
-> Result<(u64, H160), Box<SenderRecoveryStageError>> {
|
||||
let (tx_id, transaction) =
|
||||
entry.map_err(|e| Box::new(SenderRecoveryStageError::StageError(e.into())))?;
|
||||
let tx_id = tx_id.key().expect("key to be formated");
|
||||
|
||||
let tx = transaction.value().expect("value to be formated");
|
||||
tx.transaction.encode_without_signature(rlp_buf);
|
||||
|
||||
let sender = tx.signature.recover_signer(keccak256(rlp_buf)).ok_or(
|
||||
StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }),
|
||||
SenderRecoveryStageError::FailedRecovery(FailedSenderRecoveryError {
|
||||
tx: tx_id,
|
||||
}),
|
||||
)?;
|
||||
|
||||
Ok((tx_id, sender))
|
||||
@ -144,7 +148,29 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
||||
// Iterate over channels and append the sender in the order that they are received.
|
||||
for mut channel in channels {
|
||||
while let Some(recovered) = channel.recv().await {
|
||||
let (tx_id, sender) = recovered.map_err(|boxed| *boxed)?;
|
||||
let (tx_id, sender) = match recovered {
|
||||
Ok(result) => result,
|
||||
Err(error) => {
|
||||
match *error {
|
||||
SenderRecoveryStageError::FailedRecovery(err) => {
|
||||
// get the block number for the bad transaction
|
||||
let block_number = tx
|
||||
.get::<tables::TransactionBlock>(err.tx)?
|
||||
.ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?;
|
||||
|
||||
// fetch the sealed header so we can use it in the sender recovery
|
||||
// unwind
|
||||
let sealed_header = tx.get_sealed_header(block_number)?;
|
||||
return Err(StageError::Validation {
|
||||
block: sealed_header,
|
||||
error:
|
||||
consensus::ConsensusError::TransactionSignerRecoveryError,
|
||||
})
|
||||
}
|
||||
SenderRecoveryStageError::StageError(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
};
|
||||
senders_cursor.append(tx_id, sender)?;
|
||||
}
|
||||
}
|
||||
@ -187,17 +213,21 @@ fn stage_checkpoint<DB: Database>(
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(onbjerg): Should unwind
|
||||
#[derive(Error, Debug)]
|
||||
#[error(transparent)]
|
||||
enum SenderRecoveryStageError {
|
||||
#[error("Sender recovery failed for transaction {tx}.")]
|
||||
SenderRecovery { tx: TxNumber },
|
||||
/// A transaction failed sender recovery
|
||||
FailedRecovery(FailedSenderRecoveryError),
|
||||
|
||||
/// A different type of stage error occurred
|
||||
StageError(#[from] StageError),
|
||||
}
|
||||
|
||||
impl From<SenderRecoveryStageError> for StageError {
|
||||
fn from(error: SenderRecoveryStageError) -> Self {
|
||||
StageError::Fatal(Box::new(error))
|
||||
}
|
||||
#[derive(Error, Debug)]
|
||||
#[error("Sender recovery failed for transaction {tx}.")]
|
||||
struct FailedSenderRecoveryError {
|
||||
/// The transaction that failed sender recovery
|
||||
tx: TxNumber,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user