chore(sync): rename senders stage (#554)

This commit is contained in:
Roman Krasiuk
2022-12-21 15:34:18 +02:00
committed by GitHub
parent 151420df58
commit 276be27310
5 changed files with 39 additions and 37 deletions

View File

@ -17,7 +17,7 @@ pub struct StageConfig {
/// Body stage configuration.
pub bodies: BodiesConfig,
/// Sender recovery stage configuration.
pub senders: SendersConfig,
pub sender_recovery: SenderRecoveryConfig,
}
/// Header stage configuration.
@ -66,14 +66,14 @@ impl Default for BodiesConfig {
/// Sender recovery stage configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SendersConfig {
pub struct SenderRecoveryConfig {
/// The maximum number of blocks to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of transactions to recover senders for concurrently.
pub batch_size: usize,
}
impl Default for SendersConfig {
impl Default for SenderRecoveryConfig {
fn default() -> Self {
Self { commit_threshold: 5_000, batch_size: 1000 }
}

View File

@ -27,7 +27,9 @@ use reth_network::{
};
use reth_primitives::{Account, Header, H256};
use reth_provider::{db_provider::ProviderImpl, BlockProvider, HeaderProvider};
use reth_stages::stages::{bodies::BodyStage, headers::HeaderStage, senders::SendersStage};
use reth_stages::stages::{
bodies::BodyStage, headers::HeaderStage, sender_recovery::SenderRecoveryStage,
};
use std::{net::SocketAddr, path::Path, sync::Arc};
use tracing::{debug, info};
@ -137,9 +139,9 @@ impl Command {
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
})
.push(SendersStage {
batch_size: config.stages.senders.batch_size,
commit_threshold: config.stages.senders.commit_threshold,
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
});
if let Some(tip) = self.tip {

View File

@ -1,6 +1,6 @@
# Stages
The `stages` lib plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SendersStage`, and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages are queued up and stored within the Reth pipeline.
The `stages` lib plays a central role in syncing the node, maintaining state, updating the database and more. The stages involved in the Reth pipeline are the `HeaderStage`, `BodyStage`, `SenderRecoveryStage`, and `ExecutionStage` (note that this list is non-exhaustive, and more pipeline stages will be added in the near future). Each of these stages are queued up and stored within the Reth pipeline.
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/stages/src/pipeline.rs anchor=struct-Pipeline}}
@ -51,9 +51,9 @@ The new block is then pre-validated, checking that the ommers hash and transacti
<br>
## SendersStage
## SenderRecoveryStage
Following a successful `BodyStage`, the `SenderStage` starts to execute. The `SenderStage` is responsible for recovering the transaction sender for each of the newly added transactions to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP encoded transaction bytes. This hash is then passed into the `recover_signer` function.
Following a successful `BodyStage`, the `SenderRecoveryStage` starts to execute. The `SenderRecoveryStage` is responsible for recovering the transaction sender for each of the newly added transactions to the database. At the beginning of the execution function, all of the transactions are first retrieved from the database. Then the `SenderRecoveryStage` goes through each transaction and recovers the signer from the transaction signature and hash. The transaction hash is derived by taking the Keccak 256-bit hash of the RLP encoded transaction bytes. This hash is then passed into the `recover_signer` function.
{{#template ../templates/source_and_github.md path_to_root=../../ path=crates/primitives/src/transaction/signature.rs anchor=fn-recover_signer}}

View File

@ -5,4 +5,4 @@ pub mod execution;
/// The headers stage.
pub mod headers;
/// The sender recovery stage.
pub mod senders;
pub mod sender_recovery;

View File

@ -15,13 +15,13 @@ use std::fmt::Debug;
use thiserror::Error;
use tracing::*;
const SENDERS: StageId = StageId("Senders");
const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
/// The senders stage iterates over existing transactions,
/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
/// in [`TxSenders`][reth_interfaces::db::tables::TxSenders] table.
#[derive(Debug)]
pub struct SendersStage {
pub struct SenderRecoveryStage {
/// The size of the chunk for parallel sender recovery
pub batch_size: usize,
/// The size of inserted items after which the control
@ -31,22 +31,22 @@ pub struct SendersStage {
// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
enum SendersStageError {
enum SenderRecoveryStageError {
#[error("Sender recovery failed for transaction {tx}.")]
SenderRecovery { tx: TxNumber },
}
impl From<SendersStageError> for StageError {
fn from(error: SendersStageError) -> Self {
impl From<SenderRecoveryStageError> for StageError {
fn from(error: SenderRecoveryStageError) -> Self {
StageError::Fatal(Box::new(error))
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for SendersStage {
impl<DB: Database> Stage<DB> for SenderRecoveryStage {
/// Return the id of the stage
fn id(&self) -> StageId {
SENDERS
SENDER_RECOVERY
}
/// Retrieve the range of transactions to iterate over by querying
@ -64,7 +64,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
if max_block_num <= stage_progress {
info!(target: "sync::stages::senders", target = max_block_num, stage_progress, "Target block already reached");
info!(target: "sync::stages::sender_recovery", target = max_block_num, stage_progress, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
@ -76,7 +76,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
// No transactions to walk over
if start_tx_index > end_tx_index {
info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Target transaction already reached");
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: max_block_num, done: true })
}
@ -91,17 +91,17 @@ impl<DB: Database> Stage<DB> for SendersStage {
.take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default());
// Iterate over transactions in chunks
info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Recovering senders");
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");
for chunk in &entries.chunks(self.batch_size) {
let transactions = chunk.collect::<Result<Vec<_>, DbError>>()?;
// Recover signers for the chunk in parallel
let recovered = transactions
.into_par_iter()
.map(|(tx_id, transaction)| {
trace!(target: "sync::stages::senders", tx_id, hash = ?transaction.hash(), "Recovering sender");
trace!(target: "sync::stages::sender_recovery", tx_id, hash = ?transaction.hash(), "Recovering sender");
let signer =
transaction.recover_signer().ok_or_else::<StageError, _>(|| {
SendersStageError::SenderRecovery { tx: tx_id }.into()
SenderRecoveryStageError::SenderRecovery { tx: tx_id }.into()
})?;
Ok((tx_id, signer))
})
@ -111,7 +111,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
}
let done = max_block_num >= previous_stage_progress;
info!(target: "sync::stages::senders", stage_progress = max_block_num, done, "Sync iteration finished");
info!(target: "sync::stages::sender_recovery", stage_progress = max_block_num, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: max_block_num, done })
}
@ -141,7 +141,7 @@ mod tests {
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
stage_test_suite_ext!(SendersTestRunner);
stage_test_suite_ext!(SenderRecoveryTestRunner);
/// Execute a block range with a single transaction
#[tokio::test]
@ -149,7 +149,7 @@ mod tests {
let (previous_stage, stage_progress) = (500, 100);
// Set up the runner
let runner = SendersTestRunner::default();
let runner = SenderRecoveryTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
@ -186,7 +186,7 @@ mod tests {
#[tokio::test]
async fn execute_intermediate_commit() {
let threshold = 50;
let mut runner = SendersTestRunner::default();
let mut runner = SenderRecoveryTestRunner::default();
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
@ -221,36 +221,36 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
struct SendersTestRunner {
struct SenderRecoveryTestRunner {
tx: TestTransaction,
threshold: u64,
}
impl Default for SendersTestRunner {
impl Default for SenderRecoveryTestRunner {
fn default() -> Self {
Self { threshold: 1000, tx: TestTransaction::default() }
}
}
impl SendersTestRunner {
impl SenderRecoveryTestRunner {
fn set_threshold(&mut self, threshold: u64) {
self.threshold = threshold;
}
}
impl StageTestRunner for SendersTestRunner {
type S = SendersStage;
impl StageTestRunner for SenderRecoveryTestRunner {
type S = SenderRecoveryStage;
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
SendersStage { batch_size: 100, commit_threshold: self.threshold }
SenderRecoveryStage { batch_size: 100, commit_threshold: self.threshold }
}
}
impl ExecuteStageTestRunner for SendersTestRunner {
impl ExecuteStageTestRunner for SenderRecoveryTestRunner {
type Seed = Vec<BlockLocked>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
@ -306,13 +306,13 @@ mod tests {
}
}
impl UnwindStageTestRunner for SendersTestRunner {
impl UnwindStageTestRunner for SenderRecoveryTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.check_no_senders_by_block(input.unwind_to)
}
}
impl SendersTestRunner {
impl SenderRecoveryTestRunner {
fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {