feat(stages): remove changeset commit threshold in Execution (#3050)

This commit is contained in:
Alexey Shekhirin
2023-06-08 13:34:25 +04:00
committed by GitHub
parent 1075995efc
commit 7cc69d52c6
8 changed files with 6 additions and 62 deletions

View File

@ -174,7 +174,6 @@ impl ImportCommand {
ExecutionStageThresholds { ExecutionStageThresholds {
max_blocks: config.stages.execution.max_blocks, max_blocks: config.stages.execution.max_blocks,
max_changes: config.stages.execution.max_changes, max_changes: config.stages.execution.max_changes,
max_changesets: config.stages.execution.max_changesets,
}, },
)), )),
) )

View File

@ -143,11 +143,7 @@ impl Command {
}) })
.set(ExecutionStage::new( .set(ExecutionStage::new(
factory, factory,
ExecutionStageThresholds { ExecutionStageThresholds { max_blocks: None, max_changes: None },
max_blocks: None,
max_changes: None,
max_changesets: None,
},
)), )),
) )
.build(db); .build(db);

View File

@ -91,11 +91,7 @@ impl Command {
let factory = reth_revm::Factory::new(self.chain.clone()); let factory = reth_revm::Factory::new(self.chain.clone());
let mut execution_stage = ExecutionStage::new( let mut execution_stage = ExecutionStage::new(
factory, factory,
ExecutionStageThresholds { ExecutionStageThresholds { max_blocks: Some(1), max_changes: None },
max_blocks: Some(1),
max_changes: None,
max_changesets: None,
},
); );
let mut account_hashing_stage = AccountHashingStage::default(); let mut account_hashing_stage = AccountHashingStage::default();

View File

@ -691,7 +691,6 @@ impl Command {
ExecutionStageThresholds { ExecutionStageThresholds {
max_blocks: stage_conf.execution.max_blocks, max_blocks: stage_conf.execution.max_blocks,
max_changes: stage_conf.execution.max_changes, max_changes: stage_conf.execution.max_changes,
max_changesets: stage_conf.execution.max_changesets,
}, },
)), )),
) )

View File

@ -71,11 +71,7 @@ async fn unwind_and_copy<DB: Database>(
// Bring Plainstate to TO (hashing stage execution requires it) // Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new( let mut exec_stage = ExecutionStage::new(
reth_revm::Factory::new(Arc::new(MAINNET.clone())), reth_revm::Factory::new(Arc::new(MAINNET.clone())),
ExecutionStageThresholds { ExecutionStageThresholds { max_blocks: Some(u64::MAX), max_changes: None },
max_blocks: Some(u64::MAX),
max_changes: None,
max_changesets: None,
},
); );
exec_stage exec_stage

View File

@ -190,7 +190,6 @@ impl Command {
ExecutionStageThresholds { ExecutionStageThresholds {
max_blocks: Some(batch_size), max_blocks: Some(batch_size),
max_changes: None, max_changes: None,
max_changesets: None,
}, },
)), )),
None, None,

View File

@ -196,21 +196,11 @@ pub struct ExecutionConfig {
pub max_blocks: Option<u64>, pub max_blocks: Option<u64>,
/// The maximum amount of state changes to keep in memory before the execution stage commits. /// The maximum amount of state changes to keep in memory before the execution stage commits.
pub max_changes: Option<u64>, pub max_changes: Option<u64>,
/// The maximum amount of changesets to keep in memory before they are written to the pending
/// database transaction.
///
/// If this is lower than `max_gas`, then history is periodically flushed to the database
/// transaction, which frees up memory.
pub max_changesets: Option<u64>,
} }
impl Default for ExecutionConfig { impl Default for ExecutionConfig {
fn default() -> Self { fn default() -> Self {
Self { Self { max_blocks: Some(500_000), max_changes: Some(5_000_000) }
max_blocks: Some(500_000),
max_changes: Some(5_000_000),
max_changesets: Some(1_000_000),
}
} }
} }

View File

@ -176,14 +176,6 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
stage_progress = block_number; stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used; stage_checkpoint.progress.processed += block.gas_used;
// Write history periodically to free up memory
if self.thresholds.should_write_history(state.changeset_size_hint() as u64) {
info!(target: "sync::stages::execution", ?block_number, "Writing history.");
state.write_history_to_db(&**tx)?;
info!(target: "sync::stages::execution", ?block_number, "Wrote history.");
// gas_since_history_write = 0;
}
// Check if we should commit now // Check if we should commit now
if self.thresholds.is_end_of_batch(block_number - start_block, state.size_hint() as u64) if self.thresholds.is_end_of_batch(block_number - start_block, state.size_hint() as u64)
{ {
@ -431,30 +423,17 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
/// ///
/// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage /// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage
/// commits all pending changes to the database. /// commits all pending changes to the database.
///
/// A third threshold, `max_changesets`, can be set to periodically write changesets to the
/// current database transaction, which frees up memory.
#[derive(Debug)] #[derive(Debug)]
pub struct ExecutionStageThresholds { pub struct ExecutionStageThresholds {
/// The maximum number of blocks to process before the execution stage commits. /// The maximum number of blocks to process before the execution stage commits.
pub max_blocks: Option<u64>, pub max_blocks: Option<u64>,
/// The maximum amount of state changes to keep in memory before the execution stage commits. /// The maximum amount of state changes to keep in memory before the execution stage commits.
pub max_changes: Option<u64>, pub max_changes: Option<u64>,
/// The maximum amount of changesets to keep in memory before they are written to the pending
/// database transaction.
///
/// If this is lower than `max_changes`, then history is periodically flushed to the database
/// transaction, which frees up memory.
pub max_changesets: Option<u64>,
} }
impl Default for ExecutionStageThresholds { impl Default for ExecutionStageThresholds {
fn default() -> Self { fn default() -> Self {
Self { Self { max_blocks: Some(500_000), max_changes: Some(5_000_000) }
max_blocks: Some(500_000),
max_changes: Some(5_000_000),
max_changesets: Some(1_000_000),
}
} }
} }
@ -465,12 +444,6 @@ impl ExecutionStageThresholds {
blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) || blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) ||
changes_processed >= self.max_changes.unwrap_or(u64::MAX) changes_processed >= self.max_changes.unwrap_or(u64::MAX)
} }
/// Check if the history write threshold has been hit.
#[inline]
pub fn should_write_history(&self, history_changes: u64) -> bool {
history_changes >= self.max_changesets.unwrap_or(u64::MAX)
}
} }
#[cfg(test)] #[cfg(test)]
@ -499,11 +472,7 @@ mod tests {
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())); Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build()));
ExecutionStage::new( ExecutionStage::new(
factory, factory,
ExecutionStageThresholds { ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
max_blocks: Some(100),
max_changes: None,
max_changesets: None,
},
) )
} }