feat: Better progress reporting for stage checkpoints (#2982)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
Georgios Konstantopoulos
2023-06-05 09:10:46 -07:00
committed by GitHub
parent 6fab79bbc2
commit 08900740bc
28 changed files with 1601 additions and 341 deletions

View File

@ -8,6 +8,8 @@ pub enum StageEnum {
Bodies,
Senders,
Execution,
AccountHashing,
StorageHashing,
Hashing,
Merkle,
TxLookup,

View File

@ -63,6 +63,13 @@ impl Command {
tool.db.update(|tx| {
match &self.stage {
StageEnum::Senders => {
tx.clear::<tables::TxSenders>()?;
tx.put::<tables::SyncStage>(
StageId::SenderRecovery.to_string(),
Default::default(),
)?;
}
StageEnum::Execution => {
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
@ -76,6 +83,20 @@ impl Command {
)?;
insert_genesis_state::<Env<WriteMap>>(tx, self.chain.genesis())?;
}
StageEnum::AccountHashing => {
tx.clear::<tables::HashedAccount>()?;
tx.put::<tables::SyncStage>(
StageId::AccountHashing.to_string(),
Default::default(),
)?;
}
StageEnum::StorageHashing => {
tx.clear::<tables::HashedStorage>()?;
tx.put::<tables::SyncStage>(
StageId::StorageHashing.to_string(),
Default::default(),
)?;
}
StageEnum::Hashing => {
// Clear hashed accounts
tx.clear::<tables::HashedAccount>()?;

View File

@ -15,16 +15,17 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId},
ChainSpec,
};
use reth_provider::{ShareableDatabase, Transaction};
use reth_provider::{providers::get_stage_checkpoint, ShareableDatabase, Transaction};
use reth_staged_sync::utils::init::init_db;
use reth_stages::{
stages::{
BodyStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, TransactionLookupStage,
AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, ExecOutput, Stage, UnwindInput,
};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc};
use tracing::*;
/// `reth stage` command
@ -183,9 +184,7 @@ impl Command {
(Box::new(stage), None)
}
StageEnum::Senders => {
(Box::new(SenderRecoveryStage { commit_threshold: batch_size }), None)
}
StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None),
StageEnum::Execution => {
let factory = reth_revm::Factory::new(self.chain.clone());
(
@ -201,6 +200,12 @@ impl Command {
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
}
StageEnum::StorageHashing => {
(Box::new(StorageHashingStage::new(1, batch_size)), None)
}
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),
@ -209,18 +214,16 @@ impl Command {
StageEnum::StorageHistory => (Box::<IndexStorageHistoryStage>::default(), None),
_ => return Ok(()),
};
if let Some(unwind_stage) = &unwind_stage {
assert!(exec_stage.type_id() == unwind_stage.type_id());
}
let checkpoint = get_stage_checkpoint(tx.deref(), exec_stage.id())?.unwrap_or_default();
let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
let mut input = ExecInput {
previous_stage: Some((
StageId::Other("No Previous Stage"),
StageCheckpoint::new(self.to),
)),
checkpoint: Some(StageCheckpoint::new(self.from)),
};
let mut unwind = UnwindInput {
checkpoint: StageCheckpoint::new(self.to),
checkpoint: checkpoint.with_block_number(self.to),
unwind_to: self.from,
bad_block: None,
};
@ -232,6 +235,14 @@ impl Command {
}
}
let mut input = ExecInput {
previous_stage: Some((
StageId::Other("No Previous Stage"),
StageCheckpoint::new(self.to),
)),
checkpoint: Some(checkpoint.with_block_number(self.from)),
};
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{

View File

@ -28,4 +28,7 @@ pub enum DatabaseError {
/// Failed to decode a key from a table.
#[error("Error decoding value.")]
DecodeError,
/// Failed to get database stats.
#[error("Database stats error code: {0:?}")]
Stats(i32),
}

View File

@ -1,11 +1,14 @@
use crate::{
trie::{hash_builder::HashBuilderState, StoredSubNode},
Address, BlockNumber, TxNumber, H256,
Address, BlockNumber, H256,
};
use bytes::{Buf, BufMut};
use reth_codecs::{derive_arbitrary, main_codec, Compact};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
ops::RangeInclusive,
};
/// Saves the progress of Merkle stage.
#[derive(Default, Debug, Clone, PartialEq)]
@ -101,26 +104,56 @@ impl Compact for MerkleCheckpoint {
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct AccountHashingCheckpoint {
/// The next account to start hashing from
/// The next account to start hashing from.
pub address: Option<Address>,
/// Start transition id
pub from: u64,
/// Last transition id
pub to: u64,
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in accounts.
pub progress: EntitiesCheckpoint,
}
/// Saves the progress of StorageHashing stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct StorageHashingCheckpoint {
/// The next account to start hashing from
/// The next account to start hashing from.
pub address: Option<Address>,
/// The next storage slot to start hashing from
/// The next storage slot to start hashing from.
pub storage: Option<H256>,
/// Start transition id
pub from: u64,
/// Last transition id
pub to: u64,
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in storage slots.
pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Execution stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct ExecutionCheckpoint {
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in gas.
pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Headers stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct HeadersCheckpoint {
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in gas.
pub progress: EntitiesCheckpoint,
}
/// Saves the progress of Index History stages.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct IndexHistoryCheckpoint {
/// Block range which this checkpoint is valid for.
pub block_range: CheckpointBlockRange,
/// Progress measured in changesets.
pub progress: EntitiesCheckpoint,
}
/// Saves the progress of abstract stage iterating over or downloading entities.
@ -130,16 +163,35 @@ pub struct EntitiesCheckpoint {
/// Number of entities already processed.
pub processed: u64,
/// Total entities to be processed.
pub total: Option<u64>,
pub total: u64,
}
impl Display for EntitiesCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(total) = self.total {
write!(f, "{:.1}%", 100.0 * self.processed as f64 / total as f64)
} else {
write!(f, "{}", self.processed)
}
write!(f, "{:.1}%", 100.0 * self.processed as f64 / self.total as f64)
}
}
/// Saves the block range. Usually, it's used to check the validity of some stage checkpoint across
/// multiple executions.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct CheckpointBlockRange {
/// The first block of the range, inclusive.
pub from: BlockNumber,
/// The last block of the range, inclusive.
pub to: BlockNumber,
}
impl From<RangeInclusive<BlockNumber>> for CheckpointBlockRange {
fn from(range: RangeInclusive<BlockNumber>) -> Self {
Self { from: *range.start(), to: *range.end() }
}
}
impl From<&RangeInclusive<BlockNumber>> for CheckpointBlockRange {
fn from(range: &RangeInclusive<BlockNumber>) -> Self {
Self { from: *range.start(), to: *range.end() }
}
}
@ -183,6 +235,36 @@ impl StageCheckpoint {
}
}
/// Returns the execution stage checkpoint, if any.
pub fn execution_stage_checkpoint(&self) -> Option<ExecutionCheckpoint> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::Execution(checkpoint)) => Some(checkpoint),
_ => None,
}
}
/// Returns the headers stage checkpoint, if any.
pub fn headers_stage_checkpoint(&self) -> Option<HeadersCheckpoint> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::Headers(checkpoint)) => Some(checkpoint),
_ => None,
}
}
/// Returns the index history stage checkpoint, if any.
pub fn index_history_stage_checkpoint(&self) -> Option<IndexHistoryCheckpoint> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::IndexHistory(checkpoint)) => Some(checkpoint),
_ => None,
}
}
/// Sets the block number.
pub fn with_block_number(mut self, block_number: BlockNumber) -> Self {
self.block_number = block_number;
self
}
/// Sets the stage checkpoint to account hashing.
pub fn with_account_hashing_stage_checkpoint(
mut self,
@ -206,13 +288,48 @@ impl StageCheckpoint {
self.stage_checkpoint = Some(StageUnitCheckpoint::Entities(checkpoint));
self
}
/// Sets the stage checkpoint to execution.
pub fn with_execution_stage_checkpoint(mut self, checkpoint: ExecutionCheckpoint) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::Execution(checkpoint));
self
}
/// Sets the stage checkpoint to headers.
pub fn with_headers_stage_checkpoint(mut self, checkpoint: HeadersCheckpoint) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::Headers(checkpoint));
self
}
/// Sets the stage checkpoint to index history.
pub fn with_index_history_stage_checkpoint(
mut self,
checkpoint: IndexHistoryCheckpoint,
) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::IndexHistory(checkpoint));
self
}
}
impl Display for StageCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(stage_checkpoint)) => stage_checkpoint.fmt(f),
_ => write!(f, "{}", self.block_number),
Some(
StageUnitCheckpoint::Account(AccountHashingCheckpoint {
progress: entities, ..
}) |
StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
progress: entities, ..
}) |
StageUnitCheckpoint::Entities(entities) |
StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress: entities, .. }) |
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }) |
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint {
progress: entities,
..
}),
) => entities.fmt(f),
None => write!(f, "{}", self.block_number),
}
}
}
@ -223,14 +340,18 @@ impl Display for StageCheckpoint {
#[derive_arbitrary(compact)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum StageUnitCheckpoint {
/// Saves the progress of transaction-indexed stages.
Transaction(TxNumber),
/// Saves the progress of AccountHashing stage.
Account(AccountHashingCheckpoint),
/// Saves the progress of StorageHashing stage.
Storage(StorageHashingCheckpoint),
/// Saves the progress of abstract stage iterating over or downloading entities.
Entities(EntitiesCheckpoint),
/// Saves the progress of Execution stage.
Execution(ExecutionCheckpoint),
/// Saves the progress of Headers stage.
Headers(HeadersCheckpoint),
/// Saves the progress of Index History stage.
IndexHistory(IndexHistoryCheckpoint),
}
impl Compact for StageUnitCheckpoint {
@ -239,22 +360,30 @@ impl Compact for StageUnitCheckpoint {
B: BufMut + AsMut<[u8]>,
{
match self {
StageUnitCheckpoint::Transaction(data) => {
StageUnitCheckpoint::Account(data) => {
buf.put_u8(0);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Account(data) => {
StageUnitCheckpoint::Storage(data) => {
buf.put_u8(1);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Storage(data) => {
StageUnitCheckpoint::Entities(data) => {
buf.put_u8(2);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Entities(data) => {
StageUnitCheckpoint::Execution(data) => {
buf.put_u8(3);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Headers(data) => {
buf.put_u8(4);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::IndexHistory(data) => {
buf.put_u8(5);
1 + data.to_compact(buf)
}
}
}
@ -264,21 +393,29 @@ impl Compact for StageUnitCheckpoint {
{
match buf[0] {
0 => {
let (data, buf) = TxNumber::from_compact(&buf[1..], buf.len() - 1);
(Self::Transaction(data), buf)
}
1 => {
let (data, buf) = AccountHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Account(data), buf)
}
2 => {
1 => {
let (data, buf) = StorageHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Storage(data), buf)
}
3 => {
2 => {
let (data, buf) = EntitiesCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Entities(data), buf)
}
3 => {
let (data, buf) = ExecutionCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Execution(data), buf)
}
4 => {
let (data, buf) = HeadersCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Headers(data), buf)
}
5 => {
let (data, buf) = IndexHistoryCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::IndexHistory(data), buf)
}
_ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"),
}
}
@ -314,17 +451,40 @@ mod tests {
fn stage_unit_checkpoint_roundtrip() {
let mut rng = rand::thread_rng();
let checkpoints = vec![
StageUnitCheckpoint::Transaction(rng.gen()),
StageUnitCheckpoint::Account(AccountHashingCheckpoint {
address: Some(Address::from_low_u64_be(rng.gen())),
from: rng.gen(),
to: rng.gen(),
block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() },
progress: EntitiesCheckpoint {
processed: rng.gen::<u32>() as u64,
total: u32::MAX as u64 + rng.gen::<u64>(),
},
}),
StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
address: Some(Address::from_low_u64_be(rng.gen())),
storage: Some(H256::from_low_u64_be(rng.gen())),
from: rng.gen(),
to: rng.gen(),
block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() },
progress: EntitiesCheckpoint {
processed: rng.gen::<u32>() as u64,
total: u32::MAX as u64 + rng.gen::<u64>(),
},
}),
StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: rng.gen::<u32>() as u64,
total: u32::MAX as u64 + rng.gen::<u64>(),
}),
StageUnitCheckpoint::Execution(ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() },
progress: EntitiesCheckpoint {
processed: rng.gen::<u32>() as u64,
total: u32::MAX as u64 + rng.gen::<u64>(),
},
}),
StageUnitCheckpoint::Headers(HeadersCheckpoint {
block_range: CheckpointBlockRange { from: rng.gen(), to: rng.gen() },
progress: EntitiesCheckpoint {
processed: rng.gen::<u32>() as u64,
total: u32::MAX as u64 + rng.gen::<u64>(),
},
}),
];

View File

@ -5,6 +5,7 @@ pub use id::StageId;
mod checkpoints;
pub use checkpoints::{
AccountHashingCheckpoint, EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint,
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
};

View File

@ -253,30 +253,29 @@ where
let span = info_span!("Unwinding", stage = %stage_id);
let _enter = span.enter();
let mut stage_progress = tx.get_stage_checkpoint(stage_id)?.unwrap_or_default();
if stage_progress.block_number < to {
debug!(target: "sync::pipeline", from = %stage_progress, %to, "Unwind point too far for stage");
let mut checkpoint = tx.get_stage_checkpoint(stage_id)?.unwrap_or_default();
if checkpoint.block_number < to {
debug!(target: "sync::pipeline", from = %checkpoint, %to, "Unwind point too far for stage");
self.listeners.notify(PipelineEvent::Skipped { stage_id });
continue
}
debug!(target: "sync::pipeline", from = %stage_progress, %to, ?bad_block, "Starting unwind");
while stage_progress.block_number > to {
let input = UnwindInput { checkpoint: stage_progress, unwind_to: to, bad_block };
debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind");
while checkpoint.block_number > to {
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
let output = stage.unwind(&mut tx, input).await;
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.checkpoint;
checkpoint = unwind_output.checkpoint;
self.metrics.stage_checkpoint(
stage_id,
stage_progress,
stage_id, checkpoint,
// We assume it was set in the previous execute iteration, so it
// doesn't change when we unwind.
None,
);
tx.save_stage_checkpoint(stage_id, stage_progress)?;
tx.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
@ -304,6 +303,7 @@ where
let stage = &mut self.stages[stage_index];
let stage_id = stage.id();
let mut made_progress = false;
loop {
let mut tx = Transaction::new(&self.db)?;

View File

@ -3,7 +3,11 @@ use reth_metrics::{
Metrics,
};
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint},
stage::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecutionCheckpoint, HeadersCheckpoint,
IndexHistoryCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint,
StorageHashingCheckpoint,
},
BlockNumber,
};
use std::collections::HashMap;
@ -39,13 +43,19 @@ impl Metrics {
stage_metrics.checkpoint.set(checkpoint.block_number as f64);
let (processed, total) = match checkpoint.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => {
(processed, total)
}
_ => (checkpoint.block_number, max_block_number),
Some(
StageUnitCheckpoint::Account(AccountHashingCheckpoint { progress, .. }) |
StageUnitCheckpoint::Storage(StorageHashingCheckpoint { progress, .. }) |
StageUnitCheckpoint::Entities(progress @ EntitiesCheckpoint { .. }) |
StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress, .. }) |
StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }) |
StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { progress, .. }),
) => (progress.processed, Some(progress.total)),
None => (checkpoint.block_number, max_block_number),
};
stage_metrics.entities_processed.set(processed as f64);
if let Some(total) = total {
stage_metrics.entities_total.set(total as f64);
}

View File

@ -14,14 +14,14 @@ use std::{
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
/// The stage that was run before the current stage and the progress it reached.
/// The stage that was run before the current stage and the checkpoint it reached.
pub previous_stage: Option<(StageId, StageCheckpoint)>,
/// The progress of this stage the last time it was executed.
/// The checkpoint of this stage the last time it was executed.
pub checkpoint: Option<StageCheckpoint>,
}
impl ExecInput {
/// Return the progress of the stage or default.
/// Return the checkpoint of the stage or default.
pub fn checkpoint(&self) -> StageCheckpoint {
self.checkpoint.unwrap_or_default()
}
@ -63,7 +63,7 @@ impl ExecInput {
/// Stage unwind input, see [Stage::unwind].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
/// The current highest progress of the stage.
/// The current highest checkpoint of the stage.
pub checkpoint: StageCheckpoint,
/// The block to unwind to.
pub unwind_to: BlockNumber,
@ -114,7 +114,7 @@ impl ExecOutput {
/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
/// The checkpoint at which the stage has unwound to.
pub checkpoint: StageCheckpoint,
}

View File

@ -6,19 +6,22 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::db::DatabaseError;
use reth_metrics::{
metrics::{self, Gauge},
Metrics,
};
use reth_primitives::{
constants::MGAS_TO_GAS,
stage::{StageCheckpoint, StageId},
Block, BlockNumber, BlockWithSenders, TransactionSigned, U256,
stage::{
CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId,
},
Block, BlockNumber, BlockWithSenders, Header, TransactionSigned, U256,
};
use reth_provider::{
post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction,
};
use std::time::Instant;
use std::{ops::RangeInclusive, time::Instant};
use tracing::*;
/// Execution stage metrics.
@ -143,6 +146,8 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Progress tracking
let mut stage_progress = start_block;
let mut stage_checkpoint =
execution_checkpoint(tx, start_block, max_block, input.checkpoint())?;
// Execute block range
let mut state = PostState::default();
@ -169,6 +174,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Merge state changes
state.extend(block_state);
stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used;
// Write history periodically to free up memory
if self.thresholds.should_write_history(state.changeset_size_hint() as u64) {
@ -193,10 +199,99 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let is_final_range = stage_progress == max_block;
info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(stage_progress)
.with_execution_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}
}
fn execution_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
start_block: BlockNumber,
max_block: BlockNumber,
checkpoint: StageCheckpoint,
) -> Result<ExecutionCheckpoint, DatabaseError> {
Ok(match checkpoint.execution_stage_checkpoint() {
// If checkpoint block range fully matches our range,
// we take the previously used stage checkpoint as-is.
Some(stage_checkpoint @ ExecutionCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(start_block..=max_block) =>
{
stage_checkpoint
}
// If checkpoint block range precedes our range seamlessly, we take the previously used
// stage checkpoint and add the amount of gas from our range to the checkpoint total.
Some(ExecutionCheckpoint {
block_range: CheckpointBlockRange { to, .. },
progress: EntitiesCheckpoint { processed, total },
}) if to == start_block - 1 => ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: start_block, to: max_block },
progress: EntitiesCheckpoint {
processed,
total: total + calculate_gas_used_from_headers(tx, start_block..=max_block)?,
},
},
// If checkpoint block range ends on the same block as our range, we take the previously
// used stage checkpoint.
Some(ExecutionCheckpoint { block_range: CheckpointBlockRange { to, .. }, progress })
if to == max_block =>
{
ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: start_block, to: max_block },
progress,
}
}
// If there's any other non-empty checkpoint, we calculate the remaining amount of total gas
// to be processed not including the checkpoint range.
Some(ExecutionCheckpoint { progress: EntitiesCheckpoint { processed, .. }, .. }) => {
let after_checkpoint_block_number =
calculate_gas_used_from_headers(tx, checkpoint.block_number + 1..=max_block)?;
ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: start_block, to: max_block },
progress: EntitiesCheckpoint {
processed,
total: processed + after_checkpoint_block_number,
},
}
}
// Otherwise, we recalculate the whole stage checkpoint including the amount of gas
// already processed, if there's any.
_ => {
let processed = calculate_gas_used_from_headers(tx, 0..=start_block - 1)?;
ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: start_block, to: max_block },
progress: EntitiesCheckpoint {
processed,
total: processed +
calculate_gas_used_from_headers(tx, start_block..=max_block)?,
},
}
}
})
}
fn calculate_gas_used_from_headers<DB: Database>(
tx: &Transaction<'_, DB>,
range: RangeInclusive<BlockNumber>,
) -> Result<u64, DatabaseError> {
let mut gas_total = 0;
let start = Instant::now();
for entry in tx.cursor_read::<tables::Headers>()?.walk_range(range.clone())? {
let (_, Header { gas_used, .. }) = entry?;
gas_total += gas_used;
}
let duration = start.elapsed();
trace!(target: "sync::stages::execution", ?range, ?duration, "Time elapsed in calculate_gas_used_from_headers");
Ok(gas_total)
}
/// The size of the stack used by the executor.
///
/// Ensure the size is aligned to 8 as this is usually more efficient.
@ -290,14 +385,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
}
// Discard unwinded changesets
let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?;
while let Some((block_num, _)) = rev_acc_changeset_walker.next().transpose()? {
if block_num <= unwind_to {
break
}
// delete all changesets
rev_acc_changeset_walker.delete_current()?;
}
tx.unwind_table_by_num::<tables::AccountChangeSet>(unwind_to)?;
let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?;
while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? {
@ -311,13 +399,31 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// Look up the start index for the transaction range
let first_tx_num = tx.block_body_indices(*range.start())?.first_tx_num();
let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint();
// Unwind all receipts for transactions in the block range
tx.unwind_table_by_num::<tables::Receipts>(first_tx_num)?;
// `unwind_table_by_num` doesn't unwind the provided key, so we need to unwind it manually
tx.delete::<tables::Receipts>(first_tx_num, None)?;
let mut cursor = tx.cursor_write::<tables::Receipts>()?;
let mut reverse_walker = cursor.walk_back(None)?;
while let Some(Ok((tx_number, receipt))) = reverse_walker.next() {
if tx_number < first_tx_num {
break
}
reverse_walker.delete_current()?;
if let Some(stage_checkpoint) = stage_checkpoint.as_mut() {
stage_checkpoint.progress.processed -= receipt.cumulative_gas_used;
}
}
let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint {
StageCheckpoint::new(unwind_to).with_execution_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_to)
};
info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
Ok(UnwindOutput { checkpoint })
}
}
@ -371,13 +477,14 @@ impl ExecutionStageThresholds {
mod tests {
use super::*;
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
use assert_matches::assert_matches;
use reth_db::{
mdbx::{test_utils::create_test_db, EnvKind, WriteMap},
models::AccountBeforeTx,
};
use reth_primitives::{
hex_literal::hex, keccak256, Account, Bytecode, ChainSpecBuilder, SealedBlock,
StorageEntry, H160, H256, U256,
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, U256,
};
use reth_provider::insert_canonical_block;
use reth_revm::Factory;
@ -400,6 +507,124 @@ mod tests {
)
}
#[test]
fn execution_checkpoint_matches() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let tx = Transaction::new(state_db.as_ref()).unwrap();
let previous_stage_checkpoint = ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 0, to: 0 },
progress: EntitiesCheckpoint { processed: 1, total: 2 },
};
let previous_checkpoint = StageCheckpoint {
block_number: 0,
stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)),
};
let stage_checkpoint = execution_checkpoint(
&tx,
previous_stage_checkpoint.block_range.from,
previous_stage_checkpoint.block_range.to,
previous_checkpoint,
);
assert_eq!(stage_checkpoint, Ok(previous_stage_checkpoint));
}
#[test]
fn execution_checkpoint_precedes() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(tx.deref_mut(), genesis, None).unwrap();
insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap();
tx.commit().unwrap();
let previous_stage_checkpoint = ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 0, to: 0 },
progress: EntitiesCheckpoint { processed: 1, total: 1 },
};
let previous_checkpoint = StageCheckpoint {
block_number: 1,
stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)),
};
let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint);
assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 1 },
progress: EntitiesCheckpoint {
processed,
total
}
}) if processed == previous_stage_checkpoint.progress.processed &&
total == previous_stage_checkpoint.progress.total + block.gas_used);
}
#[test]
fn execution_checkpoint_recalculate_full_previous_some() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(tx.deref_mut(), genesis, None).unwrap();
insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap();
tx.commit().unwrap();
let previous_stage_checkpoint = ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 0, to: 0 },
progress: EntitiesCheckpoint { processed: 1, total: 1 },
};
let previous_checkpoint = StageCheckpoint {
block_number: 1,
stage_checkpoint: Some(StageUnitCheckpoint::Execution(previous_stage_checkpoint)),
};
let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint);
assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 1 },
progress: EntitiesCheckpoint {
processed,
total
}
}) if processed == previous_stage_checkpoint.progress.processed &&
total == previous_stage_checkpoint.progress.total + block.gas_used);
}
#[test]
fn execution_checkpoint_recalculate_full_previous_none() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(tx.deref_mut(), genesis, None).unwrap();
insert_canonical_block(tx.deref_mut(), block.clone(), None).unwrap();
tx.commit().unwrap();
let previous_checkpoint = StageCheckpoint { block_number: 1, stage_checkpoint: None };
let stage_checkpoint = execution_checkpoint(&tx, 1, 1, previous_checkpoint);
assert_matches!(stage_checkpoint, Ok(ExecutionCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 1 },
progress: EntitiesCheckpoint {
processed: 0,
total
}
}) if total == block.gas_used);
}
#[tokio::test]
async fn sanity_execution_of_block() {
// TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332
@ -444,7 +669,22 @@ mod tests {
let mut execution_stage = stage();
let output = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
assert_eq!(output, ExecOutput { checkpoint: StageCheckpoint::new(1), done: true });
assert_matches!(output, ExecOutput {
checkpoint: StageCheckpoint {
block_number: 1,
stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint {
block_range: CheckpointBlockRange {
from: 1,
to: 1,
},
progress: EntitiesCheckpoint {
processed,
total
}
}))
},
done: true
} if processed == total && total == block.gas_used);
let tx = tx.deref_mut();
// check post state
let account1 = H160(hex!("1000000000000000000000000000000000000000"));
@ -526,19 +766,33 @@ mod tests {
// execute
let mut execution_stage = stage();
let _ = execution_stage.execute(&mut tx, input).await.unwrap();
let result = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
let mut stage = stage();
let o = stage
let result = stage
.unwind(
&mut tx,
UnwindInput { checkpoint: StageCheckpoint::new(1), unwind_to: 0, bad_block: None },
UnwindInput { checkpoint: result.checkpoint, unwind_to: 0, bad_block: None },
)
.await
.unwrap();
assert_eq!(o, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
assert_matches!(result, UnwindOutput {
checkpoint: StageCheckpoint {
block_number: 0,
stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint {
block_range: CheckpointBlockRange {
from: 1,
to: 1,
},
progress: EntitiesCheckpoint {
processed: 0,
total
}
}))
}
} if total == block.gas_used);
// assert unwind stage
let db_tx = tx.deref();

View File

@ -8,15 +8,19 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable,
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{
keccak256,
stage::{AccountHashingCheckpoint, StageCheckpoint, StageId},
stage::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint,
StageId,
},
};
use reth_provider::Transaction;
use std::{
cmp::max,
fmt::Debug,
ops::{Range, RangeInclusive},
ops::{Deref, Range, RangeInclusive},
};
use tokio::sync::mpsc;
use tracing::*;
@ -32,6 +36,13 @@ pub struct AccountHashingStage {
pub commit_threshold: u64,
}
impl AccountHashingStage {
/// Create new instance of [AccountHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
}
}
impl Default for AccountHashingStage {
fn default() -> Self {
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
@ -140,7 +151,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
.and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint());
let start_address = match stage_checkpoint {
Some(AccountHashingCheckpoint { address: address @ Some(_), from, to })
Some(AccountHashingCheckpoint { address: address @ Some(_), block_range: CheckpointBlockRange { from, to }, .. })
// Checkpoint is only valid if the range of transitions didn't change.
// An already hashed account may have been changed with the new range,
// and therefore should be hashed again.
@ -219,30 +230,36 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint(
AccountHashingCheckpoint {
address: Some(next_address.key().unwrap()),
from: from_block,
to: to_block,
block_range: CheckpointBlockRange { from: from_block, to: to_block },
progress: stage_checkpoint_progress(tx)?,
},
);
info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
// Aggregate all transition changesets and and make list of account that have been
// Aggregate all transition changesets and make a list of accounts that have been
// changed.
let lists = tx.get_addresses_of_changed_accounts(from_block..=to_block)?;
// iterate over plain state and get newest value.
// Iterate over plain state and get newest value.
// Assumption we are okay to make is that plainstate represent
// `previous_stage_progress` state.
let accounts = tx.get_plainstate_accounts(lists.into_iter())?;
// insert and hash accounts to hashing table
let accounts = tx.get_plainstate_accounts(lists)?;
// Insert and hash accounts to hashing table
tx.insert_account_for_hashing(accounts.into_iter())?;
}
// We finished the hashing stage, no future iterations is expected for the same block range,
// so no checkpoint is needed.
let checkpoint = input.previous_stage_checkpoint();
let checkpoint = input.previous_stage_checkpoint().with_account_hashing_stage_checkpoint(
AccountHashingCheckpoint {
progress: stage_checkpoint_progress(tx)?,
..Default::default()
},
);
info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
@ -255,14 +272,31 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Aggregate all transition changesets and and make list of account that have been changed.
// Aggregate all transition changesets and make a list of accounts that have been changed.
tx.unwind_account_hashing(range)?;
info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
let mut stage_checkpoint =
input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
stage_checkpoint.progress = stage_checkpoint_progress(tx)?;
info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_progress)
.with_account_hashing_stage_checkpoint(stage_checkpoint),
})
}
}
fn stage_checkpoint_progress<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::HashedAccount>()? as u64,
total: tx.deref().entries::<tables::PlainAccountState>()? as u64,
})
}
#[cfg(test)]
mod tests {
use super::*;
@ -293,7 +327,24 @@ mod tests {
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput {checkpoint: StageCheckpoint { block_number, ..}, done: true}) if block_number == previous_stage);
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
progress: EntitiesCheckpoint {
processed,
total,
},
..
})),
},
done: true,
}) if block_number == previous_stage &&
processed == total &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
@ -337,11 +388,19 @@ mod tests {
checkpoint: StageCheckpoint {
block_number: 10,
stage_checkpoint: Some(StageUnitCheckpoint::Account(
AccountHashingCheckpoint { address: Some(address), from: 11, to: 20 }
AccountHashingCheckpoint {
address: Some(address),
block_range: CheckpointBlockRange {
from: 11,
to: 20,
},
progress: EntitiesCheckpoint { processed: 5, total }
}
))
},
done: false
}) if address == fifth_address
}) if address == fifth_address &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
@ -353,9 +412,22 @@ mod tests {
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
checkpoint: StageCheckpoint {
block_number: 20,
stage_checkpoint: Some(StageUnitCheckpoint::Account(
AccountHashingCheckpoint {
address: None,
block_range: CheckpointBlockRange {
from: 0,
to: 0,
},
progress: EntitiesCheckpoint { processed, total }
}
))
},
done: true
})
}) if processed == total &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 10);

View File

@ -7,13 +7,17 @@ use reth_db::{
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{
keccak256,
stage::{StageCheckpoint, StageId, StorageHashingCheckpoint},
stage::{
CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint, StageId,
StorageHashingCheckpoint,
},
StorageEntry,
};
use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug};
use std::{collections::BTreeMap, fmt::Debug, ops::Deref};
use tracing::*;
/// Storage hashing stage hashes plain storage.
@ -27,6 +31,13 @@ pub struct StorageHashingStage {
pub commit_threshold: u64,
}
impl StorageHashingStage {
/// Create new instance of [StorageHashingStage].
pub fn new(clean_threshold: u64, commit_threshold: u64) -> Self {
Self { clean_threshold, commit_threshold }
}
}
impl Default for StorageHashingStage {
fn default() -> Self {
Self { clean_threshold: 500_000, commit_threshold: 100_000 }
@ -65,9 +76,9 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
Some(StorageHashingCheckpoint {
address: address @ Some(_),
storage,
from,
to,
})
block_range: CheckpointBlockRange { from, to },
..
})
// Checkpoint is only valid if the range of transitions didn't change.
// An already hashed storage may have been changed with the new range,
// and therefore should be hashed again.
@ -149,11 +160,12 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
StorageHashingCheckpoint {
address: current_key,
storage: current_subkey,
from: from_block,
to: to_block,
block_range: CheckpointBlockRange { from: from_block, to: to_block },
progress: stage_checkpoint_progress(tx)?,
},
);
info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
@ -163,15 +175,20 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// iterate over plain state and get newest storage value.
// Assumption we are okay with is that plain state represent
// `previous_stage_progress` state.
let storages = tx.get_plainstate_storages(lists.into_iter())?;
let storages = tx.get_plainstate_storages(lists)?;
tx.insert_storage_for_hashing(storages.into_iter())?;
}
// We finished the hashing stage, no future iterations is expected for the same block range,
// so no checkpoint is needed.
let checkpoint = input.previous_stage_checkpoint();
let checkpoint = input.previous_stage_checkpoint().with_storage_hashing_stage_checkpoint(
StorageHashingCheckpoint {
progress: stage_checkpoint_progress(tx)?,
..Default::default()
},
);
info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
@ -186,11 +203,28 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
let mut stage_checkpoint =
input.checkpoint.storage_hashing_stage_checkpoint().unwrap_or_default();
stage_checkpoint.progress = stage_checkpoint_progress(tx)?;
info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_progress)
.with_storage_hashing_stage_checkpoint(stage_checkpoint),
})
}
}
fn stage_checkpoint_progress<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::HashedStorage>()? as u64,
total: tx.deref().entries::<tables::PlainStorageState>()? as u64,
})
}
#[cfg(test)]
mod tests {
use super::*;
@ -236,13 +270,36 @@ mod tests {
runner.seed_execution(input).expect("failed to seed execution");
loop {
if let Ok(result) = runner.execute(input).await.unwrap() {
if !result.done {
if let Ok(result @ ExecOutput { checkpoint, done }) =
runner.execute(input).await.unwrap()
{
if !done {
let previous_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
.unwrap_or_default();
assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
progress: EntitiesCheckpoint {
processed,
total,
},
..
}) if processed == previous_checkpoint.progress.processed + 1 &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64);
// Continue from checkpoint
input.checkpoint = Some(result.checkpoint);
input.checkpoint = Some(checkpoint);
continue
} else {
assert!(result.checkpoint.block_number == previous_stage);
assert!(checkpoint.block_number == previous_stage);
assert_matches!(checkpoint.storage_hashing_stage_checkpoint(), Some(StorageHashingCheckpoint {
progress: EntitiesCheckpoint {
processed,
total,
},
..
}) if processed == total &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64);
// Validate the stage execution
assert!(
@ -297,18 +354,26 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
address: Some(address),
storage: Some(storage),
from: 101,
to: 500
block_range: CheckpointBlockRange {
from: 101,
to: 500,
},
progress: EntitiesCheckpoint {
processed: 500,
total
}
}))
},
done: false
}) if address == progress_address && storage == progress_key
}) if address == progress_address && storage == progress_key &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
// second run with commit threshold of 2 to check if subkey is set.
runner.set_commit_threshold(2);
input.checkpoint = Some(result.unwrap().checkpoint);
let result = result.unwrap();
input.checkpoint = Some(result.checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
@ -334,13 +399,20 @@ mod tests {
StorageHashingCheckpoint {
address: Some(address),
storage: Some(storage),
from: 101,
to: 500,
block_range: CheckpointBlockRange {
from: 101,
to: 500,
},
progress: EntitiesCheckpoint {
processed: 502,
total
}
}
))
},
done: false
}) if address == progress_address && storage == progress_key
}) if address == progress_address && storage == progress_key &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
@ -353,9 +425,26 @@ mod tests {
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 500, stage_checkpoint: None },
checkpoint: StageCheckpoint {
block_number: 500,
stage_checkpoint: Some(StageUnitCheckpoint::Storage(
StorageHashingCheckpoint {
address: None,
storage: None,
block_range: CheckpointBlockRange {
from: 0,
to: 0,
},
progress: EntitiesCheckpoint {
processed,
total
}
}
))
},
done: true
})
}) if processed == total &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);
assert_eq!(
runner.tx.table::<tables::HashedStorage>().unwrap().len(),

View File

@ -11,7 +11,9 @@ use reth_interfaces::{
provider::ProviderError,
};
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
stage::{
CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId,
},
BlockHashOrNumber, BlockNumber, SealedHeader, H256,
};
use reth_provider::Transaction;
@ -249,30 +251,42 @@ where
None
};
let mut stage_checkpoint = current_checkpoint
.entities_stage_checkpoint()
.unwrap_or(EntitiesCheckpoint {
// If for some reason (e.g. due to DB migration) we don't have `processed`
// in the middle of headers sync, set it to the local head block number +
// number of block already filled in the gap.
processed: local_head +
(target_block_number.unwrap_or_default() - tip_block_number.unwrap_or_default()),
// Shouldn't fail because on the first iteration, we download the header for missing
// tip, and use its block number.
total: target_block_number.or_else(|| {
warn!(target: "sync::stages::headers", ?tip, "No downloaded header for tip found");
// Safe, because `Display` impl for `EntitiesCheckpoint` will fallback to displaying
// just `processed`
None
}),
});
let mut stage_checkpoint = match current_checkpoint.headers_stage_checkpoint() {
// If checkpoint block range matches our range, we take the previously used
// stage checkpoint as-is.
Some(stage_checkpoint)
if stage_checkpoint.block_range.from == input.checkpoint().block_number =>
{
stage_checkpoint
}
// Otherwise, we're on the first iteration of new gap sync, so we recalculate the number
// of already processed and total headers.
// `target_block_number` is guaranteed to be `Some`, because on the first iteration
// we download the header for missing tip and use its block number.
_ => {
HeadersCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number,
to: target_block_number.expect("No downloaded header for tip found"),
},
progress: EntitiesCheckpoint {
// Set processed to the local head block number + number
// of block already filled in the gap.
processed: local_head +
(target_block_number.unwrap_or_default() -
tip_block_number.unwrap_or_default()),
total: target_block_number.expect("No downloaded header for tip found"),
},
}
}
};
// Total headers can be updated if we received new tip from the network, and need to fill
// the local gap.
if let Some(target_block_number) = target_block_number {
stage_checkpoint.total = Some(target_block_number);
stage_checkpoint.progress.total = target_block_number;
}
stage_checkpoint.processed += downloaded_headers.len() as u64;
stage_checkpoint.progress.processed += downloaded_headers.len() as u64;
// Write the headers to db
self.write_headers::<DB>(tx, downloaded_headers)?.unwrap_or_default();
@ -286,12 +300,12 @@ where
);
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(checkpoint)
.with_entities_stage_checkpoint(stage_checkpoint),
.with_headers_stage_checkpoint(stage_checkpoint),
done: true,
})
} else {
Ok(ExecOutput {
checkpoint: current_checkpoint.with_entities_stage_checkpoint(stage_checkpoint),
checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint),
done: false,
})
}
@ -311,14 +325,20 @@ where
let unwound_headers = tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
let stage_checkpoint =
input.checkpoint.entities_stage_checkpoint().map(|checkpoint| EntitiesCheckpoint {
processed: checkpoint.processed.saturating_sub(unwound_headers as u64),
total: None,
input.checkpoint.headers_stage_checkpoint().map(|stage_checkpoint| HeadersCheckpoint {
block_range: stage_checkpoint.block_range,
progress: EntitiesCheckpoint {
processed: stage_checkpoint
.progress
.processed
.saturating_sub(unwound_headers as u64),
total: stage_checkpoint.progress.total,
},
});
let mut checkpoint = StageCheckpoint::new(input.unwind_to);
if let Some(stage_checkpoint) = stage_checkpoint {
checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint);
checkpoint = checkpoint.with_headers_stage_checkpoint(stage_checkpoint);
}
info!(target: "sync::stages::headers", to_block = input.unwind_to, checkpoint = input.unwind_to, is_final_range = true, "Unwind iteration finished");
@ -550,14 +570,20 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
block_range: CheckpointBlockRange {
from,
to
},
progress: EntitiesCheckpoint {
processed,
total,
}
}))
}, done: true }) if block_number == tip.number
}, done: true }) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head
&& processed == checkpoint + headers.len() as u64 - 1
&& total == tip.number);
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@ -638,13 +664,19 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
block_range: CheckpointBlockRange {
from,
to
},
progress: EntitiesCheckpoint {
processed,
total,
}
}))
}, done: false }) if block_number == checkpoint &&
processed == checkpoint + 500 &&
total == tip.number);
from == checkpoint && to == previous_stage &&
processed == checkpoint + 500 && total == tip.number);
runner.client.clear().await;
runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await;
@ -655,14 +687,20 @@ mod tests {
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
stage_checkpoint: Some(StageUnitCheckpoint::Headers(HeadersCheckpoint {
block_range: CheckpointBlockRange {
from,
to
},
progress: EntitiesCheckpoint {
processed,
total,
}
}))
}, done: true }) if block_number == tip.number
}, done: true }) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head
&& processed == checkpoint + headers.len() as u64 - 1
&& total == tip.number);
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
}

View File

@ -1,8 +1,16 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId,
},
BlockNumber,
};
use reth_provider::Transaction;
use std::fmt::Debug;
use std::{
fmt::Debug,
ops::{Deref, RangeInclusive},
};
use tracing::*;
/// Stage is indexing history the account changesets generated in
@ -40,12 +48,22 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
return Ok(ExecOutput::done(StageCheckpoint::new(*range.end())))
}
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let indices = tx.get_account_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
// Insert changeset to history index
tx.insert_account_history_index(indices)?;
stage_checkpoint.progress.processed += changesets;
info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}
/// Unwind the stage.
@ -57,16 +75,70 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_account_history_indices(range)?;
let changesets = tx.unwind_account_history_indices(range)?;
let checkpoint =
if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() {
stage_checkpoint.progress.processed -= changesets as u64;
StageCheckpoint::new(unwind_progress)
.with_index_history_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_progress)
};
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
Ok(UnwindOutput { checkpoint })
}
}
// The function proceeds as follows:
// 1. It first checks if the checkpoint has an `IndexHistoryCheckpoint` that matches the given
// block range. If it does, the function returns that checkpoint.
// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates
// a new `IndexHistoryCheckpoint` with the given block range and updates the progress with the
// current progress.
// 3. If none of the above conditions are met, it creates a new `IndexHistoryCheckpoint` with the
// given block range and calculates the progress by counting the number of processed entries in the
// `AccountChangeSet` table within the given block range.
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
checkpoint: StageCheckpoint,
range: &RangeInclusive<BlockNumber>,
) -> Result<IndexHistoryCheckpoint, DatabaseError> {
Ok(match checkpoint.index_history_stage_checkpoint() {
Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(range) =>
{
stage_checkpoint
}
Some(IndexHistoryCheckpoint { block_range, progress })
if block_range.to == checkpoint.block_number =>
{
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: progress.processed,
total: tx.deref().entries::<tables::AccountChangeSet>()? as u64,
},
}
}
_ => IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(0..=checkpoint.block_number)?
.count() as u64,
total: tx.deref().entries::<tables::AccountChangeSet>()? as u64,
},
},
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::collections::BTreeMap;
use super::*;
@ -141,7 +213,21 @@ mod tests {
let mut stage = IndexAccountHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number + 1,
to: run_to
},
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
tx.commit().unwrap();
}
@ -353,4 +439,54 @@ mod tests {
])
);
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();
tx.commit(|tx| {
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
1,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000001")),
info: None,
},
)
.unwrap();
tx.put::<tables::AccountChangeSet>(
2,
AccountBeforeTx {
address: H160(hex!("0000000000000000000000000000000000000002")),
info: None,
},
)
.unwrap();
Ok(())
})
.unwrap();
assert_matches!(
stage_checkpoint(&tx.inner(), StageCheckpoint::new(1), &(1..=2)).unwrap(),
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 2 },
progress: EntitiesCheckpoint { processed: 2, total: 4 }
}
);
}
}

View File

@ -1,8 +1,19 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_db::{
cursor::DbCursorRO, database::Database, models::BlockNumberAddress, tables, transaction::DbTx,
DatabaseError,
};
use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, IndexHistoryCheckpoint, StageCheckpoint, StageId,
},
BlockNumber,
};
use reth_provider::Transaction;
use std::fmt::Debug;
use std::{
fmt::Debug,
ops::{Deref, RangeInclusive},
};
use tracing::*;
/// Stage is indexing history the account changesets generated in
@ -41,11 +52,21 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
return Ok(ExecOutput::done(target))
}
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
tx.insert_storage_history_index(indices)?;
stage_checkpoint.progress.processed += changesets;
info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}
/// Unwind the stage.
@ -57,16 +78,70 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
let checkpoint =
if let Some(mut stage_checkpoint) = input.checkpoint.index_history_stage_checkpoint() {
stage_checkpoint.progress.processed -= changesets as u64;
StageCheckpoint::new(unwind_progress)
.with_index_history_stage_checkpoint(stage_checkpoint)
} else {
StageCheckpoint::new(unwind_progress)
};
info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
Ok(UnwindOutput { checkpoint })
}
}
// The function proceeds as follows:
// 1. It first checks if the checkpoint has an `IndexHistoryCheckpoint` that matches the given
// block range. If it does, the function returns that checkpoint.
// 2. If the checkpoint's block range end matches the current checkpoint's block number, it creates
// a new `IndexHistoryCheckpoint` with the given block range and updates the progress with the
// current progress.
// 3. If none of the above conditions are met, it creates a new `IndexHistoryCheckpoint` with the
// given block range and calculates the progress by counting the number of processed entries in the
// `StorageChangeSet` table within the given block range.
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
checkpoint: StageCheckpoint,
range: &RangeInclusive<BlockNumber>,
) -> Result<IndexHistoryCheckpoint, DatabaseError> {
Ok(match checkpoint.index_history_stage_checkpoint() {
Some(stage_checkpoint @ IndexHistoryCheckpoint { block_range, .. })
if block_range == CheckpointBlockRange::from(range) =>
{
stage_checkpoint
}
Some(IndexHistoryCheckpoint { block_range, progress })
if block_range.to == checkpoint.block_number =>
{
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: progress.processed,
total: tx.deref().entries::<tables::StorageChangeSet>()? as u64,
},
}
}
_ => IndexHistoryCheckpoint {
block_range: CheckpointBlockRange::from(range),
progress: EntitiesCheckpoint {
processed: tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(BlockNumberAddress::range(0..=checkpoint.block_number))?
.count() as u64,
total: tx.deref().entries::<tables::StorageChangeSet>()? as u64,
},
},
})
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::collections::BTreeMap;
use super::*;
@ -151,7 +226,21 @@ mod tests {
let mut stage = IndexStorageHistoryStage::default();
let mut tx = tx.inner();
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange {
from: input.checkpoint().block_number + 1,
to: run_to
},
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
tx.commit().unwrap();
}
@ -369,4 +458,64 @@ mod tests {
])
);
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();
tx.commit(|tx| {
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000002"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((1, H160(hex!("0000000000000000000000000000000000000002")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000001")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000002"
))),
)
.unwrap();
tx.put::<tables::StorageChangeSet>(
BlockNumberAddress((2, H160(hex!("0000000000000000000000000000000000000002")))),
storage(H256(hex!(
"0000000000000000000000000000000000000000000000000000000000000001"
))),
)
.unwrap();
Ok(())
})
.unwrap();
assert_matches!(
stage_checkpoint(&tx.inner(), StageCheckpoint::new(1), &(1..=2)).unwrap(),
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 2 },
progress: EntitiesCheckpoint { processed: 3, total: 6 }
}
);
}
}

View File

@ -8,13 +8,16 @@ use reth_db::{
use reth_interfaces::consensus;
use reth_primitives::{
hex,
stage::{MerkleCheckpoint, StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, StageId},
trie::StoredSubNode,
BlockNumber, SealedHeader, H256,
};
use reth_provider::Transaction;
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress};
use std::{fmt::Debug, ops::DerefMut};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};
use tracing::*;
/// The merkle hashing stage uses input from
@ -162,11 +165,13 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let mut checkpoint = self.get_execution_checkpoint(tx)?;
let trie_root = if range.is_empty() {
block_root
let (trie_root, entities_checkpoint) = if range.is_empty() {
(block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
} else if to_block - from_block > threshold || from_block == 1 {
// if there are more blocks than threshold it is faster to rebuild the trie
if let Some(checkpoint) = checkpoint.as_ref().filter(|c| c.target_block == to_block) {
let mut entities_checkpoint = if let Some(checkpoint) =
checkpoint.as_ref().filter(|c| c.target_block == to_block)
{
debug!(
target: "sync::stages::merkle::exec",
current = ?current_block,
@ -175,6 +180,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
last_walker_key = ?hex::encode(&checkpoint.last_walker_key),
"Continuing inner merkle checkpoint"
);
input.checkpoint().entities_stage_checkpoint()
} else {
debug!(
target: "sync::stages::merkle::exec",
@ -188,15 +195,23 @@ impl<DB: Database> Stage<DB> for MerkleStage {
self.save_execution_checkpoint(tx, None)?;
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
None
}
.unwrap_or(EntitiesCheckpoint {
processed: 0,
total: (tx.deref().entries::<tables::HashedAccount>()? +
tx.deref().entries::<tables::HashedStorage>()?) as u64,
});
let progress = StateRoot::new(tx.deref_mut())
.with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
.root_with_progress()
.map_err(|e| StageError::Fatal(Box::new(e)))?;
match progress {
StateRootProgress::Progress(state, updates) => {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.flush(tx.deref_mut())?;
let checkpoint = MerkleCheckpoint::new(
to_block,
state.last_account_key,
@ -205,11 +220,22 @@ impl<DB: Database> Stage<DB> for MerkleStage {
state.hash_builder.into(),
);
self.save_execution_checkpoint(tx, Some(checkpoint))?;
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
entities_checkpoint.processed += hashed_entries_walked as u64;
return Ok(ExecOutput {
checkpoint: input
.checkpoint()
.with_entities_stage_checkpoint(entities_checkpoint),
done: false,
})
}
StateRootProgress::Complete(root, updates) => {
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.flush(tx.deref_mut())?;
root
entities_checkpoint.processed += hashed_entries_walked as u64;
(root, entities_checkpoint)
}
}
} else {
@ -217,7 +243,20 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let (root, updates) = StateRoot::incremental_root_with_updates(tx.deref_mut(), range)
.map_err(|e| StageError::Fatal(Box::new(e)))?;
updates.flush(tx.deref_mut())?;
root
let total_hashed_entries = (tx.deref().entries::<tables::HashedAccount>()? +
tx.deref().entries::<tables::HashedStorage>()?)
as u64;
let entities_checkpoint = EntitiesCheckpoint {
// This is fine because `range` doesn't have an upper bound, so in this `else`
// branch we're just hashing all remaining accounts and storage slots we have in the
// database.
processed: total_hashed_entries,
total: total_hashed_entries,
};
(root, entities_checkpoint)
};
// Reset the checkpoint
@ -226,7 +265,11 @@ impl<DB: Database> Stage<DB> for MerkleStage {
self.validate_state_root(trie_root, block.seal_slow(), to_block)?;
info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block), done: true })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(to_block)
.with_entities_stage_checkpoint(entities_checkpoint),
done: true,
})
}
/// Unwind the stage.
@ -241,11 +284,24 @@ impl<DB: Database> Stage<DB> for MerkleStage {
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
let mut entities_checkpoint =
input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
processed: 0,
total: (tx.deref().entries::<tables::HashedAccount>()? +
tx.deref().entries::<tables::HashedStorage>()?) as u64,
});
if input.unwind_to == 0 {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
entities_checkpoint.processed = 0;
return Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(input.unwind_to)
.with_entities_stage_checkpoint(entities_checkpoint),
})
}
// Unwind trie only if there are transitions
@ -260,6 +316,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
// Validation passed, apply unwind changes to the database.
updates.flush(tx.deref_mut())?;
// TODO(alexey): update entities checkpoint
} else {
info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
}
@ -285,7 +343,9 @@ mod tests {
use reth_interfaces::test_utils::generators::{
random_block, random_block_range, random_contract_account_range, random_transition_range,
};
use reth_primitives::{keccak256, SealedBlock, StorageEntry, H256, U256};
use reth_primitives::{
keccak256, stage::StageUnitCheckpoint, SealedBlock, StorageEntry, H256, U256,
};
use reth_trie::test_utils::{state_root, state_root_prehashed};
use std::collections::BTreeMap;
@ -312,8 +372,20 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
},
done: true
}) if block_number == previous_stage && processed == total &&
total == (
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
runner.tx.table::<tables::HashedStorage>().unwrap().len()
) as u64
);
// Validate the stage execution
@ -340,8 +412,20 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
},
done: true
}) if block_number == previous_stage && processed == total &&
total == (
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
runner.tx.table::<tables::HashedStorage>().unwrap().len()
) as u64
);
// Validate the stage execution

View File

@ -5,15 +5,15 @@ use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
RawKey, RawTable, RawValue,
DatabaseError, RawKey, RawTable, RawValue,
};
use reth_primitives::{
keccak256,
stage::{StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H160,
};
use reth_provider::Transaction;
use std::fmt::Debug;
use std::{fmt::Debug, ops::Deref};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::*;
@ -28,6 +28,13 @@ pub struct SenderRecoveryStage {
pub commit_threshold: u64,
}
impl SenderRecoveryStage {
/// Create new instance of [SenderRecoveryStage].
pub fn new(commit_threshold: u64) -> Self {
Self { commit_threshold }
}
}
impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { commit_threshold: 500_000 }
@ -67,7 +74,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
if first_tx_num > last_tx_num {
info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached");
return Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block),
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
done: is_final_range,
})
}
@ -105,7 +113,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// closure that would recover signer. Used as utility to wrap result
let recover = |entry: Result<
(RawKey<TxNumber>, RawValue<TransactionSignedNoHash>),
reth_db::DatabaseError,
DatabaseError,
>,
rlp_buf: &mut Vec<u8>|
-> Result<(u64, H160), Box<StageError>> {
@ -132,6 +140,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
});
}
// Iterate over channels and append the sender in the order that they are received.
for mut channel in channels {
while let Some(recovered) = channel.recv().await {
@ -141,7 +150,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(end_block), done: is_final_range })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
done: is_final_range,
})
}
/// Unwind the stage.
@ -158,10 +171,22 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
})
}
}
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::TxSenders>()? as u64,
total: tx.deref().entries::<tables::Transactions>()? as u64,
})
}
// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
enum SenderRecoveryStageError {
@ -179,7 +204,9 @@ impl From<SenderRecoveryStageError> for StageError {
mod tests {
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{BlockNumber, SealedBlock, TransactionSigned, H256};
use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256,
};
use super::*;
use crate::test_utils::{
@ -216,8 +243,13 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: 1,
total: 1
}))
}, done: true }) if block_number == previous_stage
);
// Validate the stage execution
@ -239,13 +271,22 @@ mod tests {
// Seed only once with full input range
runner.seed_execution(first_input).expect("failed to seed execution");
let total_transactions = runner.tx.table::<tables::Transactions>().unwrap().len() as u64;
// Execute first time
let result = runner.execute(first_input).await.unwrap();
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false })
if block_number == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number == expected_progress &&
processed == runner.tx.table::<tables::TxSenders>().unwrap().len() as u64 &&
total == total_transactions
);
// Execute second time
@ -256,8 +297,15 @@ mod tests {
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage &&
processed == runner.tx.table::<tables::TxSenders>().unwrap().len() as u64 &&
total == total_transactions
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");

View File

@ -6,13 +6,15 @@ use reth_db::{
database::Database,
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_primitives::{
rpc_utils::keccak256,
stage::{StageCheckpoint, StageId},
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
BlockNumber, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::Transaction;
use std::ops::Deref;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::*;
@ -145,7 +147,11 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { done: is_final_range, checkpoint: StageCheckpoint::new(end_block) })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
done: is_final_range,
})
}
/// Unwind the stage.
@ -179,7 +185,10 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) })
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
})
}
}
@ -195,6 +204,15 @@ impl From<TransactionLookupStageError> for StageError {
}
}
fn stage_checkpoint<DB: Database>(
tx: &Transaction<'_, DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
Ok(EntitiesCheckpoint {
processed: tx.deref().entries::<tables::TxHashNumber>()? as u64,
total: tx.deref().entries::<tables::Transactions>()? as u64,
})
}
#[cfg(test)]
mod tests {
use super::*;
@ -204,7 +222,7 @@ mod tests {
};
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{BlockNumber, SealedBlock, H256};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
@ -235,8 +253,14 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput {checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
);
// Validate the stage execution
@ -263,8 +287,15 @@ mod tests {
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false })
if block_number == expected_progress
Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: false }) if block_number == expected_progress &&
processed == runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64 &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
);
// Execute second time
@ -275,8 +306,14 @@ mod tests {
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true })
if block_number == previous_stage
Ok(ExecOutput {checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
);
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
@ -340,7 +377,7 @@ mod tests {
let stage_progress = input.checkpoint().block_number;
let end = input.previous_stage_checkpoint().block_number;
let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2);
let blocks = random_block_range(stage_progress + 1..=end, H256::zero(), 0..2);
self.tx.insert_blocks(blocks.iter(), None)?;
Ok(blocks)
}

View File

@ -119,7 +119,13 @@ impl<'a> StructHandler<'a> {
assert!(
known_types.contains(&ftype.as_str()) ||
is_flag_type(ftype) ||
self.fields_iterator.peek().is_none(),
self.fields_iterator.peek().map_or(true, |ftypes| {
if let FieldTypes::StructField((_, ftype, _, _)) = ftypes {
!known_types.contains(&ftype.as_str())
} else {
false
}
}),
"`{ftype}` field should be placed as the last one since it's not known.
If it's an alias type (which are not supported by proc_macro), be sure to add it to either `known_types` or `get_bit_size` lists in the derive crate."
);

View File

@ -76,6 +76,10 @@ impl<'a> DbTx<'a> for TxMock {
) -> Result<<Self as DbTxGAT<'_>>::DupCursor<T>, DatabaseError> {
todo!()
}
fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
todo!()
}
}
impl<'a> DbTxMut<'a> for TxMock {

View File

@ -47,6 +47,8 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> {
fn cursor_dup_read<T: DupSort>(
&self,
) -> Result<<Self as DbTxGAT<'_>>::DupCursor<T>, DatabaseError>;
/// Returns number of entries in the table.
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
}
/// Read write transaction that allows writing to database

View File

@ -115,6 +115,15 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> {
) -> Result<<Self as DbTxGAT<'_>>::DupCursor<T>, DatabaseError> {
self.new_cursor()
}
/// Returns number of entries in the table using cheap DB stats invocation.
fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
Ok(self
.inner
.db_stat_with_dbi(self.get_dbi::<T>()?)
.map_err(|e| DatabaseError::Stats(e.into()))?
.entries())
}
}
impl<E: EnvironmentKind> DbTxMut<'_> for Tx<'_, RW, E> {

View File

@ -170,7 +170,7 @@ table!(
table!(
/// (Canonical only) Stores the transaction body for canonical transactions.
( Transactions ) TxNumber | TransactionSignedNoHash
( Transactions ) TxNumber | TransactionSignedNoHash
);
table!(
@ -211,7 +211,7 @@ dupsort!(
table!(
/// Stores pointers to block changeset with changes for each account key.
///
/// Last shard key of the storage will contains `u64::MAX` `BlockNumber`,
/// Last shard key of the storage will contain `u64::MAX` `BlockNumber`,
/// this would allows us small optimization on db access when change is in plain state.
///
/// Imagine having shards as:
@ -233,7 +233,7 @@ table!(
table!(
/// Stores pointers to block number changeset with changes for each storage key.
///
/// Last shard key of the storage will contains `u64::MAX` `BlockNumber`,
/// Last shard key of the storage will contain `u64::MAX` `BlockNumber`,
/// this would allows us small optimization on db access when change is in plain state.
///
/// Imagine having shards as:
@ -293,7 +293,7 @@ dupsort!(
);
table!(
/// Stores the transaction sender for each transaction.
/// Stores the transaction sender for each canonical transaction.
/// It is needed to speed up execution stage and allows fetching signer without doing
/// transaction signed recovery
( TxSenders ) TxNumber | Address

View File

@ -202,10 +202,15 @@ where
/// Retrieves database statistics.
pub fn db_stat<'txn>(&'txn self, db: &Database<'txn>) -> Result<Stat> {
self.db_stat_with_dbi(db.dbi())
}
/// Retrieves database statistics by the given dbi.
pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
unsafe {
let mut stat = Stat::new();
mdbx_result(txn_execute(&self.txn, |txn| {
ffi::mdbx_dbi_stat(txn, db.dbi(), stat.mdb_stat(), size_of::<Stat>())
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>())
}))?;
Ok(stat)
}

View File

@ -293,14 +293,14 @@ where
self.get_take_block_and_execution_range::<true>(chain_spec, range)
}
/// Unwind and clear account hashing
/// Unwind and clear account hashing.
pub fn unwind_account_hashing(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<(), TransactionError> {
let mut hashed_accounts = self.cursor_write::<tables::HashedAccount>()?;
// Aggregate all transition changesets and and make list of account that have been changed.
// Aggregate all transition changesets and make a list of accounts that have been changed.
self.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?
@ -329,10 +329,11 @@ where
}
Ok(())
})?;
Ok(())
}
/// Unwind and clear storage hashing
/// Unwind and clear storage hashing.
pub fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
@ -375,20 +376,22 @@ where
}
Ok(())
})?;
Ok(())
}
/// Unwind and clear account history indices
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.
pub fn unwind_account_history_indices(
&self,
range: RangeInclusive<BlockNumber>,
) -> Result<(), TransactionError> {
let mut cursor = self.cursor_write::<tables::AccountHistory>()?;
) -> Result<usize, TransactionError> {
let account_changeset = self
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset
.into_iter()
@ -400,7 +403,9 @@ where
accounts.insert(account.address, index);
accounts
});
// try to unwind the index
let mut cursor = self.cursor_write::<tables::AccountHistory>()?;
for (address, rem_index) in last_indices {
let shard_part = unwind_account_history_shards::<DB>(&mut cursor, address, rem_index)?;
@ -414,20 +419,23 @@ where
)?;
}
}
Ok(())
Ok(changesets)
}
/// Unwind and clear storage history indices
/// Unwind and clear storage history indices.
///
/// Returns number of changesets walked.
pub fn unwind_storage_history_indices(
&self,
range: Range<BlockNumberAddress>,
) -> Result<(), TransactionError> {
let mut cursor = self.cursor_write::<tables::StorageHistory>()?;
) -> Result<usize, TransactionError> {
let storage_changesets = self
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
let changesets = storage_changesets.len();
let last_indices = storage_changesets
.into_iter()
// reverse so we can get lowest transition id where we need to unwind account.
@ -441,6 +449,8 @@ where
accounts
},
);
let mut cursor = self.cursor_write::<tables::StorageHistory>()?;
for ((address, storage_key), rem_index) in last_indices {
let shard_part =
unwind_storage_history_shards::<DB>(&mut cursor, address, storage_key, rem_index)?;
@ -455,7 +465,8 @@ where
)?;
}
}
Ok(())
Ok(changesets)
}
/// Append blocks and insert its post state.
@ -1178,7 +1189,6 @@ where
storages
},
);
Ok(storage_changeset_lists)
}
@ -1194,7 +1204,7 @@ where
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
let account_transtions = account_changesets
let account_transitions = account_changesets
.into_iter()
// fold all account to one set of changed accounts
.fold(
@ -1204,8 +1214,7 @@ where
accounts
},
);
Ok(account_transtions)
Ok(account_transitions)
}
/// Insert storage change index to database. Used inside StorageHistoryIndex stage

View File

