diff --git a/bin/reth/src/args/stage_args.rs b/bin/reth/src/args/stage_args.rs index a46803cf8..00886b718 100644 --- a/bin/reth/src/args/stage_args.rs +++ b/bin/reth/src/args/stage_args.rs @@ -8,6 +8,8 @@ pub enum StageEnum { Bodies, Senders, Execution, + AccountHashing, + StorageHashing, Hashing, Merkle, TxLookup, diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs index 61882a95d..8fd960f21 100644 --- a/bin/reth/src/stage/drop.rs +++ b/bin/reth/src/stage/drop.rs @@ -63,6 +63,13 @@ impl Command { tool.db.update(|tx| { match &self.stage { + StageEnum::Senders => { + tx.clear::()?; + tx.put::( + StageId::SenderRecovery.to_string(), + Default::default(), + )?; + } StageEnum::Execution => { tx.clear::()?; tx.clear::()?; @@ -76,6 +83,20 @@ impl Command { )?; insert_genesis_state::>(tx, self.chain.genesis())?; } + StageEnum::AccountHashing => { + tx.clear::()?; + tx.put::( + StageId::AccountHashing.to_string(), + Default::default(), + )?; + } + StageEnum::StorageHashing => { + tx.clear::()?; + tx.put::( + StageId::StorageHashing.to_string(), + Default::default(), + )?; + } StageEnum::Hashing => { // Clear hashed accounts tx.clear::()?; diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index f8a45cdf3..a0dfee5d4 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -15,16 +15,17 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, ChainSpec, }; -use reth_provider::{ShareableDatabase, Transaction}; +use reth_provider::{providers::get_stage_checkpoint, ShareableDatabase, Transaction}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ - BodyStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, TransactionLookupStage, + AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds, + IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, + StorageHashingStage, TransactionLookupStage, }, ExecInput, ExecOutput, Stage, UnwindInput, }; -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc}; use tracing::*; /// `reth stage` command @@ -183,9 +184,7 @@ impl Command { (Box::new(stage), None) } - StageEnum::Senders => { - (Box::new(SenderRecoveryStage { commit_threshold: batch_size }), None) - } + StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None), StageEnum::Execution => { let factory = reth_revm::Factory::new(self.chain.clone()); ( @@ -201,6 +200,12 @@ impl Command { ) } StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None), + StageEnum::AccountHashing => { + (Box::new(AccountHashingStage::new(1, batch_size)), None) + } + StageEnum::StorageHashing => { + (Box::new(StorageHashingStage::new(1, batch_size)), None) + } StageEnum::Merkle => ( Box::new(MerkleStage::default_execution()), Some(Box::new(MerkleStage::default_unwind())), @@ -209,18 +214,16 @@ impl Command { StageEnum::StorageHistory => (Box::::default(), None), _ => return Ok(()), }; + if let Some(unwind_stage) = &unwind_stage { + assert!(exec_stage.type_id() == unwind_stage.type_id()); + } + + let checkpoint = get_stage_checkpoint(tx.deref(), exec_stage.id())?.unwrap_or_default(); + let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage); - let mut input = ExecInput { - previous_stage: Some(( - StageId::Other("No Previous Stage"), - StageCheckpoint::new(self.to), - )), - checkpoint: Some(StageCheckpoint::new(self.from)), - }; - let mut unwind = UnwindInput { - checkpoint: StageCheckpoint::new(self.to), + checkpoint: checkpoint.with_block_number(self.to), unwind_to: self.from, bad_block: None, }; @@ -232,6 +235,14 @@ impl Command { } } + let mut input = ExecInput { + previous_stage: Some(( + StageId::Other("No Previous Stage"), + StageCheckpoint::new(self.to), + )), + checkpoint: Some(checkpoint.with_block_number(self.from)), + }; + while let ExecOutput { checkpoint: stage_progress, done: false } = exec_stage.execute(&mut tx, input).await? { diff --git a/crates/interfaces/src/db.rs b/crates/interfaces/src/db.rs index 0607b91b6..3b814c059 100644 --- a/crates/interfaces/src/db.rs +++ b/crates/interfaces/src/db.rs @@ -28,4 +28,7 @@ pub enum DatabaseError { /// Failed to decode a key from a table. #[error("Error decoding value.")] DecodeError, + /// Failed to get database stats. + #[error("Database stats error code: {0:?}")] + Stats(i32), } diff --git a/crates/primitives/src/stage/checkpoints.rs b/crates/primitives/src/stage/checkpoints.rs index 50a4d4857..5913639be 100644 --- a/crates/primitives/src/stage/checkpoints.rs +++ b/crates/primitives/src/stage/checkpoints.rs @@ -1,11 +1,14 @@ use crate::{ trie::{hash_builder::HashBuilderState, StoredSubNode}, - Address, BlockNumber, TxNumber, H256, + Address, BlockNumber, H256, }; use bytes::{Buf, BufMut}; use reth_codecs::{derive_arbitrary, main_codec, Compact}; use serde::{Deserialize, Serialize}; -use std::fmt::{Display, Formatter}; +use std::{ + fmt::{Display, Formatter}, + ops::RangeInclusive, +}; /// Saves the progress of Merkle stage. #[derive(Default, Debug, Clone, PartialEq)] @@ -101,26 +104,56 @@ impl Compact for MerkleCheckpoint { #[main_codec] #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] pub struct AccountHashingCheckpoint { - /// The next account to start hashing from + /// The next account to start hashing from. pub address: Option
, - /// Start transition id - pub from: u64, - /// Last transition id - pub to: u64, + /// Block range which this checkpoint is valid for. + pub block_range: CheckpointBlockRange, + /// Progress measured in accounts. + pub progress: EntitiesCheckpoint, } /// Saves the progress of StorageHashing stage. #[main_codec] #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] pub struct StorageHashingCheckpoint { - /// The next account to start hashing from + /// The next account to start hashing from. pub address: Option
, - /// The next storage slot to start hashing from + /// The next storage slot to start hashing from. pub storage: Option, - /// Start transition id - pub from: u64, - /// Last transition id - pub to: u64, + /// Block range which this checkpoint is valid for. + pub block_range: CheckpointBlockRange, + /// Progress measured in storage slots. + pub progress: EntitiesCheckpoint, +} + +/// Saves the progress of Execution stage. +#[main_codec] +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub struct ExecutionCheckpoint { + /// Block range which this checkpoint is valid for. + pub block_range: CheckpointBlockRange, + /// Progress measured in gas. + pub progress: EntitiesCheckpoint, +} + +/// Saves the progress of Headers stage. +#[main_codec] +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub struct HeadersCheckpoint { + /// Block range which this checkpoint is valid for. + pub block_range: CheckpointBlockRange, + /// Progress measured in gas. + pub progress: EntitiesCheckpoint, +} + +/// Saves the progress of Index History stages. +#[main_codec] +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub struct IndexHistoryCheckpoint { + /// Block range which this checkpoint is valid for. + pub block_range: CheckpointBlockRange, + /// Progress measured in changesets. + pub progress: EntitiesCheckpoint, } /// Saves the progress of abstract stage iterating over or downloading entities. @@ -130,16 +163,35 @@ pub struct EntitiesCheckpoint { /// Number of entities already processed. pub processed: u64, /// Total entities to be processed. - pub total: Option, + pub total: u64, } impl Display for EntitiesCheckpoint { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Some(total) = self.total { - write!(f, "{:.1}%", 100.0 * self.processed as f64 / total as f64) - } else { - write!(f, "{}", self.processed) - } + write!(f, "{:.1}%", 100.0 * self.processed as f64 / self.total as f64) + } +} + +/// Saves the block range. Usually, it's used to check the validity of some stage checkpoint across +/// multiple executions. +#[main_codec] +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] +pub struct CheckpointBlockRange { + /// The first block of the range, inclusive. + pub from: BlockNumber, + /// The last block of the range, inclusive. + pub to: BlockNumber, +} + +impl From> for CheckpointBlockRange { + fn from(range: RangeInclusive) -> Self { + Self { from: *range.start(), to: *range.end() } + } +} + +impl From<&RangeInclusive> for CheckpointBlockRange { + fn from(range: &RangeInclusive) -> Self { + Self { from: *range.start(), to: *range.end() } } } @@ -183,6 +235,36 @@ impl StageCheckpoint { } } + /// Returns the execution stage checkpoint, if any. + pub fn execution_stage_checkpoint(&self) -> Option { + match self.stage_checkpoint { + Some(StageUnitCheckpoint::Execution(checkpoint)) => Some(checkpoint), + _ => None, + } + } + + /// Returns the headers stage checkpoint, if any. + pub fn headers_stage_checkpoint(&self) -> Option { + match self.stage_checkpoint { + Some(StageUnitCheckpoint::Headers(checkpoint)) => Some(checkpoint), + _ => None, + } + } + + /// Returns the index history stage checkpoint, if any. + pub fn index_history_stage_checkpoint(&self) -> Option { + match self.stage_checkpoint { + Some(StageUnitCheckpoint::IndexHistory(checkpoint)) => Some(checkpoint), + _ => None, + } + } + + /// Sets the block number. + pub fn with_block_number(mut self, block_number: BlockNumber) -> Self { + self.block_number = block_number; + self + } + /// Sets the stage checkpoint to account hashing. pub fn with_account_hashing_stage_checkpoint( mut self, @@ -206,13 +288,48 @@ impl StageCheckpoint { self.stage_checkpoint = Some(StageUnitCheckpoint::Entities(checkpoint)); self } + + /// Sets the stage checkpoint to execution. + pub fn with_execution_stage_checkpoint(mut self, checkpoint: ExecutionCheckpoint) -> Self { + self.stage_checkpoint = Some(StageUnitCheckpoint::Execution(checkpoint)); + self + } + + /// Sets the stage checkpoint to headers. + pub fn with_headers_stage_checkpoint(mut self, checkpoint: HeadersCheckpoint) -> Self { + self.stage_checkpoint = Some(StageUnitCheckpoint::Headers(checkpoint)); + self + } + + /// Sets the stage checkpoint to index history. + pub fn with_index_history_stage_checkpoint( + mut self, + checkpoint: IndexHistoryCheckpoint, + ) -> Self { + self.stage_checkpoint = Some(StageUnitCheckpoint::IndexHistory(checkpoint)); + self + } } impl Display for StageCheckpoint { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.stage_checkpoint { - Some(StageUnitCheckpoint::Entities(stage_checkpoint)) => stage_checkpoint.fmt(f), - _ => write!(f, "{}", self.block_number), + Some( + StageUnitCheckpoint::Account(AccountHashingCheckpoint { + progress: entities, .. + }) | + StageUnitCheckpoint::Storage(StorageHashingCheckpoint { + progress: entities, .. + }) | + StageUnitCheckpoint::Entities(entities) | + StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress: entities, .. }) | + StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }) | + StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { + progress: entities, + .. + }), + ) => entities.fmt(f), + None => write!(f, "{}", self.block_number), } } } @@ -223,14 +340,18 @@ impl Display for StageCheckpoint { #[derive_arbitrary(compact)] #[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] pub enum StageUnitCheckpoint { - /// Saves the progress of transaction-indexed stages. - Transaction(TxNumber), /// Saves the progress of AccountHashing stage. Account(AccountHashingCheckpoint), /// Saves the progress of StorageHashing stage. Storage(StorageHashingCheckpoint), /// Saves the progress of abstract stage iterating over or downloading entities. Entities(EntitiesCheckpoint), + /// Saves the progress of Execution stage. + Execution(ExecutionCheckpoint), + /// Saves the progress of Headers stage. + Headers(HeadersCheckpoint), + /// Saves the progress of Index History stage. + IndexHistory(IndexHistoryCheckpoint), } impl Compact for StageUnitCheckpoint { @@ -239,22 +360,30 @@ impl Compact for StageUnitCheckpoint { B: BufMut + AsMut<[u8]>, { match self { - StageUnitCheckpoint::Transaction(data) => { + StageUnitCheckpoint::Account(data) => { buf.put_u8(0); 1 + data.to_compact(buf) } - StageUnitCheckpoint::Account(data) => { + StageUnitCheckpoint::Storage(data) => { buf.put_u8(1); 1 + data.to_compact(buf) } - StageUnitCheckpoint::Storage(data) => { + StageUnitCheckpoint::Entities(data) => { buf.put_u8(2); 1 + data.to_compact(buf) } - StageUnitCheckpoint::Entities(data) => { + StageUnitCheckpoint::Execution(data) => { buf.put_u8(3); 1 + data.to_compact(buf) } + StageUnitCheckpoint::Headers(data) => { + buf.put_u8(4); + 1 + data.to_compact(buf) + } + StageUnitCheckpoint::IndexHistory(data) => { + buf.put_u8(5); + 1 + data.to_compact(buf) + } } } @@ -264,21 +393,29 @@ impl Compact for StageUnitCheckpoint { { match buf[0] { 0 => { - let (data, buf) = TxNumber::from_compact(&buf[1..], buf.len() - 1); - (Self::Transaction(data), buf) - } - 1 => { let (data, buf) = AccountHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1); (Self::Account(data), buf) } - 2 => { + 1 => { let (data, buf) = StorageHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1); (Self::Storage(data), buf) } - 3 => { + 2 => { let (data, buf) = EntitiesCheckpoint::from_compact(&buf[1..], buf.len() - 1); (Self::Entities(data), buf) } + 3 => { + let (data, buf) = ExecutionCheckpoint::from_compact(&buf[1..], buf.len() - 1); + (Self::Execution(data), buf) + } + 4 => { + let (data, buf) = HeadersCheckpoint::from_compact(&buf[1..], buf.len() - 1); + (Self::Headers(data), buf) + } + 5 => { + let (data, buf) = IndexHistoryCheckpoint::from_compact(&buf[1..], buf.len() - 1); + (Self::IndexHistory(data), buf) + } _ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"), } } @@ -314,17 +451,40 @@ mod tests { fn stage_unit_checkpoint_roundtrip() { let mut rng = rand::thread_rng(); let checkpoints = vec![ - StageUnitCheckpoint::Transaction(rng.gen()), StageUnitCheckpoint::Account(AccountHashingCheckpoint { address: Some(Address::from_low_u64_be(rng.gen())), - from: rng.gen(), - to: rng.gen(), + block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() }, + progress: EntitiesCheckpoint { + processed: rng.gen::() as u64, + total: u32::MAX as u64 + rng.gen::(), + }, }), StageUnitCheckpoint::Storage(StorageHashingCheckpoint { address: Some(Address::from_low_u64_be(rng.gen())), storage: Some(H256::from_low_u64_be(rng.gen())), - from: rng.gen(), - to: rng.gen(), + block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() }, + progress: EntitiesCheckpoint { + processed: rng.gen::() as u64, + total: u32::MAX as u64 + rng.gen::(), + }, + }), + StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed: rng.gen::() as u64, + total: u32::MAX as u64 + rng.gen::(), + }), + StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() }, + progress: EntitiesCheckpoint { + processed: rng.gen::() as u64, + total: u32::MAX as u64 + rng.gen::(), + }, + }), + StageUnitCheckpoint::Headers(HeadersCheckpoint { + block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() }, + progress: EntitiesCheckpoint { + processed: rng.gen::() as u64, + total: u32::MAX as u64 + rng.gen::(), + }, }), ]; diff --git a/crates/primitives/src/stage/mod.rs b/crates/primitives/src/stage/mod.rs index 1d3e37b8e..ffe52554d 100644 --- a/crates/primitives/src/stage/mod.rs +++ b/crates/primitives/src/stage/mod.rs @@ -5,6 +5,7 @@ pub use id::StageId; mod checkpoints; pub use checkpoints::{ - AccountHashingCheckpoint, EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, + AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, + HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint, StorageHashingCheckpoint, }; diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 8e7727028..204f84713 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -253,30 +253,29 @@ where let span = info_span!("Unwinding", stage = %stage_id); let _enter = span.enter(); - let mut stage_progress = tx.get_stage_checkpoint(stage_id)?.unwrap_or_default(); - if stage_progress.block_number < to { - debug!(target: "sync::pipeline", from = %stage_progress, %to, "Unwind point too far for stage"); + let mut checkpoint = tx.get_stage_checkpoint(stage_id)?.unwrap_or_default(); + if checkpoint.block_number < to { + debug!(target: "sync::pipeline", from = %checkpoint, %to, "Unwind point too far for stage"); self.listeners.notify(PipelineEvent::Skipped { stage_id }); continue } - debug!(target: "sync::pipeline", from = %stage_progress, %to, ?bad_block, "Starting unwind"); - while stage_progress.block_number > to { - let input = UnwindInput { checkpoint: stage_progress, unwind_to: to, bad_block }; + debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind"); + while checkpoint.block_number > to { + let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); let output = stage.unwind(&mut tx, input).await; match output { Ok(unwind_output) => { - stage_progress = unwind_output.checkpoint; + checkpoint = unwind_output.checkpoint; self.metrics.stage_checkpoint( - stage_id, - stage_progress, + stage_id, checkpoint, // We assume it was set in the previous execute iteration, so it // doesn't change when we unwind. None, ); - tx.save_stage_checkpoint(stage_id, stage_progress)?; + tx.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); @@ -304,6 +303,7 @@ where let stage = &mut self.stages[stage_index]; let stage_id = stage.id(); let mut made_progress = false; + loop { let mut tx = Transaction::new(&self.db)?; diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs index 886d091aa..04a7d6358 100644 --- a/crates/stages/src/pipeline/sync_metrics.rs +++ b/crates/stages/src/pipeline/sync_metrics.rs @@ -3,7 +3,11 @@ use reth_metrics::{ Metrics, }; use reth_primitives::{ - stage::{EntitiesCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint}, + stage::{ + AccountHashingCheckpoint, EntitiesCheckpoint, ExecutionCheckpoint, HeadersCheckpoint, + IndexHistoryCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint, + StorageHashingCheckpoint, + }, BlockNumber, }; use std::collections::HashMap; @@ -39,13 +43,19 @@ impl Metrics { stage_metrics.checkpoint.set(checkpoint.block_number as f64); let (processed, total) = match checkpoint.stage_checkpoint { - Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => { - (processed, total) - } - _ => (checkpoint.block_number, max_block_number), + Some( + StageUnitCheckpoint::Account(AccountHashingCheckpoint { progress, .. }) | + StageUnitCheckpoint::Storage(StorageHashingCheckpoint { progress, .. }) | + StageUnitCheckpoint::Entities(progress @ EntitiesCheckpoint { .. }) | + StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress, .. }) | + StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }) | + StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { progress, .. }), + ) => (progress.processed, Some(progress.total)), + None => (checkpoint.block_number, max_block_number), }; stage_metrics.entities_processed.set(processed as f64); + if let Some(total) = total { stage_metrics.entities_total.set(total as f64); } diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 7de949911..bd3b22966 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -14,14 +14,14 @@ use std::{ /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct ExecInput { - /// The stage that was run before the current stage and the progress it reached. + /// The stage that was run before the current stage and the checkpoint it reached. pub previous_stage: Option<(StageId, StageCheckpoint)>, - /// The progress of this stage the last time it was executed. + /// The checkpoint of this stage the last time it was executed. pub checkpoint: Option, } impl ExecInput { - /// Return the progress of the stage or default. + /// Return the checkpoint of the stage or default. pub fn checkpoint(&self) -> StageCheckpoint { self.checkpoint.unwrap_or_default() } @@ -63,7 +63,7 @@ impl ExecInput { /// Stage unwind input, see [Stage::unwind]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct UnwindInput { - /// The current highest progress of the stage. + /// The current highest checkpoint of the stage. pub checkpoint: StageCheckpoint, /// The block to unwind to. pub unwind_to: BlockNumber, @@ -114,7 +114,7 @@ impl ExecOutput { /// The output of a stage unwinding. #[derive(Debug, PartialEq, Eq, Clone)] pub struct UnwindOutput { - /// The block at which the stage has unwound to. + /// The checkpoint at which the stage has unwound to. pub checkpoint: StageCheckpoint, } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 454e059ba..74a98ef3a 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -6,19 +6,22 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; +use reth_interfaces::db::DatabaseError; use reth_metrics::{ metrics::{self, Gauge}, Metrics, }; use reth_primitives::{ constants::MGAS_TO_GAS, - stage::{StageCheckpoint, StageId}, - Block, BlockNumber, BlockWithSenders, TransactionSigned, U256, + stage::{ + CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId, + }, + Block, BlockNumber, BlockWithSenders, Header, TransactionSigned, U256, }; use reth_provider::{ post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction, }; -use std::time::Instant; +use std::{ops::RangeInclusive, time::Instant}; use tracing::*; /// Execution stage metrics. @@ -143,6 +146,8 @@ impl ExecutionStage { // Progress tracking let mut stage_progress = start_block; + let mut stage_checkpoint = + execution_checkpoint(tx, start_block, max_block, input.checkpoint())?; // Execute block range let mut state = PostState::default(); @@ -169,6 +174,7 @@ impl ExecutionStage { // Merge state changes state.extend(block_state); stage_progress = block_number; + stage_checkpoint.progress.processed += block.gas_used; // Write history periodically to free up memory if self.thresholds.should_write_history(state.changeset_size_hint() as u64) { @@ -193,10 +199,99 @@ impl ExecutionStage { let is_final_range = stage_progress == max_block; info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished"); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: is_final_range }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(stage_progress) + .with_execution_stage_checkpoint(stage_checkpoint), + done: is_final_range, + }) } } +fn execution_checkpoint( + tx: &Transaction<'_, DB>, + start_block: BlockNumber, + max_block: BlockNumber, + checkpoint: StageCheckpoint, +) -> Result { + Ok(match checkpoint.execution_stage_checkpoint() { + // If checkpoint block range fully matches our range, + // we take the previously used stage checkpoint as-is. + Some(stage_checkpoint @ ExecutionCheckpoint { block_range, .. }) + if block_range == CheckpointBlockRange::from(start_block..=max_block) => + { + stage_checkpoint + } + // If checkpoint block range precedes our range seamlessly, we take the previously used + // stage checkpoint and add the amount of gas from our range to the checkpoint total. + Some(ExecutionCheckpoint { + block_range: CheckpointBlockRange { to, .. }, + progress: EntitiesCheckpoint { processed, total }, + }) if to == start_block - 1 => ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: start_block, to: max_block }, + progress: EntitiesCheckpoint { + processed, + total: total + calculate_gas_used_from_headers(tx, start_block..=max_block)?, + }, + }, + // If checkpoint block range ends on the same block as our range, we take the previously + // used stage checkpoint. + Some(ExecutionCheckpoint { block_range: CheckpointBlockRange { to, .. }, progress }) + if to == max_block => + { + ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: start_block, to: max_block }, + progress, + } + } + // If there's any other non-empty checkpoint, we calculate the remaining amount of total gas + // to be processed not including the checkpoint range. + Some(ExecutionCheckpoint { progress: EntitiesCheckpoint { processed, .. }, .. }) => { + let after_checkpoint_block_number = + calculate_gas_used_from_headers(tx, checkpoint.block_number + 1..=max_block)?; + + ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: start_block, to: max_block }, + progress: EntitiesCheckpoint { + processed, + total: processed + after_checkpoint_block_number, + }, + } + } + // Otherwise, we recalculate the whole stage checkpoint including the amount of gas + // already processed, if there's any. + _ => { + let processed = calculate_gas_used_from_headers(tx, 0..=start_block - 1)?; + + ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: start_block, to: max_block }, + progress: EntitiesCheckpoint { + processed, + total: processed + + calculate_gas_used_from_headers(tx, start_block..=max_block)?, + }, + } + } + }) +} + +fn calculate_gas_used_from_headers( + tx: &Transaction<'_, DB>, + range: RangeInclusive, +) -> Result { + let mut gas_total = 0; + + let start = Instant::now(); + for entry in tx.cursor_read::()?.walk_range(range.clone())? { + let (_, Header { gas_used, .. }) = entry?; + gas_total += gas_used; + } + + let duration = start.elapsed(); + trace!(target: "sync::stages::execution", ?range, ?duration, "Time elapsed in calculate_gas_used_from_headers"); + + Ok(gas_total) +} + /// The size of the stack used by the executor. /// /// Ensure the size is aligned to 8 as this is usually more efficient. @@ -290,14 +385,7 @@ impl Stage for ExecutionStage { } // Discard unwinded changesets - let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?; - while let Some((block_num, _)) = rev_acc_changeset_walker.next().transpose()? { - if block_num <= unwind_to { - break - } - // delete all changesets - rev_acc_changeset_walker.delete_current()?; - } + tx.unwind_table_by_num::(unwind_to)?; let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?; while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? { @@ -311,13 +399,31 @@ impl Stage for ExecutionStage { // Look up the start index for the transaction range let first_tx_num = tx.block_body_indices(*range.start())?.first_tx_num(); + let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint(); + // Unwind all receipts for transactions in the block range - tx.unwind_table_by_num::(first_tx_num)?; - // `unwind_table_by_num` doesn't unwind the provided key, so we need to unwind it manually - tx.delete::(first_tx_num, None)?; + let mut cursor = tx.cursor_write::()?; + let mut reverse_walker = cursor.walk_back(None)?; + + while let Some(Ok((tx_number, receipt))) = reverse_walker.next() { + if tx_number < first_tx_num { + break + } + reverse_walker.delete_current()?; + + if let Some(stage_checkpoint) = stage_checkpoint.as_mut() { + stage_checkpoint.progress.processed -= receipt.cumulative_gas_used; + } + } + + let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint { + StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint) + } else { + StageCheckpoint::new(unwind_to) + }; info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }) + Ok(UnwindOutput { checkpoint }) } } @@ -371,13 +477,14 @@ impl ExecutionStageThresholds { mod tests { use super::*; use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; + use assert_matches::assert_matches; use reth_db::{ mdbx::{test_utils::create_test_db, EnvKind, WriteMap}, models::AccountBeforeTx, }; use reth_primitives::{ - hex_literal::hex, keccak256, Account, Bytecode, ChainSpecBuilder, SealedBlock, - StorageEntry, H160, H256, U256, + hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, + ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, U256, }; use reth_provider::insert_canonical_block; use reth_revm::Factory; @@ -400,6 +507,124 @@ mod tests { ) } + #[test] + fn execution_checkpoint_matches() { + let state_db = create_test_db::(EnvKind::RW); + let tx = Transaction::new(state_db.as_ref()).unwrap(); + + let previous_stage_checkpoint = ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 0, to: 0 }, + progress: EntitiesCheckpoint { processed: 1, total: 2 }, + }; + let previous_checkpoint = StageCheckpoint { + block_number: 0, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)), + }; + + let stage_checkpoint = execution_checkpoint( + &tx, + previous_stage_checkpoint.block_range.from, + previous_stage_checkpoint.block_range.to, + previous_checkpoint, + ); + + assert_eq!(stage_checkpoint, Ok(previous_stage_checkpoint)); + } + + #[test] + fn execution_checkpoint_precedes() { + let state_db = create_test_db::(EnvKind::RW); + let mut tx = Transaction::new(state_db.as_ref()).unwrap(); + + let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); + let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap(); + let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); + let block = SealedBlock::decode(&mut block_rlp).unwrap(); + insert_canonical_block(tx.deref_mut(), genesis, None).unwrap(); + insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap(); + tx.commit().unwrap(); + + let previous_stage_checkpoint = ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 0, to: 0 }, + progress: EntitiesCheckpoint { processed: 1, total: 1 }, + }; + let previous_checkpoint = StageCheckpoint { + block_number: 1, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)), + }; + + let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint); + + assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 1, to: 1 }, + progress: EntitiesCheckpoint { + processed, + total + } + }) if processed == previous_stage_checkpoint.progress.processed && + total == previous_stage_checkpoint.progress.total + block.gas_used); + } + + #[test] + fn execution_checkpoint_recalculate_full_previous_some() { + let state_db = create_test_db::(EnvKind::RW); + let mut tx = Transaction::new(state_db.as_ref()).unwrap(); + + let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); + let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap(); + let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); + let block = SealedBlock::decode(&mut block_rlp).unwrap(); + insert_canonical_block(tx.deref_mut(), genesis, None).unwrap(); + insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap(); + tx.commit().unwrap(); + + let previous_stage_checkpoint = ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 0, to: 0 }, + progress: EntitiesCheckpoint { processed: 1, total: 1 }, + }; + let previous_checkpoint = StageCheckpoint { + block_number: 1, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)), + }; + + let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint); + + assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 1, to: 1 }, + progress: EntitiesCheckpoint { + processed, + total + } + }) if processed == previous_stage_checkpoint.progress.processed && + total == previous_stage_checkpoint.progress.total + block.gas_used); + } + + #[test] + fn execution_checkpoint_recalculate_full_previous_none() { + let state_db = create_test_db::(EnvKind::RW); + let mut tx = Transaction::new(state_db.as_ref()).unwrap(); + + let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); + let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap(); + let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); + let block = SealedBlock::decode(&mut block_rlp).unwrap(); + insert_canonical_block(tx.deref_mut(), genesis, None).unwrap(); + insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap(); + tx.commit().unwrap(); + + let previous_checkpoint = StageCheckpoint { block_number: 1, stage_checkpoint: None }; + + let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint); + + assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint { + block_range: CheckpointBlockRange { from: 1, to: 1 }, + progress: EntitiesCheckpoint { + processed: 0, + total + } + }) if total == block.gas_used); + } + #[tokio::test] async fn sanity_execution_of_block() { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 @@ -444,7 +669,22 @@ mod tests { let mut execution_stage = stage(); let output = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); - assert_eq!(output, ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }); + assert_matches!(output, ExecOutput { + checkpoint: StageCheckpoint { + block_number: 1, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { + from: 1, + to: 1, + }, + progress: EntitiesCheckpoint { + processed, + total + } + })) + }, + done: true + } if processed == total && total == block.gas_used); let tx = tx.deref_mut(); // check post state let account1 = H160(hex!("1000000000000000000000000000000000000000")); @@ -526,19 +766,33 @@ mod tests { // execute let mut execution_stage = stage(); - let _ = execution_stage.execute(&mut tx, input).await.unwrap(); + let result = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); let mut stage = stage(); - let o = stage + let result = stage .unwind( &mut tx, - UnwindInput { checkpoint: StageCheckpoint::new(1), unwind_to: 0, bad_block: None }, + UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None }, ) .await .unwrap(); - assert_eq!(o, UnwindOutput { checkpoint: StageCheckpoint::new(0) }); + assert_matches!(result, UnwindOutput { + checkpoint: StageCheckpoint { + block_number: 0, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { + from: 1, + to: 1, + }, + progress: EntitiesCheckpoint { + processed: 0, + total + } + })) + } + } if total == block.gas_used); // assert unwind stage let db_tx = tx.deref(); diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index fd96eb8b5..dbbc1b423 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -8,15 +8,19 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, }; +use reth_interfaces::db::DatabaseError; use reth_primitives::{ keccak256, - stage::{AccountHashingCheckpoint, StageCheckpoint, StageId}, + stage::{ + AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, + StageId, + }, }; use reth_provider::Transaction; use std::{ cmp::max, fmt::Debug, - ops::{Range, RangeInclusive}, + ops::{Deref, Range, RangeInclusive}, }; use tokio::sync::mpsc; use tracing::*; @@ -32,6 +36,13 @@ pub struct AccountHashingStage { pub commit_threshold: u64, } +impl AccountHashingStage { + /// Create new instance of [AccountHashingStage]. + pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { + Self { clean_threshold, commit_threshold } + } +} + impl Default for AccountHashingStage { fn default() -> Self { Self { clean_threshold: 500_000, commit_threshold: 100_000 } @@ -140,10 +151,10 @@ impl Stage for AccountHashingStage { .and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint()); let start_address = match stage_checkpoint { - Some(AccountHashingCheckpoint { address: address @ Some(_), from, to }) + Some(AccountHashingCheckpoint { address: address @ Some(_), block_range: CheckpointBlockRange { from, to }, .. }) // Checkpoint is only valid if the range of transitions didn't change. // An already hashed account may have been changed with the new range, - // and therefore should be hashed again. + // and therefore should be hashed again. if from == from_block && to == to_block => { debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner account hashing checkpoint"); @@ -219,30 +230,36 @@ impl Stage for AccountHashingStage { let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint( AccountHashingCheckpoint { address: Some(next_address.key().unwrap()), - from: from_block, - to: to_block, + block_range: CheckpointBlockRange { from: from_block, to: to_block }, + progress: stage_checkpoint_progress(tx)?, }, ); - info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished"); + + info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished"); return Ok(ExecOutput { checkpoint, done: false }) } } else { - // Aggregate all transition changesets and and make list of account that have been + // Aggregate all transition changesets and make a list of accounts that have been // changed. let lists = tx.get_addresses_of_changed_accounts(from_block..=to_block)?; - // iterate over plain state and get newest value. + // Iterate over plain state and get newest value. // Assumption we are okay to make is that plainstate represent // `previous_stage_progress` state. - let accounts = tx.get_plainstate_accounts(lists.into_iter())?; - // insert and hash accounts to hashing table + let accounts = tx.get_plainstate_accounts(lists)?; + // Insert and hash accounts to hashing table tx.insert_account_for_hashing(accounts.into_iter())?; } // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = input.previous_stage_checkpoint(); + let checkpoint = input.previous_stage_checkpoint().with_account_hashing_stage_checkpoint( + AccountHashingCheckpoint { + progress: stage_checkpoint_progress(tx)?, + ..Default::default() + }, + ); - info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished"); + info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { checkpoint, done: true }) } @@ -255,14 +272,31 @@ impl Stage for AccountHashingStage { let (range, unwind_progress, is_final_range) = input.unwind_block_range_with_threshold(self.commit_threshold); - // Aggregate all transition changesets and and make list of account that have been changed. + // Aggregate all transition changesets and make a list of accounts that have been changed. tx.unwind_account_hashing(range)?; - info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) + let mut stage_checkpoint = + input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default(); + + stage_checkpoint.progress = stage_checkpoint_progress(tx)?; + + info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(unwind_progress) + .with_account_hashing_stage_checkpoint(stage_checkpoint), + }) } } +fn stage_checkpoint_progress( + tx: &Transaction<'_, DB>, +) -> Result { + Ok(EntitiesCheckpoint { + processed: tx.deref().entries::()? as u64, + total: tx.deref().entries::()? as u64, + }) +} + #[cfg(test)] mod tests { use super::*; @@ -293,7 +327,24 @@ mod tests { let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput {checkpoint: StageCheckpoint { block_number, ..}, done: true}) if block_number == previous_stage); + assert_matches!( + result, + Ok(ExecOutput { + checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint { + progress: EntitiesCheckpoint { + processed, + total, + }, + .. + })), + }, + done: true, + }) if block_number == previous_stage && + processed == total && + total == runner.tx.table::().unwrap().len() as u64 + ); // Validate the stage execution assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); @@ -337,11 +388,19 @@ mod tests { checkpoint: StageCheckpoint { block_number: 10, stage_checkpoint: Some(StageUnitCheckpoint::Account( - AccountHashingCheckpoint { address: Some(address), from: 11, to: 20 } + AccountHashingCheckpoint { + address: Some(address), + block_range: CheckpointBlockRange { + from: 11, + to: 20, + }, + progress: EntitiesCheckpoint { processed: 5, total } + } )) }, done: false - }) if address == fifth_address + }) if address == fifth_address && + total == runner.tx.table::().unwrap().len() as u64 ); assert_eq!(runner.tx.table::().unwrap().len(), 5); @@ -353,9 +412,22 @@ mod tests { assert_matches!( result, Ok(ExecOutput { - checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None }, + checkpoint: StageCheckpoint { + block_number: 20, + stage_checkpoint: Some(StageUnitCheckpoint::Account( + AccountHashingCheckpoint { + address: None, + block_range: CheckpointBlockRange { + from: 0, + to: 0, + }, + progress: EntitiesCheckpoint { processed, total } + } + )) + }, done: true - }) + }) if processed == total && + total == runner.tx.table::().unwrap().len() as u64 ); assert_eq!(runner.tx.table::().unwrap().len(), 10); diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 1bfa76a13..ea10d27e2 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -7,13 +7,17 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; +use reth_interfaces::db::DatabaseError; use reth_primitives::{ keccak256, - stage::{StageCheckpoint, StageId, StorageHashingCheckpoint}, + stage::{ + CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId, + StorageHashingCheckpoint, + }, StorageEntry, }; use reth_provider::Transaction; -use std::{collections::BTreeMap, fmt::Debug}; +use std::{collections::BTreeMap, fmt::Debug, ops::Deref}; use tracing::*; /// Storage hashing stage hashes plain storage. @@ -27,6 +31,13 @@ pub struct StorageHashingStage { pub commit_threshold: u64, } +impl StorageHashingStage { + /// Create new instance of [StorageHashingStage]. + pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self { + Self { clean_threshold, commit_threshold } + } +} + impl Default for StorageHashingStage { fn default() -> Self { Self { clean_threshold: 500_000, commit_threshold: 100_000 } @@ -65,12 +76,12 @@ impl Stage for StorageHashingStage { Some(StorageHashingCheckpoint { address: address @ Some(_), storage, - from, - to, - }) + block_range: CheckpointBlockRange { from, to }, + .. + }) // Checkpoint is only valid if the range of transitions didn't change. // An already hashed storage may have been changed with the new range, - // and therefore should be hashed again. + // and therefore should be hashed again. if from == from_block && to == to_block => { debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner storage hashing checkpoint"); @@ -149,11 +160,12 @@ impl Stage for StorageHashingStage { StorageHashingCheckpoint { address: current_key, storage: current_subkey, - from: from_block, - to: to_block, + block_range: CheckpointBlockRange { from: from_block, to: to_block }, + progress: stage_checkpoint_progress(tx)?, }, ); - info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished"); + + info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished"); return Ok(ExecOutput { checkpoint, done: false }) } } else { @@ -163,15 +175,20 @@ impl Stage for StorageHashingStage { // iterate over plain state and get newest storage value. // Assumption we are okay with is that plain state represent // `previous_stage_progress` state. - let storages = tx.get_plainstate_storages(lists.into_iter())?; + let storages = tx.get_plainstate_storages(lists)?; tx.insert_storage_for_hashing(storages.into_iter())?; } // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = input.previous_stage_checkpoint(); + let checkpoint = input.previous_stage_checkpoint().with_storage_hashing_stage_checkpoint( + StorageHashingCheckpoint { + progress: stage_checkpoint_progress(tx)?, + ..Default::default() + }, + ); - info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished"); + info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { checkpoint, done: true }) } @@ -186,11 +203,28 @@ impl Stage for StorageHashingStage { tx.unwind_storage_hashing(BlockNumberAddress::range(range))?; - info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) + let mut stage_checkpoint = + input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default(); + + stage_checkpoint.progress = stage_checkpoint_progress(tx)?; + + info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished"); + Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(unwind_progress) + .with_storage_hashing_stage_checkpoint(stage_checkpoint), + }) } } +fn stage_checkpoint_progress( + tx: &Transaction<'_, DB>, +) -> Result { + Ok(EntitiesCheckpoint { + processed: tx.deref().entries::()? as u64, + total: tx.deref().entries::()? as u64, + }) +} + #[cfg(test)] mod tests { use super::*; @@ -236,13 +270,36 @@ mod tests { runner.seed_execution(input).expect("failed to seed execution"); loop { - if let Ok(result) = runner.execute(input).await.unwrap() { - if !result.done { + if let Ok(result @ ExecOutput { checkpoint, done }) = + runner.execute(input).await.unwrap() + { + if !done { + let previous_checkpoint = input + .checkpoint + .and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint()) + .unwrap_or_default(); + assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint { + progress: EntitiesCheckpoint { + processed, + total, + }, + .. + }) if processed == previous_checkpoint.progress.processed + 1 && + total == runner.tx.table::().unwrap().len() as u64); + // Continue from checkpoint - input.checkpoint = Some(result.checkpoint); + input.checkpoint = Some(checkpoint); continue } else { - assert!(result.checkpoint.block_number == previous_stage); + assert!(checkpoint.block_number == previous_stage); + assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint { + progress: EntitiesCheckpoint { + processed, + total, + }, + .. + }) if processed == total && + total == runner.tx.table::().unwrap().len() as u64); // Validate the stage execution assert!( @@ -297,18 +354,26 @@ mod tests { stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint { address: Some(address), storage: Some(storage), - from: 101, - to: 500 + block_range: CheckpointBlockRange { + from: 101, + to: 500, + }, + progress: EntitiesCheckpoint { + processed: 500, + total + } })) }, done: false - }) if address == progress_address && storage == progress_key + }) if address == progress_address && storage == progress_key && + total == runner.tx.table::().unwrap().len() as u64 ); assert_eq!(runner.tx.table::().unwrap().len(), 500); // second run with commit threshold of 2 to check if subkey is set. runner.set_commit_threshold(2); - input.checkpoint = Some(result.unwrap().checkpoint); + let result = result.unwrap(); + input.checkpoint = Some(result.checkpoint); let rx = runner.execute(input); let result = rx.await.unwrap(); @@ -334,13 +399,20 @@ mod tests { StorageHashingCheckpoint { address: Some(address), storage: Some(storage), - from: 101, - to: 500, + block_range: CheckpointBlockRange { + from: 101, + to: 500, + }, + progress: EntitiesCheckpoint { + processed: 502, + total + } } )) }, done: false - }) if address == progress_address && storage == progress_key + }) if address == progress_address && storage == progress_key && + total == runner.tx.table::().unwrap().len() as u64 ); assert_eq!(runner.tx.table::().unwrap().len(), 502); @@ -353,9 +425,26 @@ mod tests { assert_matches!( result, Ok(ExecOutput { - checkpoint: StageCheckpoint { block_number: 500, stage_checkpoint: None }, + checkpoint: StageCheckpoint { + block_number: 500, + stage_checkpoint: Some(StageUnitCheckpoint::Storage( + StorageHashingCheckpoint { + address: None, + storage: None, + block_range: CheckpointBlockRange { + from: 0, + to: 0, + }, + progress: EntitiesCheckpoint { + processed, + total + } + } + )) + }, done: true - }) + }) if processed == total && + total == runner.tx.table::().unwrap().len() as u64 ); assert_eq!( runner.tx.table::().unwrap().len(), diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 26edef0de..93df9e8db 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -11,7 +11,9 @@ use reth_interfaces::{ provider::ProviderError, }; use reth_primitives::{ - stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, + stage::{ + CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId, + }, BlockHashOrNumber, BlockNumber, SealedHeader, H256, }; use reth_provider::Transaction; @@ -249,30 +251,42 @@ where None }; - let mut stage_checkpoint = current_checkpoint - .entities_stage_checkpoint() - .unwrap_or(EntitiesCheckpoint { - // If for some reason (e.g. due to DB migration) we don't have `processed` - // in the middle of headers sync, set it to the local head block number + - // number of block already filled in the gap. - processed: local_head + - (target_block_number.unwrap_or_default() - tip_block_number.unwrap_or_default()), - // Shouldn't fail because on the first iteration, we download the header for missing - // tip, and use its block number. - total: target_block_number.or_else(|| { - warn!(target: "sync::stages::headers", ?tip, "No downloaded header for tip found"); - // Safe, because `Display` impl for `EntitiesCheckpoint` will fallback to displaying - // just `processed` - None - }), - }); + let mut stage_checkpoint = match current_checkpoint.headers_stage_checkpoint() { + // If checkpoint block range matches our range, we take the previously used + // stage checkpoint as-is. + Some(stage_checkpoint) + if stage_checkpoint.block_range.from == input.checkpoint().block_number => + { + stage_checkpoint + } + // Otherwise, we're on the first iteration of new gap sync, so we recalculate the number + // of already processed and total headers. + // `target_block_number` is guaranteed to be `Some`, because on the first iteration + // we download the header for missing tip and use its block number. + _ => { + HeadersCheckpoint { + block_range: CheckpointBlockRange { + from: input.checkpoint().block_number, + to: target_block_number.expect("No downloaded header for tip found"), + }, + progress: EntitiesCheckpoint { + // Set processed to the local head block number + number + // of block already filled in the gap. + processed: local_head + + (target_block_number.unwrap_or_default() - + tip_block_number.unwrap_or_default()), + total: target_block_number.expect("No downloaded header for tip found"), + }, + } + } + }; // Total headers can be updated if we received new tip from the network, and need to fill // the local gap. if let Some(target_block_number) = target_block_number { - stage_checkpoint.total = Some(target_block_number); + stage_checkpoint.progress.total = target_block_number; } - stage_checkpoint.processed += downloaded_headers.len() as u64; + stage_checkpoint.progress.processed += downloaded_headers.len() as u64; // Write the headers to db self.write_headers::(tx, downloaded_headers)?.unwrap_or_default(); @@ -286,12 +300,12 @@ where ); Ok(ExecOutput { checkpoint: StageCheckpoint::new(checkpoint) - .with_entities_stage_checkpoint(stage_checkpoint), + .with_headers_stage_checkpoint(stage_checkpoint), done: true, }) } else { Ok(ExecOutput { - checkpoint: current_checkpoint.with_entities_stage_checkpoint(stage_checkpoint), + checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint), done: false, }) } @@ -311,14 +325,20 @@ where let unwound_headers = tx.unwind_table_by_num::(input.unwind_to)?; let stage_checkpoint = - input.checkpoint.entities_stage_checkpoint().map(|checkpoint| EntitiesCheckpoint { - processed: checkpoint.processed.saturating_sub(unwound_headers as u64), - total: None, + input.checkpoint.headers_stage_checkpoint().map(|stage_checkpoint| HeadersCheckpoint { + block_range: stage_checkpoint.block_range, + progress: EntitiesCheckpoint { + processed: stage_checkpoint + .progress + .processed + .saturating_sub(unwound_headers as u64), + total: stage_checkpoint.progress.total, + }, }); let mut checkpoint = StageCheckpoint::new(input.unwind_to); if let Some(stage_checkpoint) = stage_checkpoint { - checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint); + checkpoint = checkpoint.with_headers_stage_checkpoint(stage_checkpoint); } info!(target: "sync::stages::headers", to_block = input.unwind_to, checkpoint = input.unwind_to, is_final_range = true, "Unwind iteration finished"); @@ -550,14 +570,20 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total: Some(total), + stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint { + block_range: CheckpointBlockRange { + from, + to + }, + progress: EntitiesCheckpoint { + processed, + total, + } })) - }, done: true }) if block_number == tip.number + }, done: true }) if block_number == tip.number && + from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head - && processed == checkpoint + headers.len() as u64 - 1 - && total == tip.number); + processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } @@ -638,13 +664,19 @@ mod tests { let result = rx.await.unwrap(); assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total: Some(total), + stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint { + block_range: CheckpointBlockRange { + from, + to + }, + progress: EntitiesCheckpoint { + processed, + total, + } })) }, done: false }) if block_number == checkpoint && - processed == checkpoint + 500 && - total == tip.number); + from == checkpoint && to == previous_stage && + processed == checkpoint + 500 && total == tip.number); runner.client.clear().await; runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await; @@ -655,14 +687,20 @@ mod tests { assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, - stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { - processed, - total: Some(total), + stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint { + block_range: CheckpointBlockRange { + from, + to + }, + progress: EntitiesCheckpoint { + processed, + total, + } })) - }, done: true }) if block_number == tip.number + }, done: true }) if block_number == tip.number && + from == checkpoint && to == previous_stage && // -1 because we don't need to download the local head - && processed == checkpoint + headers.len() as u64 - 1 - && total == tip.number); + processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } } diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 0b7e32bb9..09fe45111 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,8 +1,16 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::database::Database; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError}; +use reth_primitives::{ + stage::{ + CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId, + }, + BlockNumber, +}; use reth_provider::Transaction; -use std::fmt::Debug; +use std::{ + fmt::Debug, + ops::{Deref, RangeInclusive}, +}; use tracing::*; /// Stage is indexing history the account changesets generated in @@ -40,12 +48,22 @@ impl Stage for IndexAccountHistoryStage { return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) } + let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?; + let indices = tx.get_account_transition_ids_from_changeset(range.clone())?; + let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::(); + // Insert changeset to history index tx.insert_account_history_index(indices)?; + stage_checkpoint.progress.processed += changesets; + info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished"); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(*range.end()) + .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, + }) } /// Unwind the stage. @@ -57,16 +75,70 @@ impl Stage for IndexAccountHistoryStage { let (range, unwind_progress, is_final_range) = input.unwind_block_range_with_threshold(self.commit_threshold); - tx.unwind_account_history_indices(range)?; + let changesets = tx.unwind_account_history_indices(range)?; + + let checkpoint = + if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() { + stage_checkpoint.progress.processed -= changesets as u64; + StageCheckpoint::new(unwind_progress) + .with_index_history_stage_checkpoint(stage_checkpoint) + } else { + StageCheckpoint::new(unwind_progress) + }; info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); // from HistoryIndex higher than that number. - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) + Ok(UnwindOutput { checkpoint }) } } +// The function proceeds as follows: +// 1. It first checks if the checkpoint has an `IndexHistoryCheckpoint` that matches the given +// block range. If it does, the function returns that checkpoint. +// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates +// a new `IndexHistoryCheckpoint` with the given block range and updates the progress with the +// current progress. +// 3. If none of the above conditions are met, it creates a new `IndexHistoryCheckpoint` with the +// given block range and calculates the progress by counting the number of processed entries in the +// `AccountChangeSet` table within the given block range. +fn stage_checkpoint( + tx: &Transaction<'_, DB>, + checkpoint: StageCheckpoint, + range: &RangeInclusive, +) -> Result { + Ok(match checkpoint.index_history_stage_checkpoint() { + Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. }) + if block_range == CheckpointBlockRange::from(range) => + { + stage_checkpoint + } + Some(IndexHistoryCheckpoint { block_range, progress }) + if block_range.to == checkpoint.block_number => + { + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange::from(range), + progress: EntitiesCheckpoint { + processed: progress.processed, + total: tx.deref().entries::()? as u64, + }, + } + } + _ => IndexHistoryCheckpoint { + block_range: CheckpointBlockRange::from(range), + progress: EntitiesCheckpoint { + processed: tx + .cursor_read::()? + .walk_range(0..=checkpoint.block_number)? + .count() as u64, + total: tx.deref().entries::()? as u64, + }, + }, + }) +} + #[cfg(test)] mod tests { + use assert_matches::assert_matches; use std::collections::BTreeMap; use super::*; @@ -141,7 +213,21 @@ mod tests { let mut stage = IndexAccountHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); - assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + assert_eq!( + out, + ExecOutput { + checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange { + from: input.checkpoint().block_number + 1, + to: run_to + }, + progress: EntitiesCheckpoint { processed: 2, total: 2 } + } + ), + done: true + } + ); tx.commit().unwrap(); } @@ -353,4 +439,54 @@ mod tests { ]) ); } + + #[test] + fn stage_checkpoint_recalculation() { + let tx = TestTransaction::default(); + + tx.commit(|tx| { + tx.put::( + 1, + AccountBeforeTx { + address: H160(hex!("0000000000000000000000000000000000000001")), + info: None, + }, + ) + .unwrap(); + tx.put::( + 1, + AccountBeforeTx { + address: H160(hex!("0000000000000000000000000000000000000002")), + info: None, + }, + ) + .unwrap(); + tx.put::( + 2, + AccountBeforeTx { + address: H160(hex!("0000000000000000000000000000000000000001")), + info: None, + }, + ) + .unwrap(); + tx.put::( + 2, + AccountBeforeTx { + address: H160(hex!("0000000000000000000000000000000000000002")), + info: None, + }, + ) + .unwrap(); + Ok(()) + }) + .unwrap(); + + assert_matches!( + stage_checkpoint(&tx.inner(), StageCheckpoint::new(1), &(1..=2)).unwrap(), + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange { from: 1, to: 2 }, + progress: EntitiesCheckpoint { processed: 2, total: 4 } + } + ); + } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 274fcf143..6204d0a4f 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,8 +1,19 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::{database::Database, models::BlockNumberAddress}; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_db::{ + cursor::DbCursorRO, database::Database, models::BlockNumberAddress, tables, transaction::DbTx, + DatabaseError, +}; +use reth_primitives::{ + stage::{ + CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId, + }, + BlockNumber, +}; use reth_provider::Transaction; -use std::fmt::Debug; +use std::{ + fmt::Debug, + ops::{Deref, RangeInclusive}, +}; use tracing::*; /// Stage is indexing history the account changesets generated in @@ -41,11 +52,21 @@ impl Stage for IndexStorageHistoryStage { return Ok(ExecOutput::done(target)) } + let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?; + let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?; + let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::(); + tx.insert_storage_history_index(indices)?; + stage_checkpoint.progress.processed += changesets; + info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished"); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(*range.end()) + .with_index_history_stage_checkpoint(stage_checkpoint), + done: is_final_range, + }) } /// Unwind the stage. @@ -57,16 +78,70 @@ impl Stage for IndexStorageHistoryStage { let (range, unwind_progress, is_final_range) = input.unwind_block_range_with_threshold(self.commit_threshold); - tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?; + let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?; + + let checkpoint = + if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() { + stage_checkpoint.progress.processed -= changesets as u64; + StageCheckpoint::new(unwind_progress) + .with_index_history_stage_checkpoint(stage_checkpoint) + } else { + StageCheckpoint::new(unwind_progress) + }; info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) + Ok(UnwindOutput { checkpoint }) } } +// The function proceeds as follows: +// 1. It first checks if the checkpoint has an `IndexHistoryCheckpoint` that matches the given +// block range. If it does, the function returns that checkpoint. +// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates +// a new `IndexHistoryCheckpoint` with the given block range and updates the progress with the +// current progress. +// 3. If none of the above conditions are met, it creates a new `IndexHistoryCheckpoint` with the +// given block range and calculates the progress by counting the number of processed entries in the +// `StorageChangeSet` table within the given block range. +fn stage_checkpoint( + tx: &Transaction<'_, DB>, + checkpoint: StageCheckpoint, + range: &RangeInclusive, +) -> Result { + Ok(match checkpoint.index_history_stage_checkpoint() { + Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. }) + if block_range == CheckpointBlockRange::from(range) => + { + stage_checkpoint + } + Some(IndexHistoryCheckpoint { block_range, progress }) + if block_range.to == checkpoint.block_number => + { + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange::from(range), + progress: EntitiesCheckpoint { + processed: progress.processed, + total: tx.deref().entries::()? as u64, + }, + } + } + _ => IndexHistoryCheckpoint { + block_range: CheckpointBlockRange::from(range), + progress: EntitiesCheckpoint { + processed: tx + .cursor_read::()? + .walk_range(BlockNumberAddress::range(0..=checkpoint.block_number))? + .count() as u64, + total: tx.deref().entries::()? as u64, + }, + }, + }) +} + #[cfg(test)] mod tests { + use assert_matches::assert_matches; use std::collections::BTreeMap; use super::*; @@ -151,7 +226,21 @@ mod tests { let mut stage = IndexStorageHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); - assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + assert_eq!( + out, + ExecOutput { + checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint( + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange { + from: input.checkpoint().block_number + 1, + to: run_to + }, + progress: EntitiesCheckpoint { processed: 2, total: 2 } + } + ), + done: true + } + ); tx.commit().unwrap(); } @@ -369,4 +458,64 @@ mod tests { ]) ); } + + #[test] + fn stage_checkpoint_recalculation() { + let tx = TestTransaction::default(); + + tx.commit(|tx| { + tx.put::( + BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000001" + ))), + ) + .unwrap(); + tx.put::( + BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000002" + ))), + ) + .unwrap(); + tx.put::( + BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000002")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000001" + ))), + ) + .unwrap(); + tx.put::( + BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000001" + ))), + ) + .unwrap(); + tx.put::( + BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000002" + ))), + ) + .unwrap(); + tx.put::( + BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000002")))), + storage(H256(hex!( + "0000000000000000000000000000000000000000000000000000000000000001" + ))), + ) + .unwrap(); + Ok(()) + }) + .unwrap(); + + assert_matches!( + stage_checkpoint(&tx.inner(), StageCheckpoint::new(1), &(1..=2)).unwrap(), + IndexHistoryCheckpoint { + block_range: CheckpointBlockRange { from: 1, to: 2 }, + progress: EntitiesCheckpoint { processed: 3, total: 6 } + } + ); + } } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 96d92a7b5..54cd285ef 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -8,13 +8,16 @@ use reth_db::{ use reth_interfaces::consensus; use reth_primitives::{ hex, - stage::{MerkleCheckpoint, StageCheckpoint, StageId}, + stage::{EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, StageId}, trie::StoredSubNode, BlockNumber, SealedHeader, H256, }; use reth_provider::Transaction; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; -use std::{fmt::Debug, ops::DerefMut}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, +}; use tracing::*; /// The merkle hashing stage uses input from @@ -162,11 +165,13 @@ impl Stage for MerkleStage { let mut checkpoint = self.get_execution_checkpoint(tx)?; - let trie_root = if range.is_empty() { - block_root + let (trie_root, entities_checkpoint) = if range.is_empty() { + (block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default()) } else if to_block - from_block > threshold || from_block == 1 { // if there are more blocks than threshold it is faster to rebuild the trie - if let Some(checkpoint) = checkpoint.as_ref().filter(|c| c.target_block == to_block) { + let mut entities_checkpoint = if let Some(checkpoint) = + checkpoint.as_ref().filter(|c| c.target_block == to_block) + { debug!( target: "sync::stages::merkle::exec", current = ?current_block, @@ -175,6 +180,8 @@ impl Stage for MerkleStage { last_walker_key = ?hex::encode(&checkpoint.last_walker_key), "Continuing inner merkle checkpoint" ); + + input.checkpoint().entities_stage_checkpoint() } else { debug!( target: "sync::stages::merkle::exec", @@ -188,15 +195,23 @@ impl Stage for MerkleStage { self.save_execution_checkpoint(tx, None)?; tx.clear::()?; tx.clear::()?; + + None } + .unwrap_or(EntitiesCheckpoint { + processed: 0, + total: (tx.deref().entries::()? + + tx.deref().entries::()?) as u64, + }); let progress = StateRoot::new(tx.deref_mut()) .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from)) .root_with_progress() .map_err(|e| StageError::Fatal(Box::new(e)))?; match progress { - StateRootProgress::Progress(state, updates) => { + StateRootProgress::Progress(state, hashed_entries_walked, updates) => { updates.flush(tx.deref_mut())?; + let checkpoint = MerkleCheckpoint::new( to_block, state.last_account_key, @@ -205,11 +220,22 @@ impl Stage for MerkleStage { state.hash_builder.into(), ); self.save_execution_checkpoint(tx, Some(checkpoint))?; - return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false }) + + entities_checkpoint.processed += hashed_entries_walked as u64; + + return Ok(ExecOutput { + checkpoint: input + .checkpoint() + .with_entities_stage_checkpoint(entities_checkpoint), + done: false, + }) } - StateRootProgress::Complete(root, updates) => { + StateRootProgress::Complete(root, hashed_entries_walked, updates) => { updates.flush(tx.deref_mut())?; - root + + entities_checkpoint.processed += hashed_entries_walked as u64; + + (root, entities_checkpoint) } } } else { @@ -217,7 +243,20 @@ impl Stage for MerkleStage { let (root, updates) = StateRoot::incremental_root_with_updates(tx.deref_mut(), range) .map_err(|e| StageError::Fatal(Box::new(e)))?; updates.flush(tx.deref_mut())?; - root + + let total_hashed_entries = (tx.deref().entries::()? + + tx.deref().entries::()?) + as u64; + + let entities_checkpoint = EntitiesCheckpoint { + // This is fine because `range` doesn't have an upper bound, so in this `else` + // branch we're just hashing all remaining accounts and storage slots we have in the + // database. + processed: total_hashed_entries, + total: total_hashed_entries, + }; + + (root, entities_checkpoint) }; // Reset the checkpoint @@ -226,7 +265,11 @@ impl Stage for MerkleStage { self.validate_state_root(trie_root, block.seal_slow(), to_block)?; info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished"); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block), done: true }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(to_block) + .with_entities_stage_checkpoint(entities_checkpoint), + done: true, + }) } /// Unwind the stage. @@ -241,11 +284,24 @@ impl Stage for MerkleStage { return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) } + let mut entities_checkpoint = + input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint { + processed: 0, + total: (tx.deref().entries::()? + + tx.deref().entries::()?) as u64, + }); + if input.unwind_to == 0 { tx.clear::()?; tx.clear::()?; info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); - return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) + + entities_checkpoint.processed = 0; + + return Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(input.unwind_to) + .with_entities_stage_checkpoint(entities_checkpoint), + }) } // Unwind trie only if there are transitions @@ -260,6 +316,8 @@ impl Stage for MerkleStage { // Validation passed, apply unwind changes to the database. updates.flush(tx.deref_mut())?; + + // TODO(alexey): update entities checkpoint } else { info!(target: "sync::stages::merkle::unwind", "Nothing to unwind"); } @@ -285,7 +343,9 @@ mod tests { use reth_interfaces::test_utils::generators::{ random_block, random_block_range, random_contract_account_range, random_transition_range, }; - use reth_primitives::{keccak256, SealedBlock, StorageEntry, H256, U256}; + use reth_primitives::{ + keccak256, stage::StageUnitCheckpoint, SealedBlock, StorageEntry, H256, U256, + }; use reth_trie::test_utils::{state_root, state_root_prehashed}; use std::collections::BTreeMap; @@ -312,8 +372,20 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput { + checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, + done: true + }) if block_number == previous_stage && processed == total && + total == ( + runner.tx.table::().unwrap().len() + + runner.tx.table::().unwrap().len() + ) as u64 ); // Validate the stage execution @@ -340,8 +412,20 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput { + checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, + done: true + }) if block_number == previous_stage && processed == total && + total == ( + runner.tx.table::().unwrap().len() + + runner.tx.table::().unwrap().len() + ) as u64 ); // Validate the stage execution diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 521146ffd..601a3c9de 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -5,15 +5,15 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, - RawKey, RawTable, RawValue, + DatabaseError, RawKey, RawTable, RawValue, }; use reth_primitives::{ keccak256, - stage::{StageCheckpoint, StageId}, + stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, TransactionSignedNoHash, TxNumber, H160, }; use reth_provider::Transaction; -use std::fmt::Debug; +use std::{fmt::Debug, ops::Deref}; use thiserror::Error; use tokio::sync::mpsc; use tracing::*; @@ -28,6 +28,13 @@ pub struct SenderRecoveryStage { pub commit_threshold: u64, } +impl SenderRecoveryStage { + /// Create new instance of [SenderRecoveryStage]. + pub fn new(commit_threshold: u64) -> Self { + Self { commit_threshold } + } +} + impl Default for SenderRecoveryStage { fn default() -> Self { Self { commit_threshold: 500_000 } @@ -67,7 +74,8 @@ impl Stage for SenderRecoveryStage { if first_tx_num > last_tx_num { info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached"); return Ok(ExecOutput { - checkpoint: StageCheckpoint::new(end_block), + checkpoint: StageCheckpoint::new(end_block) + .with_entities_stage_checkpoint(stage_checkpoint(tx)?), done: is_final_range, }) } @@ -105,7 +113,7 @@ impl Stage for SenderRecoveryStage { // closure that would recover signer. Used as utility to wrap result let recover = |entry: Result< (RawKey, RawValue), - reth_db::DatabaseError, + DatabaseError, >, rlp_buf: &mut Vec| -> Result<(u64, H160), Box> { @@ -132,6 +140,7 @@ impl Stage for SenderRecoveryStage { } }); } + // Iterate over channels and append the sender in the order that they are received. for mut channel in channels { while let Some(recovered) = channel.recv().await { @@ -141,7 +150,11 @@ impl Stage for SenderRecoveryStage { } info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished"); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(end_block) + .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, + }) } /// Unwind the stage. @@ -158,10 +171,22 @@ impl Stage for SenderRecoveryStage { tx.unwind_table_by_num::(latest_tx_id)?; info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }) + Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(unwind_to) + .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + }) } } +fn stage_checkpoint( + tx: &Transaction<'_, DB>, +) -> Result { + Ok(EntitiesCheckpoint { + processed: tx.deref().entries::()? as u64, + total: tx.deref().entries::()? as u64, + }) +} + // TODO(onbjerg): Should unwind #[derive(Error, Debug)] enum SenderRecoveryStageError { @@ -179,7 +204,9 @@ impl From for StageError { mod tests { use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; - use reth_primitives::{BlockNumber, SealedBlock, TransactionSigned, H256}; + use reth_primitives::{ + stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256, + }; use super::*; use crate::test_utils::{ @@ -216,8 +243,13 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed: 1, + total: 1 + })) + }, done: true }) if block_number == previous_stage ); // Validate the stage execution @@ -239,13 +271,22 @@ mod tests { // Seed only once with full input range runner.seed_execution(first_input).expect("failed to seed execution"); + let total_transactions = runner.tx.table::().unwrap().len() as u64; + // Execute first time let result = runner.execute(first_input).await.unwrap(); let expected_progress = stage_progress + threshold; assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false }) - if block_number == expected_progress + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: false }) if block_number == expected_progress && + processed == runner.tx.table::().unwrap().len() as u64 && + total == total_transactions ); // Execute second time @@ -256,8 +297,15 @@ mod tests { let result = runner.execute(second_input).await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: true }) if block_number == previous_stage && + processed == runner.tx.table::().unwrap().len() as u64 && + total == total_transactions ); assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 3d09eb05c..671b8b84d 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -6,13 +6,15 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, + DatabaseError, }; use reth_primitives::{ rpc_utils::keccak256, - stage::{StageCheckpoint, StageId}, + stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, BlockNumber, TransactionSignedNoHash, TxNumber, H256, }; use reth_provider::Transaction; +use std::ops::Deref; use thiserror::Error; use tokio::sync::mpsc; use tracing::*; @@ -145,7 +147,11 @@ impl Stage for TransactionLookupStage { } info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished"); - Ok(ExecOutput { done: is_final_range, checkpoint: StageCheckpoint::new(end_block) }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(end_block) + .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + done: is_final_range, + }) } /// Unwind the stage. @@ -179,7 +185,10 @@ impl Stage for TransactionLookupStage { } info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }) + Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(unwind_to) + .with_entities_stage_checkpoint(stage_checkpoint(tx)?), + }) } } @@ -195,6 +204,15 @@ impl From for StageError { } } +fn stage_checkpoint( + tx: &Transaction<'_, DB>, +) -> Result { + Ok(EntitiesCheckpoint { + processed: tx.deref().entries::()? as u64, + total: tx.deref().entries::()? as u64, + }) +} + #[cfg(test)] mod tests { use super::*; @@ -204,7 +222,7 @@ mod tests { }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; - use reth_primitives::{BlockNumber, SealedBlock, H256}; + use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; // Implement stage test suite. stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); @@ -235,8 +253,14 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput {checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: true }) if block_number == previous_stage && processed == total && + total == runner.tx.table::().unwrap().len() as u64 ); // Validate the stage execution @@ -263,8 +287,15 @@ mod tests { let expected_progress = stage_progress + threshold; assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false }) - if block_number == expected_progress + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: false }) if block_number == expected_progress && + processed == runner.tx.table::().unwrap().len() as u64 && + total == runner.tx.table::().unwrap().len() as u64 ); // Execute second time @@ -275,8 +306,14 @@ mod tests { let result = runner.execute(second_input).await.unwrap(); assert_matches!( result, - Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) - if block_number == previous_stage + Ok(ExecOutput {checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: true }) if block_number == previous_stage && processed == total && + total == runner.tx.table::().unwrap().len() as u64 ); assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); @@ -340,7 +377,7 @@ mod tests { let stage_progress = input.checkpoint().block_number; let end = input.previous_stage_checkpoint().block_number; - let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2); + let blocks = random_block_range(stage_progress + 1..=end, H256::zero(), 0..2); self.tx.insert_blocks(blocks.iter(), None)?; Ok(blocks) } diff --git a/crates/storage/codecs/derive/src/compact/structs.rs b/crates/storage/codecs/derive/src/compact/structs.rs index dfa1e9169..9bf56950e 100644 --- a/crates/storage/codecs/derive/src/compact/structs.rs +++ b/crates/storage/codecs/derive/src/compact/structs.rs @@ -119,7 +119,13 @@ impl<'a> StructHandler<'a> { assert!( known_types.contains(&ftype.as_str()) || is_flag_type(ftype) || - self.fields_iterator.peek().is_none(), + self.fields_iterator.peek().map_or(true, |ftypes| { + if let FieldTypes::StructField((_, ftype, _, _)) = ftypes { + !known_types.contains(&ftype.as_str()) + } else { + false + } + }), "`{ftype}` field should be placed as the last one since it's not known. If it's an alias type (which are not supported by proc_macro), be sure to add it to either `known_types` or `get_bit_size` lists in the derive crate." ); diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index adf118f2f..656bae654 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -76,6 +76,10 @@ impl<'a> DbTx<'a> for TxMock { ) -> Result<>::DupCursor, DatabaseError> { todo!() } + + fn entries(&self) -> Result { + todo!() + } } impl<'a> DbTxMut<'a> for TxMock { diff --git a/crates/storage/db/src/abstraction/transaction.rs b/crates/storage/db/src/abstraction/transaction.rs index 9b6e12665..3abc9755b 100644 --- a/crates/storage/db/src/abstraction/transaction.rs +++ b/crates/storage/db/src/abstraction/transaction.rs @@ -47,6 +47,8 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> { fn cursor_dup_read( &self, ) -> Result<>::DupCursor, DatabaseError>; + /// Returns number of entries in the table. + fn entries(&self) -> Result; } /// Read write transaction that allows writing to database diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 8e91463ed..d6abdd712 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -115,6 +115,15 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> { ) -> Result<>::DupCursor, DatabaseError> { self.new_cursor() } + + /// Returns number of entries in the table using cheap DB stats invocation. + fn entries(&self) -> Result { + Ok(self + .inner + .db_stat_with_dbi(self.get_dbi::()?) + .map_err(|e| DatabaseError::Stats(e.into()))? + .entries()) + } } impl DbTxMut<'_> for Tx<'_, RW, E> { diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index f7d6ab17b..3a0f8f1c7 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -170,7 +170,7 @@ table!( table!( /// (Canonical only) Stores the transaction body for canonical transactions. - ( Transactions ) TxNumber | TransactionSignedNoHash + ( Transactions ) TxNumber | TransactionSignedNoHash ); table!( @@ -211,7 +211,7 @@ dupsort!( table!( /// Stores pointers to block changeset with changes for each account key. /// - /// Last shard key of the storage will contains `u64::MAX` `BlockNumber`, + /// Last shard key of the storage will contain `u64::MAX` `BlockNumber`, /// this would allows us small optimization on db access when change is in plain state. /// /// Imagine having shards as: @@ -233,7 +233,7 @@ table!( table!( /// Stores pointers to block number changeset with changes for each storage key. /// - /// Last shard key of the storage will contains `u64::MAX` `BlockNumber`, + /// Last shard key of the storage will contain `u64::MAX` `BlockNumber`, /// this would allows us small optimization on db access when change is in plain state. /// /// Imagine having shards as: @@ -293,7 +293,7 @@ dupsort!( ); table!( - /// Stores the transaction sender for each transaction. + /// Stores the transaction sender for each canonical transaction. /// It is needed to speed up execution stage and allows fetching signer without doing /// transaction signed recovery ( TxSenders ) TxNumber | Address diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index b20f31a37..7059a2beb 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -202,10 +202,15 @@ where /// Retrieves database statistics. pub fn db_stat<'txn>(&'txn self, db: &Database<'txn>) -> Result { + self.db_stat_with_dbi(db.dbi()) + } + + /// Retrieves database statistics by the given dbi. + pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result { unsafe { let mut stat = Stat::new(); mdbx_result(txn_execute(&self.txn, |txn| { - ffi::mdbx_dbi_stat(txn, db.dbi(), stat.mdb_stat(), size_of::()) + ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::()) }))?; Ok(stat) } diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 099296c54..ffe8534f2 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -293,14 +293,14 @@ where self.get_take_block_and_execution_range::(chain_spec, range) } - /// Unwind and clear account hashing + /// Unwind and clear account hashing. pub fn unwind_account_hashing( &self, range: RangeInclusive, ) -> Result<(), TransactionError> { let mut hashed_accounts = self.cursor_write::()?; - // Aggregate all transition changesets and and make list of account that have been changed. + // Aggregate all transition changesets and make a list of accounts that have been changed. self.cursor_read::()? .walk_range(range)? .collect::, _>>()? @@ -329,10 +329,11 @@ where } Ok(()) })?; + Ok(()) } - /// Unwind and clear storage hashing + /// Unwind and clear storage hashing. pub fn unwind_storage_hashing( &self, range: Range, @@ -375,20 +376,22 @@ where } Ok(()) })?; + Ok(()) } - /// Unwind and clear account history indices + /// Unwind and clear account history indices. + /// + /// Returns number of changesets walked. pub fn unwind_account_history_indices( &self, range: RangeInclusive, - ) -> Result<(), TransactionError> { - let mut cursor = self.cursor_write::()?; - + ) -> Result { let account_changeset = self .cursor_read::()? .walk_range(range)? .collect::, _>>()?; + let changesets = account_changeset.len(); let last_indices = account_changeset .into_iter() @@ -400,7 +403,9 @@ where accounts.insert(account.address, index); accounts }); + // try to unwind the index + let mut cursor = self.cursor_write::()?; for (address, rem_index) in last_indices { let shard_part = unwind_account_history_shards::(&mut cursor, address, rem_index)?; @@ -414,20 +419,23 @@ where )?; } } - Ok(()) + + Ok(changesets) } - /// Unwind and clear storage history indices + /// Unwind and clear storage history indices. + /// + /// Returns number of changesets walked. pub fn unwind_storage_history_indices( &self, range: Range, - ) -> Result<(), TransactionError> { - let mut cursor = self.cursor_write::()?; - + ) -> Result { let storage_changesets = self .cursor_read::()? .walk_range(range)? .collect::, _>>()?; + let changesets = storage_changesets.len(); + let last_indices = storage_changesets .into_iter() // reverse so we can get lowest transition id where we need to unwind account. @@ -441,6 +449,8 @@ where accounts }, ); + + let mut cursor = self.cursor_write::()?; for ((address, storage_key), rem_index) in last_indices { let shard_part = unwind_storage_history_shards::(&mut cursor, address, storage_key, rem_index)?; @@ -455,7 +465,8 @@ where )?; } } - Ok(()) + + Ok(changesets) } /// Append blocks and insert its post state. @@ -1178,7 +1189,6 @@ where storages }, ); - Ok(storage_changeset_lists) } @@ -1194,7 +1204,7 @@ where .walk_range(range)? .collect::, _>>()?; - let account_transtions = account_changesets + let account_transitions = account_changesets .into_iter() // fold all account to one set of changed accounts .fold( @@ -1204,8 +1214,7 @@ where accounts }, ); - - Ok(account_transtions) + Ok(account_transitions) } /// Insert storage change index to database. Used inside StorageHistoryIndex stage diff --git a/crates/trie/src/progress.rs b/crates/trie/src/progress.rs index bd8054623..128fa3f50 100644 --- a/crates/trie/src/progress.rs +++ b/crates/trie/src/progress.rs @@ -9,10 +9,10 @@ use reth_primitives::{ #[derive(Debug)] pub enum StateRootProgress { /// The complete state root computation with updates and computed root. - Complete(H256, TrieUpdates), + Complete(H256, usize, TrieUpdates), /// The intermediate progress of state root computation. /// Contains the walker stack, the hash builder and the trie updates. - Progress(Box, TrieUpdates), + Progress(Box, usize, TrieUpdates), } /// The intermediate state of the state root computation. diff --git a/crates/trie/src/trie.rs b/crates/trie/src/trie.rs index b237bb413..92c82f881 100644 --- a/crates/trie/src/trie.rs +++ b/crates/trie/src/trie.rs @@ -174,7 +174,7 @@ where /// The intermediate progress of state root computation and the trie updates. pub fn root_with_updates(self) -> Result<(H256, TrieUpdates), StateRootError> { match self.with_no_threshold().calculate(true)? { - StateRootProgress::Complete(root, updates) => Ok((root, updates)), + StateRootProgress::Complete(root, _, updates) => Ok((root, updates)), StateRootProgress::Progress(..) => unreachable!(), // unreachable threshold } } @@ -187,7 +187,7 @@ where /// The state root hash. pub fn root(self) -> Result { match self.calculate(false)? { - StateRootProgress::Complete(root, _) => Ok(root), + StateRootProgress::Complete(root, _, _) => Ok(root), StateRootProgress::Progress(..) => unreachable!(), // update retenion is disabled } } @@ -234,6 +234,7 @@ where hash_builder.set_updates(retain_updates); let mut account_rlp = Vec::with_capacity(128); + let mut hashed_entries_walked = 0; while let Some(key) = last_walker_key.take().or_else(|| walker.key()) { // Take the last account key to make sure we take it into consideration only once. @@ -260,6 +261,7 @@ where }; while let Some((hashed_address, account)) = next_account_entry { + hashed_entries_walked += 1; let account_nibbles = Nibbles::unpack(hashed_address); if let Some(ref key) = next_key { @@ -286,7 +288,9 @@ where ); let storage_root = if retain_updates { - let (root, updates) = storage_root_calculator.root_with_updates()?; + let (root, storage_slots_walked, updates) = + storage_root_calculator.root_with_updates()?; + hashed_entries_walked += storage_slots_walked; trie_updates.extend(updates.into_iter()); root } else { @@ -317,7 +321,11 @@ where trie_updates.extend(walker_updates.into_iter()); trie_updates.extend_with_account_updates(hash_builder_updates); - return Ok(StateRootProgress::Progress(Box::new(state), trie_updates)) + return Ok(StateRootProgress::Progress( + Box::new(state), + hashed_entries_walked, + trie_updates, + )) } // Move the next account entry @@ -333,7 +341,7 @@ where trie_updates.extend(walker_updates.into_iter()); trie_updates.extend_with_account_updates(hash_builder_updates); - Ok(StateRootProgress::Complete(root, trie_updates)) + Ok(StateRootProgress::Complete(root, hashed_entries_walked, trie_updates)) } } @@ -414,7 +422,7 @@ where /// # Returns /// /// The storage root and storage trie updates for a given address. - pub fn root_with_updates(&self) -> Result<(H256, TrieUpdates), StorageRootError> { + pub fn root_with_updates(&self) -> Result<(H256, usize, TrieUpdates), StorageRootError> { self.calculate(true) } @@ -424,11 +432,14 @@ where /// /// The storage root. pub fn root(&self) -> Result { - let (root, _) = self.calculate(false)?; + let (root, _, _) = self.calculate(false)?; Ok(root) } - fn calculate(&self, retain_updates: bool) -> Result<(H256, TrieUpdates), StorageRootError> { + fn calculate( + &self, + retain_updates: bool, + ) -> Result<(H256, usize, TrieUpdates), StorageRootError> { tracing::debug!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "calculating storage root"); let mut hashed_storage_cursor = self.hashed_cursor_factory.hashed_storage_cursor()?; @@ -442,6 +453,7 @@ where if hashed_storage_cursor.is_storage_empty(self.hashed_address)? { return Ok(( EMPTY_ROOT, + 0, TrieUpdates::from([(TrieKey::StorageTrie(self.hashed_address), TrieOp::Delete)]), )) } @@ -451,6 +463,7 @@ where let mut hash_builder = HashBuilder::default().with_updates(retain_updates); + let mut storage_slots_walked = 0; while let Some(key) = walker.key() { if walker.can_skip_current_node { hash_builder.add_branch(key, walker.hash().unwrap(), walker.children_are_in_trie()); @@ -464,6 +477,8 @@ where let next_key = walker.advance()?; let mut storage = hashed_storage_cursor.seek(self.hashed_address, seek_key)?; while let Some(StorageEntry { key: hashed_key, value }) = storage { + storage_slots_walked += 1; + let storage_key_nibbles = Nibbles::unpack(hashed_key); if let Some(ref key) = next_key { if key < &storage_key_nibbles { @@ -486,7 +501,7 @@ where trie_updates.extend_with_storage_updates(self.hashed_address, hash_builder_updates); tracing::debug!(target: "trie::storage_root", ?root, hashed_address = ?self.hashed_address, "calculated storage root"); - Ok((root, trie_updates)) + Ok((root, storage_slots_walked, trie_updates)) } } @@ -555,7 +570,7 @@ mod tests { } // Generate the intermediate nodes on the receiving end of the channel - let (_, trie_updates) = + let (_, _, trie_updates) = StorageRoot::new_hashed(tx.deref(), hashed_address).root_with_updates().unwrap(); // 1. Some state transition happens, update the hashed storage to the new value @@ -723,6 +738,9 @@ mod tests { fn arbitrary_state_root_with_progress() { proptest!( ProptestConfig::with_cases(10), | (state: State) | { + let hashed_entries_total = state.len() + + state.values().map(|(_, slots)| slots.len()).sum::(); + let db = create_test_rw_db(); let mut tx = Transaction::new(db.as_ref()).unwrap(); @@ -734,6 +752,7 @@ mod tests { let threshold = 10; let mut got = None; + let mut hashed_entries_walked = 0; let mut intermediate_state: Option> = None; while got.is_none() { @@ -741,11 +760,18 @@ mod tests { .with_threshold(threshold) .with_intermediate_state(intermediate_state.take().map(|state| *state)); match calculator.root_with_progress().unwrap() { - StateRootProgress::Progress(state, _updates) => intermediate_state = Some(state), - StateRootProgress::Complete(root, _updates) => got = Some(root), + StateRootProgress::Progress(state, walked, _) => { + intermediate_state = Some(state); + hashed_entries_walked += walked; + }, + StateRootProgress::Complete(root, walked, _) => { + got = Some(root); + hashed_entries_walked += walked; + }, }; } assert_eq!(expected, got.unwrap()); + assert_eq!(hashed_entries_total, hashed_entries_walked) } ); } @@ -1210,7 +1236,7 @@ mod tests { let (expected_root, expected_updates) = extension_node_storage_trie(&mut tx, hashed_address); - let (got, updates) = + let (got, _, updates) = StorageRoot::new_hashed(tx.deref_mut(), hashed_address).root_with_updates().unwrap(); assert_eq!(expected_root, got); diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json index 6e5fd0315..18f64c0fb 100644 --- a/etc/grafana/dashboards/overview.json +++ b/etc/grafana/dashboards/overview.json @@ -27,7 +27,7 @@ "type": "grafana", "id": "grafana", "name": "Grafana", - "version": "9.4.7" + "version": "9.5.2" }, { "type": "panel", @@ -128,7 +128,7 @@ }, "gridPos": { "h": 8, - "w": 5, + "w": 12, "x": 0, "y": 0 }, @@ -145,7 +145,7 @@ "showThresholdLabels": false, "showThresholdMarkers": true }, - "pluginVersion": "9.4.7", + "pluginVersion": "9.5.2", "targets": [ { "datasource": { @@ -192,8 +192,8 @@ }, "gridPos": { "h": 8, - "w": 9, - "x": 5, + "w": 12, + "x": 12, "y": 0 }, "id": 20, @@ -209,9 +209,10 @@ "fields": "", "values": false }, - "showUnfilled": true + "showUnfilled": true, + "valueMode": "color" }, - "pluginVersion": "9.4.7", + "pluginVersion": "9.5.2", "targets": [ { "datasource": { @@ -231,6 +232,97 @@ "transparent": true, "type": "bargauge" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 69, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "reth_sync_entities_processed / reth_sync_entities_total", + "legendFormat": "{{stage}}", + "range": true, + "refId": "A" + } + ], + "title": "Sync progress (stage progress in %)", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -290,9 +382,9 @@ }, "gridPos": { "h": 8, - "w": 10, - "x": 14, - "y": 0 + "w": 12, + "x": 12, + "y": 8 }, "id": 12, "options": { @@ -320,7 +412,7 @@ "refId": "A" } ], - "title": "Sync progress", + "title": "Sync progress (stage progress as highest block number reached)", "type": "timeseries" }, { @@ -329,7 +421,7 @@ "h": 1, "w": 24, "x": 0, - "y": 8 + "y": 16 }, "id": 38, "panels": [], @@ -397,7 +489,7 @@ "h": 8, "w": 12, "x": 0, - "y": 9 + "y": 17 }, "id": 40, "options": { @@ -457,7 +549,7 @@ "h": 8, "w": 12, "x": 12, - "y": 9 + "y": 17 }, "id": 42, "maxDataPoints": 25, @@ -501,7 +593,7 @@ "unit": "percentunit" } }, - "pluginVersion": "9.4.7", + "pluginVersion": "9.5.2", "targets": [ { "datasource": { @@ -548,7 +640,7 @@ "h": 8, "w": 12, "x": 0, - "y": 17 + "y": 25 }, "id": 48, "options": { @@ -657,7 +749,7 @@ "h": 8, "w": 12, "x": 12, - "y": 17 + "y": 25 }, "id": 52, "options": { @@ -715,7 +807,7 @@ "h": 8, "w": 12, "x": 0, - "y": 25 + "y": 33 }, "id": 50, "options": { @@ -883,10 +975,11 @@ "h": 8, "w": 12, "x": 12, - "y": 25 + "y": 33 }, "id": 58, "options": { + "cellHeight": "sm", "footer": { "countRows": false, "fields": "", @@ -897,7 +990,7 @@ }, "showHeader": true }, - "pluginVersion": "9.4.7", + "pluginVersion": "9.5.2", "targets": [ { "datasource": { @@ -923,7 +1016,7 @@ "h": 1, "w": 24, "x": 0, - "y": 33 + "y": 41 }, "id": 46, "panels": [], @@ -989,7 +1082,7 @@ "h": 8, "w": 24, "x": 0, - "y": 34 + "y": 42 }, "id": 56, "options": { @@ -1062,7 +1155,7 @@ "h": 1, "w": 24, "x": 0, - "y": 42 + "y": 50 }, "id": 6, "panels": [], @@ -1131,7 +1224,7 @@ "h": 8, "w": 8, "x": 0, - "y": 43 + "y": 51 }, "id": 18, "options": { @@ -1224,7 +1317,7 @@ "h": 8, "w": 8, "x": 8, - "y": 43 + "y": 51 }, "id": 16, "options": { @@ -1342,7 +1435,7 @@ "h": 8, "w": 8, "x": 16, - "y": 43 + "y": 51 }, "id": 8, "options": { @@ -1421,7 +1514,7 @@ "h": 8, "w": 8, "x": 0, - "y": 51 + "y": 59 }, "id": 54, "options": { @@ -1585,7 +1678,7 @@ "h": 1, "w": 24, "x": 0, - "y": 59 + "y": 67 }, "id": 24, "panels": [], @@ -1637,8 +1730,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1678,7 +1770,7 @@ "h": 8, "w": 12, "x": 0, - "y": 60 + "y": 68 }, "id": 26, "options": { @@ -1792,8 +1884,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1808,7 +1899,7 @@ "h": 8, "w": 12, "x": 12, - "y": 60 + "y": 68 }, "id": 33, "options": { @@ -1909,8 +2000,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1925,7 +2015,7 @@ "h": 8, "w": 12, "x": 0, - "y": 68 + "y": 76 }, "id": 36, "options": { @@ -1974,7 +2064,7 @@ "h": 1, "w": 24, "x": 0, - "y": 76 + "y": 84 }, "id": 32, "panels": [], @@ -2027,8 +2117,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2068,7 +2157,7 @@ "h": 8, "w": 12, "x": 0, - "y": 77 + "y": 85 }, "id": 30, "options": { @@ -2218,8 +2307,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" } ] } @@ -2230,7 +2318,7 @@ "h": 8, "w": 12, "x": 12, - "y": 77 + "y": 85 }, "id": 28, "options": { @@ -2331,8 +2419,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2347,7 +2434,7 @@ "h": 8, "w": 12, "x": 0, - "y": 85 + "y": 93 }, "id": 35, "options": { @@ -2396,7 +2483,7 @@ "h": 1, "w": 24, "x": 0, - "y": 93 + "y": 101 }, "id": 68, "panels": [ @@ -2446,8 +2533,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2462,7 +2548,7 @@ "h": 8, "w": 11, "x": 0, - "y": 1 + "y": 9 }, "id": 60, "options": { @@ -2539,8 +2625,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2555,7 +2640,7 @@ "h": 8, "w": 13, "x": 11, - "y": 1 + "y": 9 }, "id": 62, "options": { @@ -2632,8 +2717,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2648,7 +2732,7 @@ "h": 7, "w": 11, "x": 0, - "y": 9 + "y": 17 }, "id": 64, "options": { @@ -2700,6 +2784,6 @@ "timezone": "", "title": "reth", "uid": "2k8BXz24x", - "version": 5, + "version": 6, "weekStart": "" } \ No newline at end of file