diff --git a/bin/reth/src/config.rs b/bin/reth/src/config.rs
index 5e3783025..dd170b8e0 100644
--- a/bin/reth/src/config.rs
+++ b/bin/reth/src/config.rs
@@ -17,7 +17,7 @@ pub struct StageConfig {
/// Body stage configuration.
pub bodies: BodiesConfig,
/// Sender recovery stage configuration.
- pub senders: SendersConfig,
+ pub sender_recovery: SenderRecoveryConfig,
}
/// Header stage configuration.
@@ -66,14 +66,14 @@ impl Default for BodiesConfig {
/// Sender recovery stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct SendersConfig {
+pub struct SenderRecoveryConfig {
/// The maximum number of blocks to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of transactions to recover senders for concurrently.
pub batch_size: usize,
}
-impl Default for SendersConfig {
+impl Default for SenderRecoveryConfig {
fn default() -> Self {
Self { commit_threshold: 5_000, batch_size: 1000 }
}
diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs
index 50a94252b..cd4a24ce4 100644
--- a/bin/reth/src/node/mod.rs
+++ b/bin/reth/src/node/mod.rs
@@ -27,7 +27,9 @@ use reth_network::{
};
use reth_primitives::{Account, Header, H256};
use reth_provider::{db_provider::ProviderImpl, BlockProvider, HeaderProvider};
-use reth_stages::stages::{bodies::BodyStage, headers::HeaderStage, senders::SendersStage};
+use reth_stages::stages::{
+ bodies::BodyStage, headers::HeaderStage, sender_recovery::SenderRecoveryStage,
+};
use std::{net::SocketAddr, path::Path, sync::Arc};
use tracing::{debug, info};
@@ -137,9 +139,9 @@ impl Command {
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
})
- .push(SendersStage {
- batch_size: config.stages.senders.batch_size,
- commit_threshold: config.stages.senders.commit_threshold,
+ .push(SenderRecoveryStage {
+ batch_size: config.stages.sender_recovery.batch_size,
+ commit_threshold: config.stages.sender_recovery.commit_threshold,
});
if let Some(tip) = self.tip {
diff --git a/book/stages/README.md b/book/stages/README.md
index 125f974ea..08c49b5d1 100644
--- a/book/stages/README.md
+++ b/book/stages/README.md
@@ -1,6 +1,6 @@
# Stages
-The `stages` lib plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SendersStage`, and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages are queued up and stored within the Reth pipeline.
+The `stages` lib plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SenderRecoveryStage`, and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages are queued up and stored within the Reth pipeline.
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/stages/src/pipeline.rs anchor=struct-Pipeline}}
@@ -51,9 +51,9 @@ The new block is then pre-validated, checking that the ommers hash and transacti
-## SendersStage
+## SenderRecoveryStage
-Following a successful `BodyStage`, the `SenderStage` starts to execute. The `SenderStage` is responsible for recovering the transaction sender for each of the newly added transactions to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP encoded transaction bytes. This hash is then passed into the `recover_signer` function.
+Following a successful `BodyStage`, the `SenderRecoveryStage` starts to execute. The `SenderRecoveryStage` is responsible for recovering the transaction sender for each of the newly added transactions to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderRecoveryStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP encoded transaction bytes. This hash is then passed into the `recover_signer` function.
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/primitives/src/transaction/signature.rs anchor=fn-recover_signer}}
diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs
index dc87fe330..80b07365b 100644
--- a/crates/stages/src/stages/mod.rs
+++ b/crates/stages/src/stages/mod.rs
@@ -5,4 +5,4 @@ pub mod execution;
/// The headers stage.
pub mod headers;
/// The sender recovery stage.
-pub mod senders;
+pub mod sender_recovery;
diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/sender_recovery.rs
similarity index 87%
rename from crates/stages/src/stages/senders.rs
rename to crates/stages/src/stages/sender_recovery.rs
index 94920d03b..b159634a1 100644
--- a/crates/stages/src/stages/senders.rs
+++ b/crates/stages/src/stages/sender_recovery.rs
@@ -15,13 +15,13 @@ use std::fmt::Debug;
use thiserror::Error;
use tracing::*;
-const SENDERS: StageId = StageId("Senders");
+const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
-/// The senders stage iterates over existing transactions,
+/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
/// in [`TxSenders`][reth_interfaces::db::tables::TxSenders] table.
#[derive(Debug)]
-pub struct SendersStage {
+pub struct SenderRecoveryStage {
/// The size of the chunk for parallel sender recovery
pub batch_size: usize,
/// The size of inserted items after which the control
@@ -31,22 +31,22 @@ pub struct SendersStage {
// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
-enum SendersStageError {
+enum SenderRecoveryStageError {
#[error("Sender recovery failed for transaction {tx}.")]
SenderRecovery { tx: TxNumber },
}
-impl From for StageError {
- fn from(error: SendersStageError) -> Self {
+impl From for StageError {
+ fn from(error: SenderRecoveryStageError) -> Self {
StageError::Fatal(Box::new(error))
}
}
#[async_trait::async_trait]
-impl Stage for SendersStage {
+impl Stage for SenderRecoveryStage {
/// Return the id of the stage
fn id(&self) -> StageId {
- SENDERS
+ SENDER_RECOVERY
}
/// Retrieve the range of transactions to iterate over by querying
@@ -64,7 +64,7 @@ impl Stage for SendersStage {
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
if max_block_num <= stage_progress {
- info!(target: "sync::stages::senders", target = max_block_num, stage_progress, "Target block already reached");
+ info!(target: "sync::stages::sender_recovery", target = max_block_num, stage_progress, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
@@ -76,7 +76,7 @@ impl Stage for SendersStage {
// No transactions to walk over
if start_tx_index > end_tx_index {
- info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Target transaction already reached");
+ info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: max_block_num, done: true })
}
@@ -91,17 +91,17 @@ impl Stage for SendersStage {
.take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default());
// Iterate over transactions in chunks
- info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Recovering senders");
+ info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
for chunk in &entries.chunks(self.batch_size) {
let transactions = chunk.collect::, DbError>>()?;
// Recover signers for the chunk in parallel
let recovered = transactions
.into_par_iter()
.map(|(tx_id, transaction)| {
- trace!(target: "sync::stages::senders", tx_id, hash = ?transaction.hash(), "Recovering sender");
+ trace!(target: "sync::stages::sender_recovery", tx_id, hash = ?transaction.hash(), "Recovering sender");
let signer =
transaction.recover_signer().ok_or_else::(|| {
- SendersStageError::SenderRecovery { tx: tx_id }.into()
+ SenderRecoveryStageError::SenderRecovery { tx: tx_id }.into()
})?;
Ok((tx_id, signer))
})
@@ -111,7 +111,7 @@ impl Stage for SendersStage {
}
let done = max_block_num >= previous_stage_progress;
- info!(target: "sync::stages::senders", stage_progress = max_block_num, done, "Sync iteration finished");
+ info!(target: "sync::stages::sender_recovery", stage_progress = max_block_num, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: max_block_num, done })
}
@@ -141,7 +141,7 @@ mod tests {
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
- stage_test_suite_ext!(SendersTestRunner);
+ stage_test_suite_ext!(SenderRecoveryTestRunner);
/// Execute a block range with a single transaction
#[tokio::test]
@@ -149,7 +149,7 @@ mod tests {
let (previous_stage, stage_progress) = (500, 100);
// Set up the runner
- let runner = SendersTestRunner::default();
+ let runner = SenderRecoveryTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
@@ -186,7 +186,7 @@ mod tests {
#[tokio::test]
async fn execute_intermediate_commit() {
let threshold = 50;
- let mut runner = SendersTestRunner::default();
+ let mut runner = SenderRecoveryTestRunner::default();
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
@@ -221,36 +221,36 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
- struct SendersTestRunner {
+ struct SenderRecoveryTestRunner {
tx: TestTransaction,
threshold: u64,
}
- impl Default for SendersTestRunner {
+ impl Default for SenderRecoveryTestRunner {
fn default() -> Self {
Self { threshold: 1000, tx: TestTransaction::default() }
}
}
- impl SendersTestRunner {
+ impl SenderRecoveryTestRunner {
fn set_threshold(&mut self, threshold: u64) {
self.threshold = threshold;
}
}
- impl StageTestRunner for SendersTestRunner {
- type S = SendersStage;
+ impl StageTestRunner for SenderRecoveryTestRunner {
+ type S = SenderRecoveryStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
- SendersStage { batch_size: 100, commit_threshold: self.threshold }
+ SenderRecoveryStage { batch_size: 100, commit_threshold: self.threshold }
}
}
- impl ExecuteStageTestRunner for SendersTestRunner {
+ impl ExecuteStageTestRunner for SenderRecoveryTestRunner {
type Seed = Vec;
fn seed_execution(&mut self, input: ExecInput) -> Result {
@@ -306,13 +306,13 @@ mod tests {
}
}
- impl UnwindStageTestRunner for SendersTestRunner {
+ impl UnwindStageTestRunner for SenderRecoveryTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.check_no_senders_by_block(input.unwind_to)
}
}
- impl SendersTestRunner {
+ impl SenderRecoveryTestRunner {
fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {