fix(op): stages checkpoints init-state (#8021)

This commit is contained in:
Emilia Hane
2024-05-07 15:29:16 +02:00
committed by GitHub
parent cbc6f268c0
commit f281bbdccd
4 changed files with 70 additions and 27 deletions

View File

@ -2,23 +2,20 @@
use reth_codecs::Compact;
use reth_config::config::EtlConfig;
use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_db::{database::Database, tables, transaction::DbTxMut};
use reth_etl::Collector;
use reth_interfaces::{db::DatabaseError, provider::ProviderResult};
use reth_primitives::{
stage::StageId, Account, Address, Bytecode, ChainSpec, GenesisAccount, Receipts,
StaticFileSegment, StorageEntry, B256, U256,
stage::{StageCheckpoint, StageId},
Account, Address, Bytecode, ChainSpec, GenesisAccount, Receipts, StaticFileSegment,
StorageEntry, B256, U256,
};
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockNumReader, BundleStateWithReceipts, ChainSpecProvider,
DatabaseProviderRW, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory, StaticFileProviderFactory,
ProviderFactory, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
use serde::{Deserialize, Serialize};
@ -114,18 +111,18 @@ pub fn init_genesis<DB: Database>(factory: ProviderFactory<DB>) -> Result<B256,
insert_genesis_history(&provider_rw, alloc.iter())?;
// Insert header
let tx = provider_rw.into_tx();
let tx = provider_rw.tx_ref();
let static_file_provider = factory.static_file_provider();
insert_genesis_header::<DB>(&tx, &static_file_provider, chain.clone())?;
insert_genesis_header::<DB>(tx, &static_file_provider, chain.clone())?;
insert_genesis_state::<DB>(&tx, alloc.len(), alloc.iter())?;
insert_genesis_state::<DB>(tx, alloc.len(), alloc.iter())?;
// insert sync stage
for stage in StageId::ALL.iter() {
tx.put::<tables::StageCheckpoints>(stage.to_string(), Default::default())?;
for stage in StageId::ALL {
provider_rw.save_stage_checkpoint(stage, Default::default())?;
}
tx.commit()?;
provider_rw.commit()?;
static_file_provider.commit()?;
Ok(hash)
@ -343,6 +340,11 @@ pub fn init_from_state_dump<DB: Database>(
);
}
// insert sync stages for stages that require state
for stage in StageId::STATE_REQUIRED {
provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(block))?;
}
provider_rw.commit()?;
Ok(hash)
@ -524,6 +526,7 @@ mod tests {
cursor::DbCursorRO,
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
table::{Table, TableRow},
transaction::DbTx,
DatabaseEnv,
};
use reth_primitives::{

View File

@ -6,6 +6,8 @@ use bytes::Buf;
use reth_codecs::{main_codec, Compact};
use std::ops::RangeInclusive;
use super::StageId;
/// Saves the progress of Merkle stage.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct MerkleCheckpoint {
@ -201,6 +203,25 @@ impl StageCheckpoint {
self
}
/// Sets the block range, if checkpoint uses block range.
pub fn with_block_range(mut self, stage_id: &StageId, from: u64, to: u64) -> Self {
self.stage_checkpoint = Some(match stage_id {
StageId::Execution => StageUnitCheckpoint::Execution(ExecutionCheckpoint::default()),
StageId::AccountHashing => {
StageUnitCheckpoint::Account(AccountHashingCheckpoint::default())
}
StageId::StorageHashing => {
StageUnitCheckpoint::Storage(StorageHashingCheckpoint::default())
}
StageId::IndexStorageHistory | StageId::IndexAccountHistory => {
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint::default())
}
_ => return self,
});
_ = self.stage_checkpoint.map(|mut checkpoint| checkpoint.set_block_range(from, to));
self
}
/// Get the underlying [`EntitiesCheckpoint`], if any, to determine the number of entities
/// processed, and the number of total entities to process.
pub fn entities(&self) -> Option<EntitiesCheckpoint> {
@ -244,6 +265,25 @@ pub enum StageUnitCheckpoint {
IndexHistory(IndexHistoryCheckpoint),
}
impl StageUnitCheckpoint {
/// Sets the block range. Returns old block range, or `None` if checkpoint doesn't use block
/// range.
pub fn set_block_range(&mut self, from: u64, to: u64) -> Option<CheckpointBlockRange> {
match self {
Self::Account(AccountHashingCheckpoint { ref mut block_range, .. }) |
Self::Storage(StorageHashingCheckpoint { ref mut block_range, .. }) |
Self::Execution(ExecutionCheckpoint { ref mut block_range, .. }) |
Self::IndexHistory(IndexHistoryCheckpoint { ref mut block_range, .. }) => {
let old_range = *block_range;
*block_range = CheckpointBlockRange { from, to };
Some(old_range)
}
_ => None,
}
}
}
#[cfg(test)]
impl Default for StageUnitCheckpoint {
fn default() -> Self {

View File

@ -53,6 +53,17 @@ impl StageId {
StageId::Finish,
];
/// Stages that require state.
pub const STATE_REQUIRED: [StageId; 7] = [
StageId::Execution,
StageId::MerkleUnwind,
StageId::AccountHashing,
StageId::StorageHashing,
StageId::MerkleExecute,
StageId::IndexStorageHistory,
StageId::IndexAccountHistory,
];
/// Return stage id formatted as string.
pub fn as_str(&self) -> &str {
match self {