feat(stages): rich checkpoint (#2701)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Alexey Shekhirin
2023-05-18 19:45:50 +01:00
committed by GitHub
parent 9032052678
commit e4cd48aefd
41 changed files with 709 additions and 430 deletions

View File

@ -73,7 +73,7 @@ impl Command {
tx.clear::<tables::AccountChangeSet>()?;
tx.clear::<tables::StorageChangeSet>()?;
tx.clear::<tables::Bytecodes>()?;
tx.put::<tables::SyncStage>(EXECUTION.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(EXECUTION.0.to_string(), Default::default())?;
insert_genesis_state::<Env<WriteMap>>(tx, self.chain.genesis())?;
Ok::<_, eyre::Error>(())
})??;
@ -83,12 +83,12 @@ impl Command {
// Clear hashed accounts
tx.clear::<tables::HashedAccount>()?;
tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), Vec::new())?;
tx.put::<tables::SyncStage>(ACCOUNT_HASHING.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(ACCOUNT_HASHING.0.to_string(), Default::default())?;
// Clear hashed storages
tx.clear::<tables::HashedStorage>()?;
tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), Vec::new())?;
tx.put::<tables::SyncStage>(STORAGE_HASHING.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(STORAGE_HASHING.0.to_string(), Default::default())?;
Ok::<_, eyre::Error>(())
})??;
@ -97,8 +97,11 @@ impl Command {
tool.db.update(|tx| {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
tx.put::<tables::SyncStage>(MERKLE_EXECUTION.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(MERKLE_UNWIND.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(
MERKLE_EXECUTION.0.to_string(),
Default::default(),
)?;
tx.put::<tables::SyncStage>(MERKLE_UNWIND.0.to_string(), Default::default())?;
tx.delete::<tables::SyncStageProgress>(MERKLE_EXECUTION.0.into(), None)?;
Ok::<_, eyre::Error>(())
})??;
@ -107,8 +110,14 @@ impl Command {
tool.db.update(|tx| {
tx.clear::<tables::AccountHistory>()?;
tx.clear::<tables::StorageHistory>()?;
tx.put::<tables::SyncStage>(INDEX_ACCOUNT_HISTORY.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(INDEX_STORAGE_HISTORY.0.to_string(), 0)?;
tx.put::<tables::SyncStage>(
INDEX_ACCOUNT_HISTORY.0.to_string(),
Default::default(),
)?;
tx.put::<tables::SyncStage>(
INDEX_STORAGE_HISTORY.0.to_string(),
Default::default(),
)?;
Ok::<_, eyre::Error>(())
})??;
}

View File

@ -3,7 +3,7 @@ use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_primitives::MAINNET;
use reth_primitives::{StageCheckpoint, MAINNET};
use reth_provider::Transaction;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use std::{ops::DerefMut, path::PathBuf, sync::Arc};
@ -100,7 +100,11 @@ async fn unwind_and_copy<DB: Database>(
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
@ -131,8 +135,8 @@ async fn dry_run(
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?;

View File

@ -1,7 +1,7 @@
use crate::{dump_stage::setup, utils::DbTool};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, StageCheckpoint};
use reth_provider::Transaction;
use reth_stages::{stages::AccountHashingStage, Stage, StageId, UnwindInput};
use std::{ops::DerefMut, path::PathBuf};
@ -43,7 +43,11 @@ async fn unwind_and_copy<DB: Database>(
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
let unwind_inner_tx = unwind_tx.deref_mut();
@ -75,8 +79,8 @@ async fn dry_run(
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?

View File

@ -1,6 +1,7 @@
use crate::{dump_stage::setup, utils::DbTool};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::StageCheckpoint;
use reth_provider::Transaction;
use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
use std::{ops::DerefMut, path::PathBuf};
@ -37,7 +38,11 @@ async fn unwind_and_copy<DB: Database>(
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None },
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
let unwind_inner_tx = unwind_tx.deref_mut();
@ -71,8 +76,8 @@ async fn dry_run(
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?

View File

@ -1,7 +1,7 @@
use crate::{dump_stage::setup, utils::DbTool};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_primitives::{BlockNumber, MAINNET};
use reth_primitives::{BlockNumber, StageCheckpoint, MAINNET};
use reth_provider::Transaction;
use reth_stages::{
stages::{
@ -48,10 +48,14 @@ async fn unwind_and_copy<DB: Database>(
) -> eyre::Result<()> {
let (from, to) = range;
let mut unwind_tx = Transaction::new(db_tool.db)?;
let unwind = UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None };
let unwind = UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
};
let execute_input = reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))),
checkpoint: Some(StageCheckpoint::new(from)),
};
// Unwind hashes all the way to FROM
@ -73,7 +77,11 @@ async fn unwind_and_copy<DB: Database>(
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None },
UnwindInput {
unwind_to: to,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
@ -120,8 +128,8 @@ async fn dry_run(
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?

View File

@ -2,7 +2,7 @@
use crate::dirs::{DataDirPath, MaybePlatformPath};
use clap::Parser;
use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx};
use reth_primitives::ChainSpec;
use reth_primitives::{ChainSpec, StageCheckpoint};
use reth_provider::Transaction;
use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db};
use reth_stages::{
@ -65,14 +65,15 @@ impl Command {
let db = Arc::new(init_db(db_path)?);
let mut tx = Transaction::new(db.as_ref())?;
let execution_progress = EXECUTION.get_progress(tx.deref())?.unwrap_or_default();
assert!(execution_progress < self.to, "Nothing to run");
let execution_checkpoint = EXECUTION.get_checkpoint(tx.deref())?.unwrap_or_default();
assert!(execution_checkpoint.block_number < self.to, "Nothing to run");
let should_reset_stages = !(execution_progress ==
ACCOUNT_HASHING.get_progress(tx.deref())?.unwrap_or_default() &&
execution_progress == STORAGE_HASHING.get_progress(tx.deref())?.unwrap_or_default() &&
execution_progress ==
MERKLE_EXECUTION.get_progress(tx.deref())?.unwrap_or_default());
let should_reset_stages = !(execution_checkpoint ==
ACCOUNT_HASHING.get_checkpoint(tx.deref())?.unwrap_or_default() &&
execution_checkpoint ==
STORAGE_HASHING.get_checkpoint(tx.deref())?.unwrap_or_default() &&
execution_checkpoint ==
MERKLE_EXECUTION.get_checkpoint(tx.deref())?.unwrap_or_default());
let factory = reth_revm::Factory::new(self.chain.clone());
let mut execution_stage = ExecutionStage::new(
@ -88,9 +89,11 @@ impl Command {
let mut storage_hashing_stage = StorageHashingStage::default();
let mut merkle_stage = MerkleStage::default_execution();
for block in execution_progress + 1..=self.to {
for block in execution_checkpoint.block_number + 1..=self.to {
tracing::trace!(target: "reth::cli", block, "Executing block");
let progress = if (!should_reset_stages || block > execution_progress + 1) && block > 0
let progress = if (!should_reset_stages ||
block > execution_checkpoint.block_number + 1) &&
block > 0
{
Some(block - 1)
} else {
@ -101,8 +104,8 @@ impl Command {
.execute(
&mut tx,
ExecInput {
previous_stage: Some((SENDER_RECOVERY, block)),
stage_progress: block.checked_sub(1),
previous_stage: Some((SENDER_RECOVERY, StageCheckpoint::new(block))),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
},
)
.await?;
@ -113,8 +116,8 @@ impl Command {
.execute(
&mut tx,
ExecInput {
previous_stage: Some((EXECUTION, block)),
stage_progress: progress,
previous_stage: Some((EXECUTION, StageCheckpoint::new(block))),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
@ -127,8 +130,8 @@ impl Command {
.execute(
&mut tx,
ExecInput {
previous_stage: Some((ACCOUNT_HASHING, block)),
stage_progress: progress,
previous_stage: Some((ACCOUNT_HASHING, StageCheckpoint::new(block))),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
@ -139,8 +142,8 @@ impl Command {
.execute(
&mut tx,
ExecInput {
previous_stage: Some((STORAGE_HASHING, block)),
stage_progress: progress,
previous_stage: Some((STORAGE_HASHING, StageCheckpoint::new(block))),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await;
@ -157,8 +160,8 @@ impl Command {
.collect::<Result<Vec<_>, _>>()?;
let clean_input = ExecInput {
previous_stage: Some((STORAGE_HASHING, block)),
stage_progress: None,
previous_stage: Some((STORAGE_HASHING, StageCheckpoint::new(block))),
checkpoint: None,
};
loop {
let clean_result = merkle_stage.execute(&mut tx, clean_input).await;

View File

@ -4,8 +4,8 @@ use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::BlockNumber;
use reth_stages::{PipelineEvent, StageId};
use reth_primitives::{BlockNumber, StageCheckpoint};
use reth_stages::{ExecOutput, PipelineEvent, StageId};
use std::{
future::Future,
pin::Pin,
@ -46,14 +46,17 @@ impl NodeState {
info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
}
}
PipelineEvent::Ran { stage_id, result } => {
let notable = result.stage_progress > self.current_checkpoint;
self.current_checkpoint = result.stage_progress;
if result.done {
PipelineEvent::Ran {
stage_id,
result: ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done },
} => {
let notable = block_number > self.current_checkpoint;
self.current_checkpoint = block_number;
if done {
self.current_stage = None;
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing");
info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage finished executing");
} else if notable {
info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress");
info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage committed progress");
}
}
_ => (),

View File

@ -519,7 +519,7 @@ impl Command {
db: Arc<Env<WriteMap>>,
) -> Result<Head, reth_interfaces::db::DatabaseError> {
db.view(|tx| {
let head = FINISH.get_progress(tx)?.unwrap_or_default();
let head = FINISH.get_checkpoint(tx)?.unwrap_or_default().block_number;
let header = tx
.get::<tables::Headers>(head)?
.expect("the header for the latest block is missing, database is corrupt");

View File

@ -9,7 +9,7 @@ use crate::{
use clap::Parser;
use reth_beacon_consensus::BeaconConsensus;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::ChainSpec;
use reth_primitives::{ChainSpec, StageCheckpoint};
use reth_provider::{ShareableDatabase, Transaction};
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, init::init_db},
@ -200,24 +200,27 @@ impl Command {
let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
let mut input = ExecInput {
previous_stage: Some((StageId("No Previous Stage"), self.to)),
stage_progress: Some(self.from),
previous_stage: Some((StageId("No Previous Stage"), StageCheckpoint::new(self.to))),
checkpoint: Some(StageCheckpoint::new(self.from)),
};
let mut unwind =
UnwindInput { stage_progress: self.to, unwind_to: self.from, bad_block: None };
let mut unwind = UnwindInput {
checkpoint: StageCheckpoint::new(self.to),
unwind_to: self.from,
bad_block: None,
};
if !self.skip_unwind {
while unwind.stage_progress > self.from {
while unwind.checkpoint.block_number > self.from {
let unwind_output = unwind_stage.unwind(&mut tx, unwind).await?;
unwind.stage_progress = unwind_output.stage_progress;
unwind.checkpoint = unwind_output.checkpoint;
}
}
while let ExecOutput { stage_progress, done: false } =
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{
input.stage_progress = Some(stage_progress)
input.checkpoint = Some(stage_progress)
}
Ok(())

View File

@ -11,7 +11,7 @@ use reth_db::{
};
use reth_primitives::{
keccak256, Account as RethAccount, Address, Bytecode, ChainSpec, JsonU256, SealedBlock,
SealedHeader, StorageEntry, H256, U256,
SealedHeader, StageCheckpoint, StorageEntry, H256, U256,
};
use reth_provider::Transaction;
use reth_rlp::Decodable;
@ -197,8 +197,8 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {
// Call execution stage
let input = ExecInput {
previous_stage: last_block.map(|b| (StageId(""), b)),
stage_progress: None,
previous_stage: last_block.map(|b| (StageId(""), StageCheckpoint::new(b))),
checkpoint: None,
};
{
let mut transaction = Transaction::new(db.as_ref())?;

View File

@ -386,7 +386,7 @@ where
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=head_block_number, "canonicalized new head");
let pipeline_min_progress =
FINISH.get_progress(&self.db.tx()?)?.unwrap_or_default();
FINISH.get_checkpoint(&self.db.tx()?)?.unwrap_or_default().block_number;
if pipeline_min_progress < head_block_number {
debug!(target: "consensus::engine", last_finished=pipeline_min_progress, head_number=head_block_number, "pipeline run to head required");
@ -914,7 +914,9 @@ mod tests {
test_utils::{NoopFullBlockClient, TestConsensus},
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_primitives::{
ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, StageCheckpoint, H256, MAINNET,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, ShareableDatabase,
Transaction,
@ -1136,7 +1138,7 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { stage_progress: 1, done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Err(StageError::ChannelClosed),
]),
Vec::default(),
@ -1168,7 +1170,10 @@ mod tests {
);
let (mut consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { stage_progress: max_block, done: true })]),
VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]),
Vec::default(),
);
consensus_engine.sync.set_max_block(max_block);
@ -1210,7 +1215,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
@ -1238,14 +1246,20 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter());
env.db.update(|tx| FINISH.save_progress(tx, block1.number)).unwrap().unwrap();
env.db
.update(|tx| FINISH.save_checkpoint(tx, StageCheckpoint::new(block1.number)))
.unwrap()
.unwrap();
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -1276,8 +1290,8 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
@ -1324,7 +1338,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
@ -1359,8 +1376,8 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
@ -1408,8 +1425,8 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, stage_progress: 0 }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
);
@ -1456,7 +1473,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
@ -1486,7 +1506,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
@ -1529,7 +1552,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
);
@ -1583,7 +1609,10 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { done: true, stage_progress: 0 })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::from([exec_result2]),
);

View File

@ -135,7 +135,7 @@ impl Compact for Bytecode {
jump_map: JumpMap::from_slice(buf),
},
}),
_ => unreachable!(),
_ => unreachable!("Junk data in database: unknown BytecodeState variant"),
};
(decoded, &[])
}

View File

@ -1,9 +1,11 @@
use crate::{
trie::{hash_builder::HashBuilderState, StoredSubNode},
Address, BlockNumber, H256,
Address, BlockNumber, TxNumber, H256,
};
use bytes::Buf;
use reth_codecs::{main_codec, Compact};
use bytes::{Buf, BufMut};
use reth_codecs::{derive_arbitrary, main_codec, Compact};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
/// Saves the progress of Merkle stage.
#[derive(Default, Debug, Clone, PartialEq)]
@ -97,7 +99,7 @@ impl Compact for MerkleCheckpoint {
/// Saves the progress of AccountHashing
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct AccountHashingCheckpoint {
/// The next account to start hashing from
pub address: Option<Address>,
@ -109,7 +111,7 @@ pub struct AccountHashingCheckpoint {
/// Saves the progress of StorageHashing
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct StorageHashingCheckpoint {
/// The next account to start hashing from
pub address: Option<Address>,
@ -121,6 +123,88 @@ pub struct StorageHashingCheckpoint {
pub to: u64,
}
/// Saves the progress of a stage.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct StageCheckpoint {
/// The maximum block processed by the stage.
pub block_number: BlockNumber,
/// Stage-specific checkpoint. None if stage uses only block-based checkpoints.
pub stage_checkpoint: Option<StageUnitCheckpoint>,
}
impl StageCheckpoint {
/// Creates a new [`StageCheckpoint`] with only `block_number` set.
pub fn new(block_number: BlockNumber) -> Self {
Self { block_number, ..Default::default() }
}
}
// TODO(alexey): ideally, we'd want to display block number + stage-specific metric (if available)
// in places like logs or traces
impl Display for StageCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.block_number, f)
}
}
// TODO(alexey): add a merkle checkpoint. Currently it's hard because [`MerkleCheckpoint`]
// is not a Copy type.
/// Stage-specific checkpoint metrics.
#[derive_arbitrary(compact)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum StageUnitCheckpoint {
/// Saves the progress of transaction-indexed stages.
Transaction(TxNumber),
/// Saves the progress of AccountHashing stage.
Account(AccountHashingCheckpoint),
/// Saves the progress of StorageHashing stage.
Storage(StorageHashingCheckpoint),
}
impl Compact for StageUnitCheckpoint {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: BufMut + AsMut<[u8]>,
{
match self {
StageUnitCheckpoint::Transaction(data) => {
buf.put_u8(0);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Account(data) => {
buf.put_u8(1);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Storage(data) => {
buf.put_u8(2);
1 + data.to_compact(buf)
}
}
}
fn from_compact(buf: &[u8], _len: usize) -> (Self, &[u8])
where
Self: Sized,
{
match buf[0] {
0 => {
let (data, buf) = TxNumber::from_compact(&buf[1..], buf.len() - 1);
(Self::Transaction(data), buf)
}
1 => {
let (data, buf) = AccountHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Account(data), buf)
}
2 => {
let (data, buf) = StorageHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Storage(data), buf)
}
_ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -146,4 +230,30 @@ mod tests {
let (decoded, _) = MerkleCheckpoint::from_compact(&buf, encoded);
assert_eq!(decoded, checkpoint);
}
#[test]
fn stage_unit_checkpoint_roundtrip() {
let mut rng = rand::thread_rng();
let checkpoints = vec![
StageUnitCheckpoint::Transaction(rng.gen()),
StageUnitCheckpoint::Account(AccountHashingCheckpoint {
address: Some(Address::from_low_u64_be(rng.gen())),
from: rng.gen(),
to: rng.gen(),
}),
StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
address: Some(Address::from_low_u64_be(rng.gen())),
storage: Some(H256::from_low_u64_be(rng.gen())),
from: rng.gen(),
to: rng.gen(),
}),
];
for checkpoint in checkpoints {
let mut buf = Vec::new();
let encoded = checkpoint.clone().to_compact(&mut buf);
let (decoded, _) = StageUnitCheckpoint::from_compact(&buf, encoded);
assert_eq!(decoded, checkpoint);
}
}
}

View File

@ -50,7 +50,10 @@ pub use chain::{
AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI,
MAINNET, SEPOLIA,
};
pub use checkpoints::{AccountHashingCheckpoint, MerkleCheckpoint, StorageHashingCheckpoint};
pub use checkpoints::{
AccountHashingCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint,
StorageHashingCheckpoint,
};
pub use compression::*;
pub use constants::{
EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,

View File

@ -261,7 +261,7 @@ impl Compact for Transaction {
let (tx, buf) = TxEip1559::from_compact(buf, buf.len());
(Transaction::Eip1559(tx), buf)
}
_ => unreachable!(),
_ => unreachable!("Junk data in database: unknown Transaction variant"),
}
}
}
@ -717,7 +717,7 @@ impl Compact for TransactionKind {
let (addr, buf) = Address::from_compact(buf, buf.len());
(TransactionKind::Call(addr), buf)
}
_ => unreachable!(),
_ => unreachable!("Junk data in database: unknown TransactionKind variant"),
}
}
}

View File

@ -42,7 +42,7 @@ impl Compact for HashBuilderValue {
let (bytes, buf) = Vec::from_compact(&buf[1..], 0);
(Self::Bytes(bytes), buf)
}
_ => panic!("Invalid hash builder value"),
_ => unreachable!("Junk data in database: unknown HashBuilderValue variant"),
}
}
}

View File

@ -82,7 +82,7 @@ pub fn init_genesis<DB: Database>(
// insert sync stage
for stage in StageKind::ALL.iter() {
tx.put::<tables::SyncStage>(stage.to_string(), 0)?;
tx.put::<tables::SyncStage>(stage.to_string(), Default::default())?;
}
tx.commit()?;

View File

@ -5,6 +5,7 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler};
use reth_db::mdbx::{Env, WriteMap};
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::StageCheckpoint;
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
test_utils::TestTransaction,
@ -161,11 +162,14 @@ fn measure_stage<F, S>(
stage,
(
ExecInput {
previous_stage: Some((StageId("Another"), block_interval.end)),
stage_progress: Some(block_interval.start),
previous_stage: Some((
StageId("Another"),
StageCheckpoint::new(block_interval.end),
)),
checkpoint: Some(StageCheckpoint::new(block_interval.start)),
},
UnwindInput {
stage_progress: block_interval.end,
checkpoint: StageCheckpoint::new(block_interval.end),
unwind_to: block_interval.start,
bad_block: None,
},

View File

@ -2,6 +2,7 @@ use super::{constants, StageRange};
use reth_db::{
cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError,
};
use reth_primitives::StageCheckpoint;
use reth_stages::{
stages::{AccountHashingStage, SeedOpts},
test_utils::TestTransaction,
@ -35,14 +36,14 @@ fn find_stage_range(db: &Path) -> StageRange {
.view(|tx| {
let mut cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let from = cursor.first()?.unwrap().0;
let to = cursor.last()?.unwrap().0;
let to = StageCheckpoint::new(cursor.last()?.unwrap().0);
stage_range = Some((
ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
checkpoint: Some(StageCheckpoint::new(from)),
},
UnwindInput { unwind_to: from, stage_progress: to, bad_block: None },
UnwindInput { unwind_to: from, checkpoint: to, bad_block: None },
));
Ok::<(), DbError>(())
})
@ -69,7 +70,7 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
path,
(
ExecInput {
previous_stage: Some((StageId("Another"), num_blocks)),
previous_stage: Some((StageId("Another"), StageCheckpoint::new(num_blocks))),
..Default::default()
},
UnwindInput::default(),

View File

@ -8,7 +8,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
DatabaseError as DbError,
};
use reth_primitives::BlockNumber;
use reth_primitives::StageCheckpoint;
use std::fmt::Display;
/// All known stages
@ -98,17 +98,20 @@ impl StageId {
}
/// Get the last committed progress of this stage.
pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result<Option<BlockNumber>, DbError> {
pub fn get_checkpoint<'db>(
&self,
tx: &impl DbTx<'db>,
) -> Result<Option<StageCheckpoint>, DbError> {
tx.get::<SyncStage>(self.0.to_string())
}
/// Save the progress of this stage.
pub fn save_progress<'db>(
pub fn save_checkpoint<'db>(
&self,
tx: &impl DbTxMut<'db>,
block: BlockNumber,
checkpoint: StageCheckpoint,
) -> Result<(), DbError> {
tx.put::<SyncStage>(self.0.to_string(), block)
tx.put::<SyncStage>(self.0.to_string(), checkpoint)
}
}

View File

@ -1,7 +1,7 @@
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use futures_util::Future;
use reth_db::database::Database;
use reth_primitives::{listener::EventListeners, BlockNumber, H256};
use reth_primitives::{listener::EventListeners, BlockNumber, StageCheckpoint, H256};
use reth_provider::Transaction;
use std::{ops::Deref, pin::Pin};
use tokio::sync::watch;
@ -134,7 +134,7 @@ where
self.metrics.stage_checkpoint(
stage_id,
self.db
.view(|tx| stage_id.get_progress(tx).ok().flatten().unwrap_or_default())
.view(|tx| stage_id.get_checkpoint(tx).ok().flatten().unwrap_or_default())
.ok()
.unwrap_or_default(),
);
@ -222,7 +222,7 @@ where
previous_stage = Some((
stage_id,
self.db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default(),
self.db.view(|tx| stage_id.get_checkpoint(tx))??.unwrap_or_default(),
));
}
@ -247,24 +247,24 @@ where
let span = info_span!("Unwinding", stage = %stage_id);
let _enter = span.enter();
let mut stage_progress = stage_id.get_progress(tx.deref())?.unwrap_or_default();
if stage_progress < to {
let mut stage_progress = stage_id.get_checkpoint(tx.deref())?.unwrap_or_default();
if stage_progress.block_number < to {
debug!(target: "sync::pipeline", from = %stage_progress, %to, "Unwind point too far for stage");
self.listeners.notify(PipelineEvent::Skipped { stage_id });
continue
}
debug!(target: "sync::pipeline", from = %stage_progress, %to, ?bad_block, "Starting unwind");
while stage_progress > to {
let input = UnwindInput { stage_progress, unwind_to: to, bad_block };
while stage_progress.block_number > to {
let input = UnwindInput { checkpoint: stage_progress, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
let output = stage.unwind(&mut tx, input).await;
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.stage_progress;
stage_progress = unwind_output.checkpoint;
self.metrics.stage_checkpoint(stage_id, stage_progress);
stage_id.save_progress(tx.deref(), stage_progress)?;
stage_id.save_checkpoint(tx.deref(), stage_progress)?;
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
@ -284,7 +284,7 @@ where
async fn execute_stage_to_completion(
&mut self,
previous_stage: Option<(StageId, BlockNumber)>,
previous_stage: Option<(StageId, StageCheckpoint)>,
stage_index: usize,
) -> Result<ControlFlow, PipelineError> {
let stage = &mut self.stages[stage_index];
@ -293,43 +293,47 @@ where
loop {
let mut tx = Transaction::new(&self.db)?;
let prev_progress = stage_id.get_progress(tx.deref())?;
let prev_checkpoint = stage_id.get_checkpoint(tx.deref())?;
let stage_reached_max_block = prev_progress
let stage_reached_max_block = prev_checkpoint
.zip(self.max_block)
.map_or(false, |(prev_progress, target)| prev_progress >= target);
.map_or(false, |(prev_progress, target)| prev_progress.block_number >= target);
if stage_reached_max_block {
warn!(
target: "sync::pipeline",
stage = %stage_id,
max_block = self.max_block,
prev_block = prev_progress,
prev_block = prev_checkpoint.map(|progress| progress.block_number),
"Stage reached maximum block, skipping."
);
self.listeners.notify(PipelineEvent::Skipped { stage_id });
// We reached the maximum block, so we skip the stage
return Ok(ControlFlow::NoProgress { stage_progress: prev_progress })
return Ok(ControlFlow::NoProgress {
stage_progress: prev_checkpoint.map(|progress| progress.block_number),
})
}
self.listeners
.notify(PipelineEvent::Running { stage_id, stage_progress: prev_progress });
self.listeners.notify(PipelineEvent::Running {
stage_id,
stage_progress: prev_checkpoint.map(|progress| progress.block_number),
});
match stage
.execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress })
.execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint })
.await
{
Ok(out @ ExecOutput { stage_progress, done }) => {
made_progress |= stage_progress != prev_progress.unwrap_or_default();
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |= checkpoint != prev_checkpoint.unwrap_or_default();
info!(
target: "sync::pipeline",
stage = %stage_id,
%stage_progress,
%checkpoint,
%done,
"Stage made progress"
);
self.metrics.stage_checkpoint(stage_id, stage_progress);
stage_id.save_progress(tx.deref(), stage_progress)?;
self.metrics.stage_checkpoint(stage_id, checkpoint);
stage_id.save_checkpoint(tx.deref(), checkpoint)?;
self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() });
@ -337,6 +341,7 @@ where
tx.commit()?;
if done {
let stage_progress = checkpoint.block_number;
return Ok(if made_progress {
ControlFlow::Continue { progress: stage_progress }
} else {
@ -359,7 +364,7 @@ where
// we bail entirely, otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_progress.unwrap_or_default(),
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: Some(block),
})
} else if err.is_fatal() {
@ -440,11 +445,11 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })),
)
.add_stage(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(db);
@ -462,12 +467,12 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 20, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
]
);
@ -481,18 +486,18 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId("C"))
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.with_max_block(10)
.build(db);
@ -515,42 +520,54 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 100, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("C"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("C"),
result: ExecOutput { stage_progress: 20, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
// Unwinding
PipelineEvent::Unwinding {
stage_id: StageId("C"),
input: UnwindInput { stage_progress: 20, unwind_to: 1, bad_block: None }
input: UnwindInput {
checkpoint: StageCheckpoint::new(20),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId("C"),
result: UnwindOutput { stage_progress: 1 },
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwinding {
stage_id: StageId("B"),
input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None }
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId("B"),
result: UnwindOutput { stage_progress: 1 },
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput { stage_progress: 100, unwind_to: 1, bad_block: None }
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: UnwindOutput { stage_progress: 1 },
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
]
);
@ -564,12 +581,12 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 50 })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
)
.add_stage(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(db);
@ -592,23 +609,27 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 100, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
// Unwinding
// Nothing to unwind in stage "B"
PipelineEvent::Skipped { stage_id: StageId("B") },
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput { stage_progress: 100, unwind_to: 50, bad_block: None }
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
unwind_to: 50,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: UnwindOutput { stage_progress: 50 },
result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
},
]
);
@ -633,9 +654,9 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.add_stage(
TestStage::new(StageId("B"))
@ -643,8 +664,8 @@ mod tests {
block: 5,
error: consensus::ConsensusError::BaseFeeMissing,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(db);
@ -662,27 +683,31 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Error { stage_id: StageId("B") },
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) }
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 0,
bad_block: Some(5)
}
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: UnwindOutput { stage_progress: 0 },
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
]
);
@ -697,7 +722,7 @@ mod tests {
.add_stage(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(db);

View File

@ -1,6 +1,7 @@
use crate::StageId;
use metrics::Gauge;
use reth_metrics_derive::Metrics;
use reth_primitives::StageCheckpoint;
use std::collections::HashMap;
#[derive(Metrics)]
@ -16,11 +17,12 @@ pub(crate) struct Metrics {
}
impl Metrics {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, checkpoint: StageCheckpoint) {
// TODO(alexey): track other metrics from `checkpoint`
self.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(progress as f64);
.set(checkpoint.block_number as f64);
}
}

View File

@ -1,7 +1,7 @@
use crate::{error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, StageCheckpoint};
use reth_provider::Transaction;
use std::{
cmp::{max, min},
@ -11,21 +11,21 @@ use std::{
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, BlockNumber)>,
/// The stage that was run before the current stage and the progress it reached.
pub previous_stage: Option<(StageId, StageCheckpoint)>,
/// The progress of this stage the last time it was executed.
pub stage_progress: Option<BlockNumber>,
pub checkpoint: Option<StageCheckpoint>,
}
impl ExecInput {
/// Return the progress of the stage or default.
pub fn stage_progress(&self) -> BlockNumber {
self.stage_progress.unwrap_or_default()
pub fn checkpoint(&self) -> StageCheckpoint {
self.checkpoint.unwrap_or_default()
}
/// Return the progress of the previous stage or default.
pub fn previous_stage_progress(&self) -> BlockNumber {
self.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default()
pub fn previous_stage_checkpoint(&self) -> StageCheckpoint {
self.previous_stage.map(|(_, checkpoint)| checkpoint).unwrap_or_default()
}
/// Return next block range that needs to be executed.
@ -36,7 +36,7 @@ impl ExecInput {
/// Return true if this is the first block range to execute.
pub fn is_first_range(&self) -> bool {
self.stage_progress.is_none()
self.checkpoint.is_none()
}
/// Return the next block range to execute.
@ -45,12 +45,12 @@ impl ExecInput {
&self,
threshold: u64,
) -> (RangeInclusive<BlockNumber>, bool) {
let current_block = self.stage_progress.unwrap_or_default();
let current_block = self.checkpoint.unwrap_or_default();
// +1 is to skip present block and always start from block number 1, not 0.
let start = current_block + 1;
let target = self.previous_stage_progress();
let start = current_block.block_number + 1;
let target = self.previous_stage_checkpoint().block_number;
let end = min(target, current_block.saturating_add(threshold));
let end = min(target, current_block.block_number.saturating_add(threshold));
let is_final_range = end == target;
(start..=end, is_final_range)
@ -60,8 +60,8 @@ impl ExecInput {
/// Stage unwind input, see [Stage::unwind].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: BlockNumber,
/// The current highest progress of the stage.
pub checkpoint: StageCheckpoint,
/// The block to unwind to.
pub unwind_to: BlockNumber,
/// The bad block that caused the unwind, if any.
@ -81,14 +81,14 @@ impl UnwindInput {
) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
// +1 is to skip the block we're unwinding to
let mut start = self.unwind_to + 1;
let end = self.stage_progress;
let end = self.checkpoint;
start = max(start, end.saturating_sub(threshold));
start = max(start, end.block_number.saturating_sub(threshold));
let unwind_to = start - 1;
let is_final_range = unwind_to == self.unwind_to;
(start..=end, unwind_to, is_final_range)
(start..=end.block_number, unwind_to, is_final_range)
}
}
@ -96,15 +96,15 @@ impl UnwindInput {
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: BlockNumber,
pub checkpoint: StageCheckpoint,
/// Whether or not the stage is done.
pub done: bool,
}
impl ExecOutput {
/// Mark the stage as done, checkpointing at the given block number.
pub fn done(stage_progress: BlockNumber) -> Self {
Self { stage_progress, done: true }
/// Mark the stage as done, checkpointing at the given place.
pub fn done(checkpoint: StageCheckpoint) -> Self {
Self { checkpoint, done: true }
}
}
@ -112,7 +112,7 @@ impl ExecOutput {
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: BlockNumber,
pub checkpoint: StageCheckpoint,
}
/// A stage is a segmented part of the syncing process of the node.

View File

@ -11,6 +11,7 @@ use reth_interfaces::{
consensus::Consensus,
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
};
use reth_primitives::StageCheckpoint;
use reth_provider::Transaction;
use std::sync::Arc;
use tracing::*;
@ -75,7 +76,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
if range.is_empty() {
let (from, to) = range.into_inner();
info!(target: "sync::stages::bodies", from, "Target block already reached");
return Ok(ExecOutput::done(to))
return Ok(ExecOutput::done(StageCheckpoint::new(to)))
}
// Update the header range on the downloader
@ -156,7 +157,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// - We reached our target and the target was not limited by the batch size of the stage
let done = highest_block == to_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished");
Ok(ExecOutput { stage_progress: highest_block, done })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(highest_block), done })
}
/// Unwind the stage.
@ -209,7 +210,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
@ -233,8 +234,8 @@ mod tests {
// Set up test runner
let mut runner = BodyTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -250,7 +251,7 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, done: false }) if stage_progress < 200
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number < 200
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -263,8 +264,8 @@ mod tests {
// Set up test runner
let mut runner = BodyTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -277,7 +278,13 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
assert_matches!(output, Ok(ExecOutput { stage_progress: 20, done: true }));
assert_matches!(
output,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
done: true
})
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -289,8 +296,8 @@ mod tests {
// Set up test runner
let mut runner = BodyTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -303,14 +310,14 @@ mod tests {
let first_run = rx.await.unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { stage_progress, done: false }) if stage_progress >= 10
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false }) if block_number >= 10
);
let first_run_progress = first_run.unwrap().stage_progress;
let first_run_checkpoint = first_run.unwrap().checkpoint;
// Execute again on top of the previous run
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(first_run_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(first_run_checkpoint),
};
let rx = runner.execute(input);
@ -318,7 +325,7 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number > first_run_checkpoint.block_number
);
assert_matches!(
runner.validate_execution(input, output.ok()),
@ -335,8 +342,8 @@ mod tests {
// Set up test runner
let mut runner = BodyTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -351,11 +358,14 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, done: true }) if stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true }) if block_number == previous_stage
);
let stage_progress = output.unwrap().stage_progress;
let checkpoint = output.unwrap().checkpoint;
runner
.validate_db_blocks(input.stage_progress.unwrap_or_default(), stage_progress)
.validate_db_blocks(
input.checkpoint.unwrap_or_default().block_number,
checkpoint.block_number,
)
.expect("Written block data invalid");
// Delete a transaction
@ -371,11 +381,11 @@ mod tests {
// Unwind all of it
let unwind_to = 1;
let input = UnwindInput { bad_block: None, stage_progress, unwind_to };
let input = UnwindInput { bad_block: None, checkpoint, unwind_to };
let res = runner.unwind(input).await;
assert_matches!(
res,
Ok(UnwindOutput { stage_progress }) if stage_progress == 1
Ok(UnwindOutput { checkpoint: StageCheckpoint { block_number: 1, .. } })
);
assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
@ -492,8 +502,8 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress();
let start = input.checkpoint.unwrap_or_default().block_number;
let end = input.previous_stage_checkpoint().block_number;
let blocks = random_block_range(start..=end, GENESIS_HASH, 0..2);
self.tx.insert_headers_with_td(blocks.iter().map(|block| &block.header))?;
if let Some(progress) = blocks.first() {
@ -536,9 +546,10 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
let highest_block = match output.as_ref() {
Some(output) => output.stage_progress,
None => input.stage_progress.unwrap_or_default(),
};
Some(output) => output.checkpoint,
None => input.checkpoint.unwrap_or_default(),
}
.block_number;
self.validate_db_blocks(highest_block, highest_block)
}
}

View File

@ -9,7 +9,8 @@ use reth_db::{
};
use reth_metrics_derive::Metrics;
use reth_primitives::{
constants::MGAS_TO_GAS, Block, BlockNumber, BlockWithSenders, TransactionSigned, U256,
constants::MGAS_TO_GAS, Block, BlockNumber, BlockWithSenders, StageCheckpoint,
TransactionSigned, U256,
};
use reth_provider::{
post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction,
@ -134,8 +135,8 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let start_block = input.stage_progress() + 1;
let max_block = input.previous_stage_progress();
let start_block = input.checkpoint().block_number + 1;
let max_block = input.previous_stage_checkpoint().block_number;
// Build executor
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
@ -189,7 +190,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let is_final_range = stage_progress == max_block;
info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress, done: is_final_range })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: is_final_range })
}
}
@ -248,7 +249,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
return Ok(UnwindOutput { stage_progress: input.unwind_to })
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
// get all batches for account change
@ -305,7 +306,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
}
info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
}
}
@ -395,9 +396,9 @@ mod tests {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, 1)),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))),
/// The progress of this stage the last time it was executed.
stage_progress: None,
checkpoint: None,
};
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
@ -432,7 +433,7 @@ mod tests {
let mut execution_stage = stage();
let output = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
assert_eq!(output, ExecOutput { stage_progress: 1, done: true });
assert_eq!(output, ExecOutput { checkpoint: StageCheckpoint::new(1), done: true });
let tx = tx.deref_mut();
// check post state
let account1 = H160(hex!("1000000000000000000000000000000000000000"));
@ -484,9 +485,9 @@ mod tests {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, 1)),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))),
/// The progress of this stage the last time it was executed.
stage_progress: None,
checkpoint: None,
};
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
@ -519,11 +520,14 @@ mod tests {
let mut stage = stage();
let o = stage
.unwind(&mut tx, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None })
.unwind(
&mut tx,
UnwindInput { checkpoint: StageCheckpoint::new(1), unwind_to: 0, bad_block: None },
)
.await
.unwrap();
assert_eq!(o, UnwindOutput { stage_progress: 0 });
assert_eq!(o, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
// assert unwind stage
let db_tx = tx.deref();
@ -551,9 +555,9 @@ mod tests {
let test_tx = TestTransaction::default();
let mut tx = test_tx.inner();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, 1)),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))),
/// The progress of this stage the last time it was executed.
stage_progress: None,
checkpoint: None,
};
let mut genesis_rlp = hex!("f901f8f901f3a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa0c9ceb8372c88cb461724d8d3d87e8b933f6fc5f679d4841800e662f4428ffd0da056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302000080830f4240808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();

View File

@ -1,5 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::StageCheckpoint;
use reth_provider::Transaction;
/// The [`StageId`] of the finish stage.
@ -23,7 +24,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
_tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
Ok(ExecOutput { done: true, stage_progress: input.previous_stage_progress() })
Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
}
async fn unwind(
@ -31,7 +32,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
_tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
Ok(UnwindOutput { stage_progress: input.unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
@ -68,12 +69,12 @@ mod tests {
type Seed = Vec<SealedHeader>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let start = input.checkpoint.unwrap_or_default().block_number;
let head = random_header(start, None);
self.tx.insert_headers_with_td(std::iter::once(&head))?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1;
if start + 1 >= end {
return Ok(Vec::default())
@ -93,8 +94,8 @@ mod tests {
if let Some(output) = output {
assert!(output.done, "stage should always be done");
assert_eq!(
output.stage_progress,
input.previous_stage_progress(),
output.checkpoint,
input.previous_stage_checkpoint(),
"stage progress should always match progress of previous stage"
);
}

View File

@ -9,7 +9,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable,
};
use reth_primitives::{keccak256, AccountHashingCheckpoint};
use reth_primitives::{keccak256, AccountHashingCheckpoint, StageCheckpoint};
use reth_provider::Transaction;
use std::{
cmp::max,
@ -164,7 +164,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
) -> Result<ExecOutput, StageError> {
let range = input.next_block_range();
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let (from_block, to_block) = range.into_inner();
@ -255,8 +255,8 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
if next_address.is_some() {
// from block is correct here as were are iteration over state for this
// particular block
info!(target: "sync::stages::hashing_account", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false })
info!(target: "sync::stages::hashing_account", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
}
} else {
// Aggregate all transition changesets and and make list of account that have been
@ -270,8 +270,8 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx.insert_account_for_hashing(accounts.into_iter())?;
}
info!(target: "sync::stages::hashing_account", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
info!(target: "sync::stages::hashing_account", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
}
/// Unwind the stage.
@ -287,7 +287,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx.unwind_account_hashing(range)?;
info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@ -312,8 +312,8 @@ mod tests {
runner.set_clean_threshold(1);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -321,7 +321,7 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == previous_stage);
assert_matches!(result, Ok(ExecOutput {checkpoint: StageCheckpoint { block_number, ..}, done: true}) if block_number == previous_stage);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
@ -336,8 +336,8 @@ mod tests {
runner.set_commit_threshold(5);
let mut input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -346,7 +346,10 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 10);
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 10, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
let fifth_address = runner
.tx
@ -368,11 +371,14 @@ mod tests {
);
// second run, hash next five account.
input.stage_progress = Some(result.unwrap().stage_progress);
input.checkpoint = Some(result.unwrap().checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 20);
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 20, .. }, done: true })
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 10);
// Validate the stage execution
@ -483,7 +489,7 @@ mod tests {
Ok(AccountHashingStage::seed(
&mut self.tx.inner(),
SeedOpts {
blocks: 1..=input.previous_stage_progress(),
blocks: 1..=input.previous_stage_checkpoint().block_number,
accounts: 0..10,
txs: 0..3,
},
@ -497,8 +503,8 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
let start_block = input.checkpoint.unwrap_or_default().block_number + 1;
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())
}

View File

@ -8,7 +8,7 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{keccak256, StorageEntry, StorageHashingCheckpoint};
use reth_primitives::{keccak256, StageCheckpoint, StorageEntry, StorageHashingCheckpoint};
use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;
@ -85,7 +85,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
) -> Result<ExecOutput, StageError> {
let range = input.next_block_range();
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let (from_block, to_block) = range.into_inner();
@ -181,8 +181,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
if current_key.is_some() {
// `from_block` is correct here as were are iteration over state for this
// particular block.
info!(target: "sync::stages::hashing_storage", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false })
info!(target: "sync::stages::hashing_storage", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
}
} else {
// Aggregate all changesets and and make list of storages that have been
@ -195,8 +195,8 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.insert_storage_for_hashing(storages.into_iter())?;
}
info!(target: "sync::stages::hashing_storage", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
info!(target: "sync::stages::hashing_storage", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
}
/// Unwind the stage.
@ -211,7 +211,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@ -251,8 +251,8 @@ mod tests {
runner.set_commit_threshold(1);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -263,7 +263,7 @@ mod tests {
// Continue from checkpoint
continue
} else {
assert!(result.stage_progress == previous_stage);
assert!(result.checkpoint.block_number == previous_stage);
// Validate the stage execution
assert!(
@ -287,8 +287,8 @@ mod tests {
runner.set_commit_threshold(500);
let mut input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -296,7 +296,10 @@ mod tests {
// first run, hash first half of storages.
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100);
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
let (progress_address, progress_key) = runner
.tx
@ -327,7 +330,10 @@ mod tests {
runner.set_commit_threshold(2);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if !done && stage_progress == 100);
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
let (progress_address, progress_key) = runner
.tx
@ -356,11 +362,14 @@ mod tests {
// third last run, hash rest of storages.
runner.set_commit_threshold(1000);
input.stage_progress = Some(result.unwrap().stage_progress);
input.checkpoint = Some(result.unwrap().checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {done, stage_progress}) if done && stage_progress == 500);
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 500, .. }, done: true })
);
assert_eq!(
runner.tx.table::<tables::HashedStorage>().unwrap().len(),
runner.tx.table::<tables::PlainStorageState>().unwrap().len()
@ -402,8 +411,8 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default() + 1;
let end = input.previous_stage_progress();
let stage_progress = input.checkpoint.unwrap_or_default().block_number + 1;
let end = input.previous_stage_checkpoint().block_number;
let n_accounts = 31;
let mut accounts = random_contract_account_range(&mut (0..n_accounts));
@ -483,8 +492,8 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
let start_block = input.checkpoint.unwrap_or_default().block_number + 1;
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())
}

View File

@ -10,7 +10,7 @@ use reth_interfaces::{
p2p::headers::downloader::{HeaderDownloader, SyncTarget},
provider::ProviderError,
};
use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, H256};
use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, StageCheckpoint, H256};
use reth_provider::Transaction;
use tokio::sync::watch;
use tracing::*;
@ -193,16 +193,16 @@ where
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let current_progress = input.stage_progress.unwrap_or_default();
let current_progress = input.checkpoint.unwrap_or_default();
// Lookup the head and tip of the sync range
let gap = self.get_sync_gap(tx, current_progress).await?;
let gap = self.get_sync_gap(tx, current_progress.block_number).await?;
let tip = gap.target.tip();
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { stage_progress: current_progress, done: true })
info!(target: "sync::stages::headers", stage_progress = %current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { checkpoint: current_progress, done: true })
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
@ -221,16 +221,16 @@ where
// Write the headers to db
self.write_headers::<DB>(tx, downloaded_headers)?.unwrap_or_default();
if self.is_stage_done(tx, current_progress)? {
let stage_progress = current_progress.max(
if self.is_stage_done(tx, current_progress.block_number)? {
let stage_progress = current_progress.block_number.max(
tx.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput { stage_progress, done: true })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: true })
} else {
Ok(ExecOutput { stage_progress: current_progress, done: false })
Ok(ExecOutput { checkpoint: current_progress, done: false })
}
}
@ -248,7 +248,7 @@ where
tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
@ -345,14 +345,15 @@ mod tests {
type Seed = Vec<SealedHeader>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let start = input.checkpoint.unwrap_or_default().block_number;
let head = random_header(start, None);
self.tx.insert_headers(std::iter::once(&head))?;
// patch td table for `update_head` call
self.tx.commit(|tx| tx.put::<tables::HeaderTD>(head.number, U256::ZERO.into()))?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
let end =
input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1;
if start + 1 >= end {
return Ok(Vec::default())
@ -369,11 +370,13 @@ mod tests {
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
let initial_stage_progress = input.stage_progress.unwrap_or_default();
let initial_stage_progress = input.checkpoint.unwrap_or_default().block_number;
match output {
Some(output) if output.stage_progress > initial_stage_progress => {
Some(output) if output.checkpoint.block_number > initial_stage_progress => {
self.tx.query(|tx| {
for block_num in (initial_stage_progress..output.stage_progress).rev() {
for block_num in
(initial_stage_progress..output.checkpoint.block_number).rev()
{
// look up the header hash
let hash = tx
.get::<tables::CanonicalHeaders>(block_num)?
@ -458,8 +461,8 @@ mod tests {
let mut runner = HeadersTestRunner::with_linear_downloader();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
@ -471,7 +474,7 @@ mod tests {
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@ -537,8 +540,8 @@ mod tests {
// pick range that's larger than the configured headers batch size
let (stage_progress, previous_stage) = (600, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
@ -550,7 +553,7 @@ mod tests {
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: false, stage_progress: progress }) if progress == stage_progress);
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false }) if block_number == stage_progress);
runner.client.clear().await;
runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await;
@ -558,7 +561,7 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
}

View File

@ -1,5 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::StageCheckpoint;
use reth_provider::Transaction;
use std::fmt::Debug;
use tracing::*;
@ -39,7 +40,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let indices = tx.get_account_transition_ids_from_changeset(range.clone())?;
@ -47,7 +48,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
tx.insert_account_history_index(indices)?;
info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
}
/// Unwind the stage.
@ -63,7 +64,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
// from HistoryIndex higher than that number.
Ok(UnwindOutput { stage_progress: unwind_progress })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@ -136,21 +137,27 @@ mod tests {
}
async fn run(tx: &TestTransaction, run_to: u64) {
let input =
ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() };
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(run_to))),
..Default::default()
};
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { stage_progress: 5, done: true });
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
tx.commit().unwrap();
}
async fn unwind(tx: &TestTransaction, unwind_from: u64, unwind_to: u64) {
let input = UnwindInput { stage_progress: unwind_from, unwind_to, ..Default::default() };
let input = UnwindInput {
checkpoint: StageCheckpoint::new(unwind_from),
unwind_to,
..Default::default()
};
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.unwind(&mut tx, input).await.unwrap();
assert_eq!(out, UnwindOutput { stage_progress: unwind_to });
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
tx.commit().unwrap();
}

View File

@ -1,5 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::StageCheckpoint;
use reth_provider::Transaction;
use std::fmt::Debug;
use tracing::*;
@ -36,7 +37,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let target = input.previous_stage_progress();
let target = input.previous_stage_checkpoint();
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
@ -47,7 +48,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx.insert_storage_history_index(indices)?;
info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
}
/// Unwind the stage.
@ -62,7 +63,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
@ -146,21 +147,27 @@ mod tests {
}
async fn run(tx: &TestTransaction, run_to: u64) {
let input =
ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() };
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(run_to))),
..Default::default()
};
let mut stage = IndexStorageHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { stage_progress: 5, done: true });
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
tx.commit().unwrap();
}
async fn unwind(tx: &TestTransaction, unwind_from: u64, unwind_to: u64) {
let input = UnwindInput { stage_progress: unwind_from, unwind_to, ..Default::default() };
let input = UnwindInput {
checkpoint: StageCheckpoint::new(unwind_from),
unwind_to,
..Default::default()
};
let mut stage = IndexStorageHistoryStage::default();
let mut tx = tx.inner();
let out = stage.unwind(&mut tx, input).await.unwrap();
assert_eq!(out, UnwindOutput { stage_progress: unwind_to });
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
tx.commit().unwrap();
}
@ -219,7 +226,10 @@ mod tests {
async fn insert_index_to_full_shard() {
// init
let tx = TestTransaction::default();
let _input = ExecInput { previous_stage: Some((PREV_STAGE_ID, 5)), ..Default::default() };
let _input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(5))),
..Default::default()
};
// change does not matter only that account is present in changeset.
let full_list = vec![3; NUM_OF_INDICES_IN_SHARD];

View File

@ -6,7 +6,9 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::consensus;
use reth_primitives::{hex, trie::StoredSubNode, BlockNumber, MerkleCheckpoint, H256};
use reth_primitives::{
hex, trie::StoredSubNode, BlockNumber, MerkleCheckpoint, StageCheckpoint, H256,
};
use reth_provider::Transaction;
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress};
use std::{fmt::Debug, ops::DerefMut};
@ -146,10 +148,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let threshold = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput {
stage_progress: input.previous_stage_progress(),
done: true,
})
return Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(any(test, feature = "test-utils"))]
@ -158,7 +157,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let range = input.next_block_range();
let (from_block, to_block) = range.clone().into_inner();
let current_block = input.previous_stage_progress();
let current_block = input.previous_stage_checkpoint().block_number;
let block_root = tx.get_header(current_block)?.state_root;
@ -206,7 +205,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
state.hash_builder.into(),
);
self.save_execution_checkpoint(tx, Some(checkpoint))?;
return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false })
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
}
StateRootProgress::Complete(root, updates) => {
updates.flush(tx.deref_mut())?;
@ -227,7 +226,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
self.validate_state_root(trie_root, block_root, to_block)?;
info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: to_block, done: true })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block), done: true })
}
/// Unwind the stage.
@ -239,14 +238,14 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let range = input.unwind_block_range();
if matches!(self, MerkleStage::Execution { .. }) {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
if input.unwind_to == 0 {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
// Unwind trie only if there are transitions
@ -266,7 +265,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
}
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
@ -301,8 +300,8 @@ mod tests {
let mut runner = MerkleTestRunner::default();
// set low threshold so we hash the whole storage
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -313,8 +312,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
// Validate the stage execution
@ -329,8 +328,8 @@ mod tests {
// Set up the runner
let mut runner = MerkleTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
runner.seed_execution(input).expect("failed to seed execution");
@ -341,8 +340,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
// Validate the stage execution
@ -377,9 +376,9 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let stage_progress = input.checkpoint.unwrap_or_default().block_number;
let start = stage_progress + 1;
let end = input.previous_stage_progress();
let end = input.previous_stage_checkpoint().block_number;
let num_of_accounts = 31;
let accounts = random_contract_account_range(&mut (0..num_of_accounts))

View File

@ -7,7 +7,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable, RawValue,
};
use reth_primitives::{keccak256, TransactionSignedNoHash, TxNumber, H160};
use reth_primitives::{keccak256, StageCheckpoint, TransactionSignedNoHash, TxNumber, H160};
use reth_provider::Transaction;
use std::fmt::Debug;
use thiserror::Error;
@ -52,7 +52,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let (start_block, end_block) = range.clone().into_inner();
@ -65,7 +65,10 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// No transactions to walk over
if first_tx_num > last_tx_num {
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: end_block, done: is_final_range })
return Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block),
done: is_final_range,
})
}
// Acquire the cursor for inserting elements
@ -137,7 +140,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: end_block, done: is_final_range })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
}
/// Unwind the stage.
@ -154,7 +157,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
}
}
@ -193,13 +196,13 @@ mod tests {
// Set up the runner
let runner = SenderRecoveryTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with a single transaction at block `stage_progress + 10`
let non_empty_block_number = stage_progress + 10;
let blocks = (stage_progress..=input.previous_stage_progress())
let blocks = (stage_progress..=input.previous_stage_checkpoint().block_number)
.map(|number| {
random_block(number, None, Some((number == non_empty_block_number) as u8), None)
})
@ -212,8 +215,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
// Validate the stage execution
@ -228,8 +231,8 @@ mod tests {
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Seed only once with full input range
@ -240,20 +243,20 @@ mod tests {
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { done: false, stage_progress })
if stage_progress == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false })
if block_number == expected_progress
);
// Execute second time
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(expected_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
@ -313,8 +316,8 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress();
let stage_progress = input.checkpoint.unwrap_or_default().block_number;
let end = input.previous_stage_checkpoint().block_number;
let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2);
self.tx.insert_blocks(blocks.iter(), None)?;
@ -328,8 +331,8 @@ mod tests {
) -> Result<(), TestRunnerError> {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
let start_block = input.checkpoint.unwrap_or_default().block_number + 1;
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())
@ -352,9 +355,9 @@ mod tests {
Ok(())
})?,
None => {
self.ensure_no_senders_by_block(input.stage_progress.unwrap_or_default())?
}
None => self.ensure_no_senders_by_block(
input.checkpoint.unwrap_or_default().block_number,
)?,
};
Ok(())

View File

@ -6,7 +6,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::{consensus::Consensus, provider::ProviderError};
use reth_primitives::U256;
use reth_primitives::{StageCheckpoint, U256};
use reth_provider::Transaction;
use std::sync::Arc;
use tracing::*;
@ -63,7 +63,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let mut cursor_headers = tx.cursor_read::<tables::Headers>()?;
// Get latest total difficulty
let last_header_number = input.stage_progress.unwrap_or_default();
let last_header_number = input.checkpoint.unwrap_or_default().block_number;
let last_entry = cursor_td
.seek_exact(last_header_number)?
.ok_or(ProviderError::TotalDifficultyNotFound { number: last_header_number })?;
@ -82,7 +82,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
cursor_td.append(block_number, td.into())?;
}
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: end_block, done: is_final_range })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
}
/// Unwind the stage.
@ -97,7 +97,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
}
}
@ -127,8 +127,8 @@ mod tests {
runner.set_threshold(threshold);
let first_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Seed only once with full input range
@ -139,20 +139,20 @@ mod tests {
let expected_progress = stage_progress + threshold;
assert!(matches!(
result,
Ok(ExecOutput { done: false, stage_progress })
if stage_progress == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: false })
if block_number == expected_progress
));
// Execute second time
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(expected_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert!(matches!(
result,
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, ..}, done: true })
if block_number == previous_stage
));
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
@ -194,7 +194,7 @@ mod tests {
type Seed = Vec<SealedHeader>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let start = input.checkpoint.unwrap_or_default().block_number;
let head = random_header(start, None);
self.tx.insert_headers(std::iter::once(&head))?;
self.tx.commit(|tx| {
@ -208,7 +208,7 @@ mod tests {
})?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1;
if start + 1 >= end {
return Ok(Vec::default())
@ -226,9 +226,9 @@ mod tests {
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
let initial_stage_progress = input.stage_progress.unwrap_or_default();
let initial_stage_progress = input.checkpoint.unwrap_or_default().block_number;
match output {
Some(output) if output.stage_progress > initial_stage_progress => {
Some(output) if output.checkpoint.block_number > initial_stage_progress => {
self.tx.query(|tx| {
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
let (_, mut current_header) = header_cursor

View File

@ -7,7 +7,9 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{rpc_utils::keccak256, BlockNumber, TransactionSignedNoHash, TxNumber, H256};
use reth_primitives::{
rpc_utils::keccak256, BlockNumber, StageCheckpoint, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::Transaction;
use thiserror::Error;
use tokio::sync::mpsc;
@ -55,7 +57,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
) -> Result<ExecOutput, StageError> {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
if range.is_empty() {
return Ok(ExecOutput::done(*range.end()))
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let (start_block, end_block) = range.into_inner();
@ -144,7 +146,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { done: is_final_range, stage_progress: end_block })
Ok(ExecOutput { done: is_final_range, checkpoint: StageCheckpoint::new(end_block) })
}
/// Unwind the stage.
@ -178,7 +180,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
}
}
@ -215,13 +217,13 @@ mod tests {
// Set up the runner
let runner = TransactionLookupTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with a single transaction at block `stage_progress + 10`
let non_empty_block_number = stage_progress + 10;
let blocks = (stage_progress..=input.previous_stage_progress())
let blocks = (stage_progress..=input.previous_stage_checkpoint().block_number)
.map(|number| {
random_block(number, None, Some((number == non_empty_block_number) as u8), None)
})
@ -234,8 +236,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
// Validate the stage execution
@ -250,8 +252,8 @@ mod tests {
runner.set_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Seed only once with full input range
@ -262,20 +264,20 @@ mod tests {
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { done: false, stage_progress })
if stage_progress == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false })
if block_number == expected_progress
);
// Execute second time
let second_input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(expected_progress),
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
@ -336,8 +338,8 @@ mod tests {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let stage_progress = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress();
let stage_progress = input.checkpoint.unwrap_or_default().block_number;
let end = input.previous_stage_checkpoint().block_number;
let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2);
self.tx.insert_blocks(blocks.iter(), None)?;
@ -351,8 +353,8 @@ mod tests {
) -> Result<(), TestRunnerError> {
match output {
Some(output) => self.tx.query(|tx| {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
let start_block = input.checkpoint.unwrap_or_default().block_number + 1;
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())
@ -375,7 +377,9 @@ mod tests {
Ok(())
})?,
None => self.ensure_no_hash_by_block(input.stage_progress.unwrap_or_default())?,
None => {
self.ensure_no_hash_by_block(input.checkpoint.unwrap_or_default().block_number)?
}
};
Ok(())
}

View File

@ -29,8 +29,8 @@ macro_rules! stage_test_suite {
// Set up the runner
let mut runner = $runner::default();
let input = crate::stage::ExecInput {
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::StageCheckpoint::new(previous_stage))),
checkpoint: Some(reth_primitives::StageCheckpoint::new(stage_progress)),
};
let seed = runner.seed_execution(input).expect("failed to seed");
let rx = runner.execute(input);
@ -42,8 +42,8 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == previous_stage
);
// Validate the stage execution
@ -66,7 +66,7 @@ macro_rules! stage_test_suite {
let rx = runner.unwind(input).await;
assert_matches::assert_matches!(
rx,
Ok(UnwindOutput { stage_progress }) if stage_progress == input.unwind_to
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == input.unwind_to
);
// Validate the stage unwind
@ -81,8 +81,8 @@ macro_rules! stage_test_suite {
// Set up the runner
let mut runner = $runner::default();
let execute_input = crate::stage::ExecInput {
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::StageCheckpoint::new(previous_stage))),
checkpoint: Some(reth_primitives::StageCheckpoint::new(stage_progress)),
};
let seed = runner.seed_execution(execute_input).expect("failed to seed");
@ -94,15 +94,17 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == previous_stage
);
assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation");
// Run stage unwind
let unwind_input = crate::stage::UnwindInput {
unwind_to: stage_progress, stage_progress: previous_stage, bad_block: None,
unwind_to: stage_progress,
checkpoint: reth_primitives::StageCheckpoint::new(previous_stage),
bad_block: None,
};
runner.before_unwind(unwind_input).expect("Failed to unwind state");
@ -111,7 +113,7 @@ macro_rules! stage_test_suite {
// Assert the successful unwind result
assert_matches::assert_matches!(
rx,
Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_input.unwind_to
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to
);
// Validate the stage unwind
@ -136,8 +138,8 @@ macro_rules! stage_test_suite_ext {
// Set up the runner
let mut runner = $runner::default();
let input = crate::stage::ExecInput {
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, stage_progress)),
stage_progress: Some(stage_progress),
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::StageCheckpoint::new(stage_progress))),
checkpoint: Some(reth_primitives::StageCheckpoint::new(stage_progress)),
};
let seed = runner.seed_execution(input).expect("failed to seed");
@ -151,8 +153,8 @@ macro_rules! stage_test_suite_ext {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == stage_progress
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == stage_progress
);
// Validate the stage execution

View File

@ -159,7 +159,7 @@ where
impl<T> Compact for Option<T>
where
T: Compact + Default,
T: Compact,
{
/// Returns 0 for `None` and 1 for `Some(_)`.
fn to_compact<B>(self, buf: &mut B) -> usize

View File

@ -44,10 +44,12 @@ impl_compression_for_compact!(
StoredBlockBodyIndices,
StoredBlockOmmers,
StoredBlockWithdrawals,
Bytecode
Bytecode,
AccountBeforeTx,
TransactionSignedNoHash,
CompactU256,
StageCheckpoint
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSignedNoHash);
impl_compression_for_compact!(CompactU256);
macro_rules! impl_compression_fixed_compact {
($($name:tt),+) => {

View File

@ -34,8 +34,8 @@ use crate::{
};
use reth_primitives::{
trie::{BranchNodeCompact, StorageTrieEntry, StoredNibbles, StoredNibblesSubKey},
Account, Address, BlockHash, BlockNumber, Bytecode, Header, IntegerList, Receipt, StorageEntry,
TransactionSignedNoHash, TxHash, TxNumber, H256,
Account, Address, BlockHash, BlockNumber, Bytecode, Header, IntegerList, Receipt,
StageCheckpoint, StorageEntry, TransactionSignedNoHash, TxHash, TxNumber, H256,
};
/// Enum for the types of tables present in libmdbx.
@ -299,8 +299,8 @@ table!(
);
table!(
/// Stores the highest synced block number of each stage.
( SyncStage ) StageId | BlockNumber
/// Stores the highest synced block number and stage-specific checkpoint of each stage.
( SyncStage ) StageId | StageCheckpoint
);
table!(

View File

@ -525,6 +525,7 @@ where
TX: DbTx<'a> + Send + Sync,
{
tx.get::<tables::SyncStage>("Finish".to_string())
.map(|result| result.map(|checkpoint| checkpoint.block_number))
}
/// Fetches the last canonical header from the database.

View File

@ -20,8 +20,8 @@ use reth_db::{
use reth_interfaces::{db::DatabaseError as DbError, provider::ProviderError};
use reth_primitives::{
keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock,
SealedBlockWithSenders, StorageEntry, TransactionSigned, TransactionSignedEcRecovered,
TxNumber, H256, U256,
SealedBlockWithSenders, StageCheckpoint, StorageEntry, TransactionSigned,
TransactionSignedEcRecovered, TxNumber, H256, U256,
};
use reth_trie::{StateRoot, StateRootError};
use std::{
@ -1031,8 +1031,9 @@ where
) -> Result<(), TransactionError> {
// iterate over all existing stages in the table and update its progress.
let mut cursor = self.cursor_write::<tables::SyncStage>()?;
while let Some((stage_name, _)) = cursor.next()? {
cursor.upsert(stage_name, block_number)?
while let Some((stage_name, checkpoint)) = cursor.next()? {
// TODO(alexey): do we want to invalidate stage-specific checkpoint data?
cursor.upsert(stage_name, StageCheckpoint { block_number, ..checkpoint })?
}
Ok(())