@ -9,10 +9,10 @@ use reth_primitives::{
#[derive(Debug)]
pub enum StateRootProgress {
/// The complete state root computation with updates and computed root.
Complete(H256, TrieUpdates),
Complete(H256, usize, TrieUpdates),
/// The intermediate progress of state root computation.
/// Contains the walker stack, the hash builder and the trie updates.
Progress(Box<IntermediateStateRootState>, TrieUpdates),
Progress(Box<IntermediateStateRootState>, usize, TrieUpdates),
}
/// The intermediate state of the state root computation.

View File

@ -174,7 +174,7 @@ where
/// The intermediate progress of state root computation and the trie updates.
pub fn root_with_updates(self) -> Result<(H256, TrieUpdates), StateRootError> {
match self.with_no_threshold().calculate(true)? {
StateRootProgress::Complete(root, updates) => Ok((root, updates)),
StateRootProgress::Complete(root, _, updates) => Ok((root, updates)),
StateRootProgress::Progress(..) => unreachable!(), // unreachable threshold
}
}
@ -187,7 +187,7 @@ where
/// The state root hash.
pub fn root(self) -> Result<H256, StateRootError> {
match self.calculate(false)? {
StateRootProgress::Complete(root, _) => Ok(root),
StateRootProgress::Complete(root, _, _) => Ok(root),
StateRootProgress::Progress(..) => unreachable!(), // update retenion is disabled
}
}
@ -234,6 +234,7 @@ where
hash_builder.set_updates(retain_updates);
let mut account_rlp = Vec::with_capacity(128);
let mut hashed_entries_walked = 0;
while let Some(key) = last_walker_key.take().or_else(|| walker.key()) {
// Take the last account key to make sure we take it into consideration only once.
@ -260,6 +261,7 @@ where
};
while let Some((hashed_address, account)) = next_account_entry {
hashed_entries_walked += 1;
let account_nibbles = Nibbles::unpack(hashed_address);
if let Some(ref key) = next_key {
@ -286,7 +288,9 @@ where
);
let storage_root = if retain_updates {
let (root, updates) = storage_root_calculator.root_with_updates()?;
let (root, storage_slots_walked, updates) =
storage_root_calculator.root_with_updates()?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates.into_iter());
root
} else {
@ -317,7 +321,11 @@ where
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend_with_account_updates(hash_builder_updates);
return Ok(StateRootProgress::Progress(Box::new(state), trie_updates))
return Ok(StateRootProgress::Progress(
Box::new(state),
hashed_entries_walked,
trie_updates,
))
}
// Move the next account entry
@ -333,7 +341,7 @@ where
trie_updates.extend(walker_updates.into_iter());
trie_updates.extend_with_account_updates(hash_builder_updates);
Ok(StateRootProgress::Complete(root, trie_updates))
Ok(StateRootProgress::Complete(root, hashed_entries_walked, trie_updates))
}
}
@ -414,7 +422,7 @@ where
/// # Returns
///
/// The storage root and storage trie updates for a given address.
pub fn root_with_updates(&self) -> Result<(H256, TrieUpdates), StorageRootError> {
pub fn root_with_updates(&self) -> Result<(H256, usize, TrieUpdates), StorageRootError> {
self.calculate(true)
}
@ -424,11 +432,14 @@ where
///
/// The storage root.
pub fn root(&self) -> Result<H256, StorageRootError> {
let (root, _) = self.calculate(false)?;
let (root, _, _) = self.calculate(false)?;
Ok(root)
}
fn calculate(&self, retain_updates: bool) -> Result<(H256, TrieUpdates), StorageRootError> {
fn calculate(
&self,
retain_updates: bool,
) -> Result<(H256, usize, TrieUpdates), StorageRootError> {
tracing::debug!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "calculating storage root");
let mut hashed_storage_cursor = self.hashed_cursor_factory.hashed_storage_cursor()?;
@ -442,6 +453,7 @@ where
if hashed_storage_cursor.is_storage_empty(self.hashed_address)? {
return Ok((
EMPTY_ROOT,
0,
TrieUpdates::from([(TrieKey::StorageTrie(self.hashed_address), TrieOp::Delete)]),
))
}
@ -451,6 +463,7 @@ where
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
let mut storage_slots_walked = 0;
while let Some(key) = walker.key() {
if walker.can_skip_current_node {
hash_builder.add_branch(key, walker.hash().unwrap(), walker.children_are_in_trie());
@ -464,6 +477,8 @@ where
let next_key = walker.advance()?;
let mut storage = hashed_storage_cursor.seek(self.hashed_address, seek_key)?;
while let Some(StorageEntry { key: hashed_key, value }) = storage {
storage_slots_walked += 1;
let storage_key_nibbles = Nibbles::unpack(hashed_key);
if let Some(ref key) = next_key {
if key < &storage_key_nibbles {
@ -486,7 +501,7 @@ where
trie_updates.extend_with_storage_updates(self.hashed_address, hash_builder_updates);
tracing::debug!(target: "trie::storage_root", ?root, hashed_address = ?self.hashed_address, "calculated storage root");
Ok((root, trie_updates))
Ok((root, storage_slots_walked, trie_updates))
}
}
@ -555,7 +570,7 @@ mod tests {
}
// Generate the intermediate nodes on the receiving end of the channel
let (_, trie_updates) =
let (_, _, trie_updates) =
StorageRoot::new_hashed(tx.deref(), hashed_address).root_with_updates().unwrap();
// 1. Some state transition happens, update the hashed storage to the new value
@ -723,6 +738,9 @@ mod tests {
fn arbitrary_state_root_with_progress() {
proptest!(
ProptestConfig::with_cases(10), | (state: State) | {
let hashed_entries_total = state.len() +
state.values().map(|(_, slots)| slots.len()).sum::<usize>();
let db = create_test_rw_db();
let mut tx = Transaction::new(db.as_ref()).unwrap();
@ -734,6 +752,7 @@ mod tests {
let threshold = 10;
let mut got = None;
let mut hashed_entries_walked = 0;
let mut intermediate_state: Option<Box<IntermediateStateRootState>> = None;
while got.is_none() {
@ -741,11 +760,18 @@ mod tests {
.with_threshold(threshold)
.with_intermediate_state(intermediate_state.take().map(|state| *state));
match calculator.root_with_progress().unwrap() {
StateRootProgress::Progress(state, _updates) => intermediate_state = Some(state),
StateRootProgress::Complete(root, _updates) => got = Some(root),
StateRootProgress::Progress(state, walked, _) => {
intermediate_state = Some(state);
hashed_entries_walked += walked;
},
StateRootProgress::Complete(root, walked, _) => {
got = Some(root);
hashed_entries_walked += walked;
},
};
}
assert_eq!(expected, got.unwrap());
assert_eq!(hashed_entries_total, hashed_entries_walked)
}
);
}
@ -1210,7 +1236,7 @@ mod tests {
let (expected_root, expected_updates) =
extension_node_storage_trie(&mut tx, hashed_address);
let (got, updates) =
let (got, _, updates) =
StorageRoot::new_hashed(tx.deref_mut(), hashed_address).root_with_updates().unwrap();
assert_eq!(expected_root, got);

View File

@ -27,7 +27,7 @@
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "9.4.7"
"version": "9.5.2"
},
{
"type": "panel",
@ -128,7 +128,7 @@
},
"gridPos": {
"h": 8,
"w": 5,
"w": 12,
"x": 0,
"y": 0
},
@ -145,7 +145,7 @@
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "9.4.7",
"pluginVersion": "9.5.2",
"targets": [
{
"datasource": {
@ -192,8 +192,8 @@
},
"gridPos": {
"h": 8,
"w": 9,
"x": 5,
"w": 12,
"x": 12,
"y": 0
},
"id": 20,
@ -209,9 +209,10 @@
"fields": "",
"values": false
},
"showUnfilled": true
"showUnfilled": true,
"valueMode": "color"
},
"pluginVersion": "9.4.7",
"pluginVersion": "9.5.2",
"targets": [
{
"datasource": {
@ -231,6 +232,97 @@
"transparent": true,
"type": "bargauge"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": true,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"max": 1,
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "percentunit"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 69,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "reth_sync_entities_processed / reth_sync_entities_total",
"legendFormat": "{{stage}}",
"range": true,
"refId": "A"
}
],
"title": "Sync progress (stage progress in %)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@ -290,9 +382,9 @@
},
"gridPos": {
"h": 8,
"w": 10,
"x": 14,
"y": 0
"w": 12,
"x": 12,
"y": 8
},
"id": 12,
"options": {
@ -320,7 +412,7 @@
"refId": "A"
}
],
"title": "Sync progress",
"title": "Sync progress (stage progress as highest block number reached)",
"type": "timeseries"
},
{
@ -329,7 +421,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 8
"y": 16
},
"id": 38,
"panels": [],
@ -397,7 +489,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 9
"y": 17
},
"id": 40,
"options": {
@ -457,7 +549,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 9
"y": 17
},
"id": 42,
"maxDataPoints": 25,
@ -501,7 +593,7 @@
"unit": "percentunit"
}
},
"pluginVersion": "9.4.7",
"pluginVersion": "9.5.2",
"targets": [
{
"datasource": {
@ -548,7 +640,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 17
"y": 25
},
"id": 48,
"options": {
@ -657,7 +749,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 17
"y": 25
},
"id": 52,
"options": {
@ -715,7 +807,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 25
"y": 33
},
"id": 50,
"options": {
@ -883,10 +975,11 @@
"h": 8,
"w": 12,
"x": 12,
"y": 25
"y": 33
},
"id": 58,
"options": {
"cellHeight": "sm",
"footer": {
"countRows": false,
"fields": "",
@ -897,7 +990,7 @@
},
"showHeader": true
},
"pluginVersion": "9.4.7",
"pluginVersion": "9.5.2",
"targets": [
{
"datasource": {
@ -923,7 +1016,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 33
"y": 41
},
"id": 46,
"panels": [],
@ -989,7 +1082,7 @@
"h": 8,
"w": 24,
"x": 0,
"y": 34
"y": 42
},
"id": 56,
"options": {
@ -1062,7 +1155,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 42
"y": 50
},
"id": 6,
"panels": [],
@ -1131,7 +1224,7 @@
"h": 8,
"w": 8,
"x": 0,
"y": 43
"y": 51
},
"id": 18,
"options": {
@ -1224,7 +1317,7 @@
"h": 8,
"w": 8,
"x": 8,
"y": 43
"y": 51
},
"id": 16,
"options": {
@ -1342,7 +1435,7 @@
"h": 8,
"w": 8,
"x": 16,
"y": 43
"y": 51
},
"id": 8,
"options": {
@ -1421,7 +1514,7 @@
"h": 8,
"w": 8,
"x": 0,
"y": 51
"y": 59
},
"id": 54,
"options": {
@ -1585,7 +1678,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 59
"y": 67
},
"id": 24,
"panels": [],
@ -1637,8 +1730,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -1678,7 +1770,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 60
"y": 68
},
"id": 26,
"options": {
@ -1792,8 +1884,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -1808,7 +1899,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 60
"y": 68
},
"id": 33,
"options": {
@ -1909,8 +2000,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -1925,7 +2015,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 68
"y": 76
},
"id": 36,
"options": {
@ -1974,7 +2064,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 76
"y": 84
},
"id": 32,
"panels": [],
@ -2027,8 +2117,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2068,7 +2157,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 77
"y": 85
},
"id": 30,
"options": {
@ -2218,8 +2307,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
}
]
}
@ -2230,7 +2318,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 77
"y": 85
},
"id": 28,
"options": {
@ -2331,8 +2419,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2347,7 +2434,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 85
"y": 93
},
"id": 35,
"options": {
@ -2396,7 +2483,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 93
"y": 101
},
"id": 68,
"panels": [
@ -2446,8 +2533,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2462,7 +2548,7 @@
"h": 8,
"w": 11,
"x": 0,
"y": 1
"y": 9
},
"id": 60,
"options": {
@ -2539,8 +2625,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2555,7 +2640,7 @@
"h": 8,
"w": 13,
"x": 11,
"y": 1
"y": 9
},
"id": 62,
"options": {
@ -2632,8 +2717,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2648,7 +2732,7 @@
"h": 7,
"w": 11,
"x": 0,
"y": 9
"y": 17
},
"id": 64,
"options": {
@ -2700,6 +2784,6 @@
"timezone": "",
"title": "reth",
"uid": "2k8BXz24x",
"version": 5,
"version": 6,
"weekStart": ""
}