mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 19:09:54 +00:00)

Revert "refactor(stages): input target reached & output done checks" (#3114)

committed by GitHub
parent 9a8c680e0f
commit 7ec4b0a5cf
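Note: the hunks below restore the explicit `done` flag on stage outputs and delete the `is_done(input)` helpers introduced by the refactor being reverted. A minimal Rust sketch of the two shapes this revert moves between, using a bare u64 block number as an illustrative stand-in for `StageCheckpoint`/`ExecInput` (not the real reth types):

    // Shape before the revert (the refactor being undone): completion derived from the input.
    struct RefactoredExecOutput {
        checkpoint: u64, // stand-in for StageCheckpoint
    }

    impl RefactoredExecOutput {
        fn is_done(&self, target: u64) -> bool {
            self.checkpoint >= target
        }
    }

    // Shape after the revert: the stage reports completion explicitly.
    struct ExecOutput {
        checkpoint: u64,
        done: bool,
    }

    fn main() {
        assert!(RefactoredExecOutput { checkpoint: 10 }.is_done(10));
        let out = ExecOutput { checkpoint: 10, done: true };
        assert!(out.done && out.checkpoint == 10);
    }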
@@ -119,22 +119,30 @@ impl Command {
         let mut account_hashing_done = false;
         while !account_hashing_done {
-            let input = ExecInput {
-                target: Some(block),
-                checkpoint: progress.map(StageCheckpoint::new),
-            };
-            let output = account_hashing_stage.execute(&mut tx, input).await?;
-            account_hashing_done = output.is_done(input);
+            let output = account_hashing_stage
+                .execute(
+                    &mut tx,
+                    ExecInput {
+                        target: Some(block),
+                        checkpoint: progress.map(StageCheckpoint::new),
+                    },
+                )
+                .await?;
+            account_hashing_done = output.done;
         }

         let mut storage_hashing_done = false;
         while !storage_hashing_done {
-            let input = ExecInput {
-                target: Some(block),
-                checkpoint: progress.map(StageCheckpoint::new),
-            };
-            let output = storage_hashing_stage.execute(&mut tx, input).await?;
-            storage_hashing_done = output.is_done(input);
+            let output = storage_hashing_stage
+                .execute(
+                    &mut tx,
+                    ExecInput {
+                        target: Some(block),
+                        checkpoint: progress.map(StageCheckpoint::new),
+                    },
+                )
+                .await?;
+            storage_hashing_done = output.done;
         }

         let incremental_result = merkle_stage

@@ -162,7 +170,7 @@ impl Command {
         loop {
             let clean_result = merkle_stage.execute(&mut tx, clean_input).await;
             assert!(clean_result.is_ok(), "Clean state root calculation failed");
-            if clean_result.unwrap().is_done(clean_input) {
+            if clean_result.unwrap().done {
                 break
             }
         }
@@ -72,8 +72,7 @@ impl NodeState {
                 pipeline_position,
                 pipeline_total,
                 stage_id,
-                result: ExecOutput { checkpoint },
-                done,
+                result: ExecOutput { checkpoint, done },
             } => {
                 self.current_checkpoint = checkpoint;

@@ -76,11 +76,16 @@ async fn dry_run(

     let mut exec_output = false;
     while !exec_output {
-        let exec_input = reth_stages::ExecInput {
-            target: Some(to),
-            checkpoint: Some(StageCheckpoint::new(from)),
-        };
-        exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    target: Some(to),
+                    checkpoint: Some(StageCheckpoint::new(from)),
+                },
+            )
+            .await?
+            .done;
     }

     tx.drop()?;
@@ -73,11 +73,16 @@ async fn dry_run(

     let mut exec_output = false;
     while !exec_output {
-        let exec_input = reth_stages::ExecInput {
-            target: Some(to),
-            checkpoint: Some(StageCheckpoint::new(from)),
-        };
-        exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    target: Some(to),
+                    checkpoint: Some(StageCheckpoint::new(from)),
+                },
+            )
+            .await?
+            .done;
     }

     tx.drop()?;
@@ -116,17 +116,19 @@ async fn dry_run(
     let mut tx = Transaction::new(&output_db)?;
     let mut exec_output = false;
     while !exec_output {
-        let exec_input = reth_stages::ExecInput {
-            target: Some(to),
-            checkpoint: Some(StageCheckpoint::new(from)),
-        };
         exec_output = MerkleStage::Execution {
-            // Forces updating the root instead of calculating from scratch
-            clean_threshold: u64::MAX,
+            clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from
+                                        * scratch */
         }
-        .execute(&mut tx, exec_input)
+        .execute(
+            &mut tx,
+            reth_stages::ExecInput {
+                target: Some(to),
+                checkpoint: Some(StageCheckpoint::new(from)),
+            },
+        )
         .await?
-        .is_done(exec_input);
+        .done;
     }

     tx.drop()?;
@@ -20,7 +20,7 @@ use reth_stages::{
         IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
         StorageHashingStage, TransactionLookupStage,
     },
-    ExecInput, Stage, UnwindInput,
+    ExecInput, ExecOutput, Stage, UnwindInput,
 };
 use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc};
 use tracing::*;
@@ -236,13 +236,10 @@ impl Command {
             checkpoint: Some(checkpoint.with_block_number(self.from)),
         };

-        loop {
-            let result = exec_stage.execute(&mut tx, input).await?;
-            if result.is_done(input) {
-                break
-            }
-
-            input.checkpoint = Some(result.checkpoint);
+        while let ExecOutput { checkpoint: stage_progress, done: false } =
+            exec_stage.execute(&mut tx, input).await?
+        {
+            input.checkpoint = Some(stage_progress);

             if self.commit {
                 tx.commit()?;
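The hunk above replaces a manual `loop`/`is_done` check with a `while let ... done: false` pattern that feeds the latest checkpoint back into the next run. A self-contained toy of that drive-to-completion shape, with a hypothetical `execute` function standing in for `exec_stage.execute(&mut tx, input)`:

    #[derive(Clone, Copy, Debug)]
    struct ExecOutput {
        checkpoint: u64,
        done: bool,
    }

    // Toy stage: advances at most `batch` blocks per call toward `target`.
    fn execute(checkpoint: Option<u64>, target: u64, batch: u64) -> ExecOutput {
        let reached = (checkpoint.unwrap_or(0) + batch).min(target);
        ExecOutput { checkpoint: reached, done: reached == target }
    }

    fn main() {
        let (target, batch) = (25u64, 10u64);
        let mut checkpoint = None;
        // Keep re-running the stage until it reports `done: true`.
        while let ExecOutput { checkpoint: progress, done: false } = execute(checkpoint, target, batch) {
            checkpoint = Some(progress); // the real command commits the transaction here
        }
        println!("drove the stage to target {target}");
    }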
@@ -1370,7 +1370,6 @@ mod tests {
         chain_spec: Arc<ChainSpec>,
         pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
         executor_results: Vec<PostState>,
-        max_block: Option<BlockNumber>,
     ) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
         reth_tracing::init_test_tracing();
         let db = create_test_rw_db();

@@ -1382,13 +1381,10 @@ mod tests {

         // Setup pipeline
         let (tip_tx, tip_rx) = watch::channel(H256::default());
-        let mut pipeline_builder = Pipeline::builder()
+        let pipeline = Pipeline::builder()
             .add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
-            .with_tip_sender(tip_tx);
-        if let Some(max_block) = max_block {
-            pipeline_builder = pipeline_builder.with_max_block(max_block);
-        }
-        let pipeline = pipeline_builder.build(db.clone());
+            .with_tip_sender(tip_tx)
+            .build(db.clone());

         // Setup blockchain tree
         let externals =

@@ -1408,7 +1404,7 @@ mod tests {
             blockchain_provider,
             Box::<TokioTaskExecutor>::default(),
             Box::<NoopSyncStateUpdater>::default(),
-            max_block,
+            None,
             false,
             payload_builder,
             None,

@@ -1443,7 +1439,6 @@ mod tests {
             chain_spec,
             VecDeque::from([Err(StageError::ChannelClosed)]),
             Vec::default(),
-            Some(1),
         );
         let res = spawn_consensus_engine(consensus_engine);

@@ -1473,7 +1468,6 @@ mod tests {
             chain_spec,
             VecDeque::from([Err(StageError::ChannelClosed)]),
             Vec::default(),
-            Some(1),
         );
         let mut rx = spawn_consensus_engine(consensus_engine);

@@ -1513,11 +1507,10 @@ mod tests {
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
             VecDeque::from([
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }),
+                Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
                 Err(StageError::ChannelClosed),
             ]),
             Vec::default(),
-            Some(2),
         );
         let rx = spawn_consensus_engine(consensus_engine);

@@ -1530,9 +1523,7 @@ mod tests {

         assert_matches!(
             rx.await,
-            Ok(
-                Err(BeaconConsensusEngineError::Pipeline(n))
-            ) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
+            Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
         );
     }

@@ -1546,12 +1537,15 @@ mod tests {
                 .paris_activated()
                 .build(),
         );
-        let (consensus_engine, env) = setup_consensus_engine(
+        let (mut consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]),
+            VecDeque::from([Ok(ExecOutput {
+                checkpoint: StageCheckpoint::new(max_block),
+                done: true,
+            })]),
             Vec::default(),
-            Some(max_block),
         );
+        consensus_engine.sync.set_max_block(max_block);
         let rx = spawn_consensus_engine(consensus_engine);

         let _ = env

@@ -1588,9 +1582,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let mut engine_rx = spawn_consensus_engine(consensus_engine);

@@ -1617,9 +1613,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1664,11 +1662,10 @@ mod tests {
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
             VecDeque::from([
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
             ]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1713,9 +1710,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1749,11 +1748,10 @@ mod tests {
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
             VecDeque::from([
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
             ]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1799,11 +1797,10 @@ mod tests {
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
             VecDeque::from([
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
-                Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
+                Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
             ]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1846,9 +1843,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let mut engine_rx = spawn_consensus_engine(consensus_engine);

@@ -1877,9 +1876,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1921,9 +1922,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::default(),
-            None,
         );

         let genesis = random_block(0, None, None, Some(0));

@@ -1976,9 +1979,11 @@ mod tests {
         );
         let (consensus_engine, env) = setup_consensus_engine(
             chain_spec,
-            VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
+            VecDeque::from([Ok(ExecOutput {
+                done: true,
+                checkpoint: StageCheckpoint::new(0),
+            })]),
             Vec::from([exec_result2]),
-            None,
         );

         insert_blocks(env.db.as_ref(), [&data.genesis, &block1].into_iter());
@@ -83,6 +83,12 @@ where
         self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
     }

+    /// Sets the max block value for testing
+    #[cfg(test)]
+    pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
+        self.max_block = Some(block);
+    }
+
     /// Cancels all full block requests that are in progress.
     pub(crate) fn clear_full_block_requests(&mut self) {
         self.inflight_full_block_requests.clear();
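The setter added above lets tests cap the sync controller directly; the engine tests earlier in this diff call `consensus_engine.sync.set_max_block(max_block)` instead of passing the cap through the setup helper. A toy sketch of the same test-only-setter pattern, with simplified stand-in types:

    // Toy version of the test-only setter: production code leaves the cap unset,
    // tests pin it explicitly instead of threading it through a constructor.
    struct EngineSyncController {
        max_block: Option<u64>,
    }

    impl EngineSyncController {
        fn new() -> Self {
            Self { max_block: None }
        }

        /// Sets the max block value for testing
        #[cfg(test)]
        fn set_max_block(&mut self, block: u64) {
            self.max_block = Some(block);
        }
    }

    fn main() {
        let ctrl = EngineSyncController::new();
        assert!(ctrl.max_block.is_none());
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn caps_max_block() {
            let mut ctrl = EngineSyncController::new();
            ctrl.set_max_block(100);
            assert_eq!(ctrl.max_block, Some(100));
        }
    }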
@@ -31,8 +31,6 @@ pub enum PipelineEvent {
         stage_id: StageId,
         /// The result of executing the stage.
         result: ExecOutput,
-        /// Stage completed executing the whole block range
-        done: bool,
     },
     /// Emitted when a stage is about to be unwound.
     Unwinding {

@@ -47,8 +45,6 @@ pub enum PipelineEvent {
         stage_id: StageId,
         /// The result of unwinding the stage.
         result: UnwindOutput,
-        /// Stage completed unwinding the whole block range
-        done: bool,
     },
     /// Emitted when a stage encounters an error either during execution or unwinding.
     Error {
@@ -259,10 +259,8 @@ where
                 continue
             }

-            let mut done = UnwindInput { checkpoint, unwind_to: to, bad_block }.target_reached();
-
             debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind");
-            while !done {
+            while checkpoint.block_number > to {
                 let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                 self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });

@@ -270,7 +268,6 @@ where
                 match output {
                     Ok(unwind_output) => {
                         checkpoint = unwind_output.checkpoint;
-                        done = unwind_output.is_done(input);
                         info!(
                             target: "sync::pipeline",
                             stage = %stage_id,

@@ -287,11 +284,8 @@ where
                         );
                         tx.save_stage_checkpoint(stage_id, checkpoint)?;

-                        self.listeners.notify(PipelineEvent::Unwound {
-                            stage_id,
-                            result: unwind_output,
-                            done,
-                        });
+                        self.listeners
+                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                         tx.commit()?;
                     }

@@ -349,17 +343,8 @@ where
                 checkpoint: prev_checkpoint,
             });

-            let input = ExecInput { target, checkpoint: prev_checkpoint };
-            let result = if input.target_reached() {
-                Ok(ExecOutput { checkpoint: input.checkpoint() })
-            } else {
-                stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await
-            };
-
-            match result {
-                Ok(out @ ExecOutput { checkpoint }) => {
-                    let done = out.is_done(input);
-
+            match stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await {
+                Ok(out @ ExecOutput { checkpoint, done }) => {
                     made_progress |=
                         checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
                     info!(

@@ -378,7 +363,6 @@ where
                         pipeline_total: total_stages,
                         stage_id,
                         result: out.clone(),
-                        done,
                     });

                     // TODO: Make the commit interval configurable
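The reverted unwind loop above runs until the stage checkpoint drops to the unwind target (`while checkpoint.block_number > to`) instead of tracking a derived `done` flag. A small self-contained sketch of that loop shape, with a hypothetical fixed `step` standing in for however many blocks a stage unwinds per call:

    // Unwind in bounded steps until the checkpoint reaches the unwind target.
    fn unwind_to(mut checkpoint: u64, to: u64, step: u64) -> u64 {
        while checkpoint > to {
            // stage.unwind(...) in the real pipeline; here we just step down.
            checkpoint = checkpoint.saturating_sub(step).max(to);
        }
        checkpoint
    }

    fn main() {
        // Unwinding from block 100 to block 40 in steps of 25 stops exactly at 40.
        assert_eq!(unwind_to(100, 40, 25), 40);
    }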
@@ -520,15 +504,11 @@ mod tests {
         let mut pipeline = Pipeline::builder()
             .add_stage(
                 TestStage::new(StageId::Other("A"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })),
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })),
             )
             .add_stage(
                 TestStage::new(StageId::Other("B"))
-                    .with_checkpoint(Some(StageCheckpoint::new(10)), &db),
-            )
-            .add_stage(
-                TestStage::new(StageId::Other("C"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
             )
             .with_max_block(10)
             .build(db);

@@ -545,30 +525,27 @@ mod tests {
             vec![
                 PipelineEvent::Running {
                     pipeline_position: 1,
-                    pipeline_total: 3,
+                    pipeline_total: 2,
                     stage_id: StageId::Other("A"),
                     checkpoint: None
                 },
                 PipelineEvent::Ran {
                     pipeline_position: 1,
-                    pipeline_total: 3,
+                    pipeline_total: 2,
                     stage_id: StageId::Other("A"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(20) },
-                    done: true,
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                 },
-                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                 PipelineEvent::Running {
-                    pipeline_position: 3,
-                    pipeline_total: 3,
-                    stage_id: StageId::Other("C"),
+                    pipeline_position: 2,
+                    pipeline_total: 2,
+                    stage_id: StageId::Other("B"),
                     checkpoint: None
                 },
                 PipelineEvent::Ran {
-                    pipeline_position: 3,
-                    pipeline_total: 3,
-                    stage_id: StageId::Other("C"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true,
+                    pipeline_position: 2,
+                    pipeline_total: 2,
+                    stage_id: StageId::Other("B"),
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
             ]
         );

@@ -582,17 +559,17 @@ mod tests {
         let mut pipeline = Pipeline::builder()
             .add_stage(
                 TestStage::new(StageId::Other("A"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) }))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
             )
             .add_stage(
                 TestStage::new(StageId::Other("B"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) }))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
             )
             .add_stage(
                 TestStage::new(StageId::Other("C"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) }))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
             )
             .with_max_block(10)

@@ -623,8 +600,7 @@ mod tests {
                     pipeline_position: 1,
                     pipeline_total: 3,
                     stage_id: StageId::Other("A"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(100) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                 },
                 PipelineEvent::Running {
                     pipeline_position: 2,

@@ -636,8 +612,7 @@ mod tests {
                     pipeline_position: 2,
                     pipeline_total: 3,
                     stage_id: StageId::Other("B"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
                 PipelineEvent::Running {
                     pipeline_position: 3,

@@ -649,8 +624,7 @@ mod tests {
                     pipeline_position: 3,
                     pipeline_total: 3,
                     stage_id: StageId::Other("C"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(20) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                 },
                 // Unwinding
                 PipelineEvent::Unwinding {

@@ -664,7 +638,6 @@ mod tests {
                 PipelineEvent::Unwound {
                     stage_id: StageId::Other("C"),
                     result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
-                    done: true
                 },
                 PipelineEvent::Unwinding {
                     stage_id: StageId::Other("B"),

@@ -677,7 +650,6 @@ mod tests {
                 PipelineEvent::Unwound {
                     stage_id: StageId::Other("B"),
                     result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
-                    done: true
                 },
                 PipelineEvent::Unwinding {
                     stage_id: StageId::Other("A"),

@@ -690,7 +662,6 @@ mod tests {
                 PipelineEvent::Unwound {
                     stage_id: StageId::Other("A"),
                     result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
-                    done: true
                 },
             ]
         );

@@ -704,12 +675,12 @@ mod tests {
         let mut pipeline = Pipeline::builder()
             .add_stage(
                 TestStage::new(StageId::Other("A"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) }))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
             )
             .add_stage(
                 TestStage::new(StageId::Other("B"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
             )
             .with_max_block(10)
             .build(db);

@@ -739,8 +710,7 @@ mod tests {
                     pipeline_position: 1,
                     pipeline_total: 2,
                     stage_id: StageId::Other("A"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(100) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                 },
                 PipelineEvent::Running {
                     pipeline_position: 2,

@@ -752,8 +722,7 @@ mod tests {
                     pipeline_position: 2,
                     pipeline_total: 2,
                     stage_id: StageId::Other("B"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
                 // Unwinding
                 // Nothing to unwind in stage "B"

@@ -769,7 +738,6 @@ mod tests {
                 PipelineEvent::Unwound {
                     stage_id: StageId::Other("A"),
                     result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
-                    done: true
                 },
             ]
         );

@@ -794,9 +762,9 @@ mod tests {
         let mut pipeline = Pipeline::builder()
             .add_stage(
                 TestStage::new(StageId::Other("A"))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) }))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
             )
             .add_stage(
                 TestStage::new(StageId::Other("B"))

@@ -805,7 +773,7 @@ mod tests {
                         error: consensus::ConsensusError::BaseFeeMissing,
                     }))
                     .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
-                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
             )
             .with_max_block(10)
             .build(db);

@@ -830,8 +798,7 @@ mod tests {
                     pipeline_position: 1,
                     pipeline_total: 2,
                     stage_id: StageId::Other("A"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
                 PipelineEvent::Running {
                     pipeline_position: 2,

@@ -851,7 +818,6 @@ mod tests {
                 PipelineEvent::Unwound {
                     stage_id: StageId::Other("A"),
                     result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
-                    done: true
                 },
                 PipelineEvent::Running {
                     pipeline_position: 1,

@@ -863,8 +829,7 @@ mod tests {
                     pipeline_position: 1,
                     pipeline_total: 2,
                     stage_id: StageId::Other("A"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
                 PipelineEvent::Running {
                     pipeline_position: 2,

@@ -876,8 +841,7 @@ mod tests {
                     pipeline_position: 2,
                     pipeline_total: 2,
                     stage_id: StageId::Other("B"),
-                    result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
-                    done: true
+                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                 },
             ]
         );

@@ -887,17 +851,17 @@ mod tests {
     #[tokio::test]
     async fn pipeline_error_handling() {
         // Non-fatal
-        // let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
-        // let mut pipeline = Pipeline::builder()
-        //     .add_stage(
-        //         TestStage::new(StageId::Other("NonFatal"))
-        //             .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
-        //             .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
-        //     )
-        //     .with_max_block(10)
-        //     .build(db);
-        // let result = pipeline.run().await;
-        // assert_matches!(result, Ok(()));
+        let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
+        let mut pipeline = Pipeline::builder()
+            .add_stage(
+                TestStage::new(StageId::Other("NonFatal"))
+                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
+                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
+            )
+            .with_max_block(10)
+            .build(db);
+        let result = pipeline.run().await;
+        assert_matches!(result, Ok(()));

         // Fatal
         let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);

@@ -905,7 +869,6 @@ mod tests {
             .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
                 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
             )))
-            .with_max_block(1)
             .build(db);
         let result = pipeline.run().await;
         assert_matches!(
@@ -10,7 +10,6 @@ use std::{
     cmp::{max, min},
     ops::RangeInclusive,
 };
-use tracing::warn;

 /// Stage execution input, see [Stage::execute].
 #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]

@@ -36,7 +35,7 @@ impl ExecInput {

     /// Returns `true` if the target block number has already been reached.
     pub fn target_reached(&self) -> bool {
-        ExecOutput { checkpoint: self.checkpoint.unwrap_or_default() }.is_done(*self)
+        self.checkpoint().block_number >= self.target()
     }

     /// Return the target block number or default.

@@ -46,7 +45,8 @@ impl ExecInput {

     /// Return next block range that needs to be executed.
     pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
-        self.next_block_range_with_threshold(u64::MAX)
+        let (range, _) = self.next_block_range_with_threshold(u64::MAX);
+        range
     }

     /// Return true if this is the first block range to execute.

@@ -55,15 +55,19 @@ impl ExecInput {
     }

     /// Return the next block range to execute.
-    /// Return pair of the block range.
-    pub fn next_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive<BlockNumber> {
+    /// Return pair of the block range and if this is final block range.
+    pub fn next_block_range_with_threshold(
+        &self,
+        threshold: u64,
+    ) -> (RangeInclusive<BlockNumber>, bool) {
         let current_block = self.checkpoint();
         let start = current_block.block_number + 1;
         let target = self.target();

         let end = min(target, current_block.block_number.saturating_add(threshold));

-        start..=end
+        let is_final_range = end == target;
+        (start..=end, is_final_range)
     }

     /// Return the next block range determined the number of transactions within it.
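With the revert, `next_block_range_with_threshold` again reports whether the batch it hands back is the final one. A self-contained sketch of the same arithmetic on plain u64 block numbers (in place of `StageCheckpoint`), with a worked example:

    use std::cmp::min;
    use std::ops::RangeInclusive;

    // Next batch of blocks to execute, capped by `threshold`, plus whether it reaches the target.
    fn next_block_range_with_threshold(
        checkpoint: u64,
        target: u64,
        threshold: u64,
    ) -> (RangeInclusive<u64>, bool) {
        let start = checkpoint + 1;
        let end = min(target, checkpoint.saturating_add(threshold));
        let is_final_range = end == target;
        (start..=end, is_final_range)
    }

    fn main() {
        // Checkpoint 100, target 1_000, threshold 500: the next batch is 101..=600
        // and the stage is not yet done.
        assert_eq!(next_block_range_with_threshold(100, 1_000, 500), (101..=600, false));
        // The following batch reaches the target, so it is the final range.
        assert_eq!(next_block_range_with_threshold(600, 1_000, 500), (601..=1_000, true));
    }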
@@ -73,7 +77,7 @@ impl ExecInput {
         &self,
         tx: &Transaction<'_, DB>,
         tx_threshold: u64,
-    ) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>), StageError> {
+    ) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
         let start_block = self.next_block();
         let start_block_body = tx
             .get::<tables::BlockBodyIndices>(start_block)?

@@ -94,7 +98,8 @@ impl ExecInput {
                 break
             }
         }
-        Ok((first_tx_number..=last_tx_number, start_block..=end_block_number))
+        let is_final_range = end_block_number >= target_block;
+        Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range))
     }
 }

@@ -110,11 +115,6 @@ pub struct UnwindInput {
 }

 impl UnwindInput {
-    /// Returns `true` if the target block number has already been reached.
-    pub fn target_reached(&self) -> bool {
-        UnwindOutput { checkpoint: self.checkpoint }.is_done(*self)
-    }
-
     /// Return next block range that needs to be unwound.
     pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
         self.unwind_block_range_with_threshold(u64::MAX).0

@@ -124,7 +124,7 @@ impl UnwindInput {
     pub fn unwind_block_range_with_threshold(
         &self,
         threshold: u64,
-    ) -> (RangeInclusive<BlockNumber>, BlockNumber) {
+    ) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
         // +1 is to skip the block we're unwinding to
         let mut start = self.unwind_to + 1;
         let end = self.checkpoint;

@@ -133,7 +133,8 @@ impl UnwindInput {

         let unwind_to = start - 1;

-        (start..=end.block_number, unwind_to)
+        let is_final_range = unwind_to == self.unwind_to;
+        (start..=end.block_number, unwind_to, is_final_range)
     }
 }

@@ -142,16 +143,14 @@ impl UnwindInput {
 pub struct ExecOutput {
     /// How far the stage got.
     pub checkpoint: StageCheckpoint,
+    /// Whether or not the stage is done.
+    pub done: bool,
 }

 impl ExecOutput {
-    /// Returns `true` if the target block number has already been reached,
-    /// i.e. `checkpoint.block_number >= target`.
-    pub fn is_done(&self, input: ExecInput) -> bool {
-        if self.checkpoint.block_number > input.target() {
-            warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the execution target");
-        }
-        self.checkpoint.block_number >= input.target()
+    /// Mark the stage as done, checkpointing at the given place.
+    pub fn done(checkpoint: StageCheckpoint) -> Self {
+        Self { checkpoint, done: true }
     }
 }

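The restored `ExecOutput::done` constructor pairs with `ExecInput::target_reached` in the stage hunks further down (Body, Execution, AccountHashing, StorageHashing): a stage whose input is already at the target returns immediately and marks itself done. A minimal stand-alone sketch of that early-return pattern, with simplified types rather than the real reth signatures:

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct ExecOutput {
        checkpoint: u64,
        done: bool,
    }

    impl ExecOutput {
        // Mark the stage as done, checkpointing at the given place.
        fn done(checkpoint: u64) -> Self {
            Self { checkpoint, done: true }
        }
    }

    #[derive(Clone, Copy)]
    struct ExecInput {
        checkpoint: Option<u64>,
        target: Option<u64>,
    }

    impl ExecInput {
        fn checkpoint(&self) -> u64 { self.checkpoint.unwrap_or(0) }
        fn target(&self) -> u64 { self.target.unwrap_or(0) }
        fn target_reached(&self) -> bool { self.checkpoint() >= self.target() }
    }

    // Toy stage body: short-circuit when there is nothing to do, otherwise advance one batch.
    fn execute(input: ExecInput, batch: u64) -> ExecOutput {
        if input.target_reached() {
            return ExecOutput::done(input.checkpoint());
        }
        let checkpoint = (input.checkpoint() + batch).min(input.target());
        ExecOutput { checkpoint, done: checkpoint == input.target() }
    }

    fn main() {
        let input = ExecInput { checkpoint: Some(10), target: Some(10) };
        assert_eq!(execute(input, 5), ExecOutput::done(10));
    }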
@@ -162,17 +161,6 @@ pub struct UnwindOutput {
     pub checkpoint: StageCheckpoint,
 }

-impl UnwindOutput {
-    /// Returns `true` if the target block number has already been reached,
-    /// i.e. `checkpoint.block_number <= unwind_to`.
-    pub fn is_done(&self, input: UnwindInput) -> bool {
-        if self.checkpoint.block_number < input.unwind_to {
-            warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the unwind target");
-        }
-        self.checkpoint.block_number <= input.unwind_to
-    }
-}
-
 /// A stage is a segmented part of the syncing process of the node.
 ///
 /// Each stage takes care of a well-defined task, such as downloading headers or executing
@@ -70,6 +70,10 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
         tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
+        if input.target_reached() {
+            return Ok(ExecOutput::done(input.checkpoint()))
+        }
+
         let range = input.next_block_range();
         // Update the header range on the downloader
         self.downloader.set_download_range(range.clone())?;

@@ -147,9 +151,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
         // The stage is "done" if:
         // - We got fewer blocks than our target
         // - We reached our target and the target was not limited by the batch size of the stage
+        let done = highest_block == to_block;
         Ok(ExecOutput {
             checkpoint: StageCheckpoint::new(highest_block)
                 .with_entities_stage_checkpoint(stage_checkpoint(tx)?),
+            done,
         })
     }

@@ -224,11 +230,15 @@ fn stage_checkpoint<DB: Database>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::test_utils::{ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner};
+    use crate::test_utils::{
+        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
+    };
     use assert_matches::assert_matches;
     use reth_primitives::stage::StageUnitCheckpoint;
     use test_utils::*;

+    stage_test_suite_ext!(BodyTestRunner, body);
+
     /// Checks that the stage downloads at most `batch_size` blocks.
     #[tokio::test]
     async fn partial_body_download() {

@@ -261,7 +271,7 @@ mod tests {
                 processed, // 1 seeded block body + batch size
                 total // seeded headers
             }))
-        }}) if block_number < 200 &&
+        }, done: false }) if block_number < 200 &&
             processed == 1 + batch_size && total == previous_stage
         );
         assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");

@@ -298,7 +308,8 @@ mod tests {
                 processed,
                 total
             }))
-            }
+            },
+            done: true
         }) if processed == total && total == previous_stage
         );
         assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");

@@ -333,7 +344,7 @@ mod tests {
                 processed,
                 total
             }))
-        }}) if block_number >= 10 &&
+        }, done: false }) if block_number >= 10 &&
             processed == 1 + batch_size && total == previous_stage
         );
         let first_run_checkpoint = first_run.unwrap().checkpoint;

@@ -353,7 +364,7 @@ mod tests {
                 processed,
                 total
             }))
-        }}) if block_number > first_run_checkpoint.block_number &&
+        }, done: true }) if block_number > first_run_checkpoint.block_number &&
             processed == total && total == previous_stage
         );
         assert_matches!(

@@ -393,7 +404,7 @@ mod tests {
                 processed,
                 total
             }))
-        }}) if block_number == previous_stage &&
+        }, done: true }) if block_number == previous_stage &&
             processed == total && total == previous_stage
         );
         let checkpoint = output.unwrap().checkpoint;

@@ -138,6 +138,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
         tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
+        if input.target_reached() {
+            return Ok(ExecOutput::done(input.checkpoint()))
+        }
+
         let start_block = input.next_block();
         let max_block = input.target();

@@ -189,9 +193,11 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
         state.write_to_db(&**tx)?;
         trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state");

+        let done = stage_progress == max_block;
         Ok(ExecOutput {
             checkpoint: StageCheckpoint::new(stage_progress)
                 .with_execution_stage_checkpoint(stage_checkpoint),
+            done,
         })
     }
 }

@@ -332,7 +338,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
         let mut account_changeset = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
         let mut storage_changeset = tx.cursor_dup_write::<tables::StorageChangeSet>()?;

-        let (range, unwind_to) =
+        let (range, unwind_to, _) =
             input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));

         if range.is_empty() {

@@ -649,7 +655,8 @@ mod tests {
                     total
                 }
             }))
-            }
+            },
+            done: true
         } if processed == total && total == block.gas_used);
         let tx = tx.deref_mut();
         // check post state

@@ -21,7 +21,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
         _tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
-        Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) })
+        Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
     }

     async fn unwind(

@@ -37,12 +37,14 @@ impl<DB: Database> Stage<DB> for FinishStage {
 mod tests {
     use super::*;
     use crate::test_utils::{
-        ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
-        UnwindStageTestRunner,
+        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
+        TestTransaction, UnwindStageTestRunner,
     };
     use reth_interfaces::test_utils::generators::{random_header, random_header_range};
     use reth_primitives::SealedHeader;

+    stage_test_suite_ext!(FinishTestRunner, finish);
+
     #[derive(Default)]
     struct FinishTestRunner {
         tx: TestTransaction,

@@ -87,7 +89,7 @@ mod tests {
         output: Option<ExecOutput>,
     ) -> Result<(), TestRunnerError> {
         if let Some(output) = output {
-            assert!(output.is_done(input), "stage should always be done");
+            assert!(output.done, "stage should always be done");
             assert_eq!(
                 output.checkpoint.block_number,
                 input.target(),

@@ -135,6 +135,10 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
+        if input.target_reached() {
+            return Ok(ExecOutput::done(input.checkpoint()))
+        }
+
         let (from_block, to_block) = input.next_block_range().into_inner();

         // if there are more blocks then threshold it is faster to go over Plain state and hash all

@@ -231,7 +235,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
                     },
                 );

-                return Ok(ExecOutput { checkpoint })
+                return Ok(ExecOutput { checkpoint, done: false })
             }
         } else {
             // Aggregate all transition changesets and make a list of accounts that have been

@@ -253,7 +257,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
             ..Default::default()
         });

-        Ok(ExecOutput { checkpoint })
+        Ok(ExecOutput { checkpoint, done: true })
     }

     /// Unwind the stage.

@@ -262,7 +266,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         tx: &mut Transaction<'_, DB>,
         input: UnwindInput,
     ) -> Result<UnwindOutput, StageError> {
-        let (range, unwind_progress) =
+        let (range, unwind_progress, _) =
             input.unwind_block_range_with_threshold(self.commit_threshold);

         // Aggregate all transition changesets and make a list of accounts that have been changed.

@@ -292,11 +296,15 @@ fn stage_checkpoint_progress<DB: Database>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::test_utils::{ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner};
+    use crate::test_utils::{
+        stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
+    };
     use assert_matches::assert_matches;
     use reth_primitives::{stage::StageUnitCheckpoint, Account, U256};
     use test_utils::*;

+    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
+
     #[tokio::test]
     async fn execute_clean_account_hashing() {
         let (previous_stage, stage_progress) = (20, 10);

@@ -326,7 +334,8 @@ mod tests {
                     },
                     ..
                 })),
-            }
+            },
+            done: true,
         }) if block_number == previous_stage &&
             processed == total &&
             total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64

@@ -383,7 +392,8 @@ mod tests {
                         progress: EntitiesCheckpoint { processed: 5, total }
                     }
                 ))
-            }
+            },
+            done: false
         }) if address == fifth_address &&
             total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
         );

@@ -409,7 +419,8 @@ mod tests {
                         progress: EntitiesCheckpoint { processed, total }
                     }
                 ))
-            }
+            },
+            done: true
        }) if processed == total &&
             total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
         );

@@ -57,6 +57,10 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
         tx: &mut Transaction<'_, DB>,
         input: ExecInput,
     ) -> Result<ExecOutput, StageError> {
+        if input.target_reached() {
+            return Ok(ExecOutput::done(input.checkpoint()))
+        }
+
         let (from_block, to_block) = input.next_block_range().into_inner();

         // if there are more blocks then threshold it is faster to go over Plain state and hash all

@@ -161,7 +165,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
                     },
                 );

-                return Ok(ExecOutput { checkpoint })
+                return Ok(ExecOutput { checkpoint, done: false })
             }
         } else {
             // Aggregate all changesets and and make list of storages that have been

@@ -182,7 +186,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
             ..Default::default()
         });

-        Ok(ExecOutput { checkpoint })
+        Ok(ExecOutput { checkpoint, done: true })
|
Ok(ExecOutput { checkpoint, done: true })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Unwind the stage.
|
/// Unwind the stage.
|
||||||
@ -191,7 +195,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (range, unwind_progress) =
|
let (range, unwind_progress, _) =
|
||||||
input.unwind_block_range_with_threshold(self.commit_threshold);
|
input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
|
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
|
||||||
@ -221,8 +225,8 @@ fn stage_checkpoint_progress<DB: Database>(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::test_utils::{
|
use crate::test_utils::{
|
||||||
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
|
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
|
||||||
UnwindStageTestRunner,
|
TestTransaction, UnwindStageTestRunner,
|
||||||
};
|
};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use reth_db::{
|
use reth_db::{
|
||||||
@ -237,6 +241,8 @@ mod tests {
|
|||||||
stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256,
|
stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
|
||||||
|
|
||||||
/// Execute with low clean threshold so as to hash whole storage
|
/// Execute with low clean threshold so as to hash whole storage
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn execute_clean_storage_hashing() {
|
async fn execute_clean_storage_hashing() {
|
||||||
@ -260,8 +266,10 @@ mod tests {
|
|||||||
runner.seed_execution(input).expect("failed to seed execution");
|
runner.seed_execution(input).expect("failed to seed execution");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(result @ ExecOutput { checkpoint }) = runner.execute(input).await.unwrap() {
|
if let Ok(result @ ExecOutput { checkpoint, done }) =
|
||||||
if !result.is_done(input) {
|
runner.execute(input).await.unwrap()
|
||||||
|
{
|
||||||
|
if !done {
|
||||||
let previous_checkpoint = input
|
let previous_checkpoint = input
|
||||||
.checkpoint
|
.checkpoint
|
||||||
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
|
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
|
||||||
@ -351,7 +359,8 @@ mod tests {
|
|||||||
total
|
total
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}
|
},
|
||||||
|
done: false
|
||||||
}) if address == progress_address && storage == progress_key &&
|
}) if address == progress_address && storage == progress_key &&
|
||||||
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
@ -396,7 +405,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
))
|
))
|
||||||
}
|
},
|
||||||
|
done: false
|
||||||
}) if address == progress_address && storage == progress_key &&
|
}) if address == progress_address && storage == progress_key &&
|
||||||
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
@ -427,7 +437,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
))
|
))
|
||||||
}
|
},
|
||||||
|
done: true
|
||||||
}) if processed == total &&
|
}) if processed == total &&
|
||||||
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
|
|||||||
@ -208,7 +208,7 @@ where
|
|||||||
// Nothing to sync
|
// Nothing to sync
|
||||||
if gap.is_closed() {
|
if gap.is_closed() {
|
||||||
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
|
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
|
||||||
return Ok(ExecOutput { checkpoint: current_checkpoint })
|
return Ok(ExecOutput::done(current_checkpoint))
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
|
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
|
||||||
@ -311,10 +311,12 @@ where
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(checkpoint)
|
checkpoint: StageCheckpoint::new(checkpoint)
|
||||||
.with_headers_stage_checkpoint(stage_checkpoint),
|
.with_headers_stage_checkpoint(stage_checkpoint),
|
||||||
|
done: true,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint),
|
checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint),
|
||||||
|
done: false,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -585,7 +587,7 @@ mod tests {
|
|||||||
total,
|
total,
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}}) if block_number == tip.number &&
|
}, done: true }) if block_number == tip.number &&
|
||||||
from == checkpoint && to == previous_stage &&
|
from == checkpoint && to == previous_stage &&
|
||||||
// -1 because we don't need to download the local head
|
// -1 because we don't need to download the local head
|
||||||
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
|
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
|
||||||
@ -679,7 +681,7 @@ mod tests {
|
|||||||
total,
|
total,
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}}) if block_number == checkpoint &&
|
}, done: false }) if block_number == checkpoint &&
|
||||||
from == checkpoint && to == previous_stage &&
|
from == checkpoint && to == previous_stage &&
|
||||||
processed == checkpoint + 500 && total == tip.number);
|
processed == checkpoint + 500 && total == tip.number);
|
||||||
|
|
||||||
@ -702,7 +704,7 @@ mod tests {
|
|||||||
total,
|
total,
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}}) if block_number == tip.number &&
|
}, done: true }) if block_number == tip.number &&
|
||||||
from == checkpoint && to == previous_stage &&
|
from == checkpoint && to == previous_stage &&
|
||||||
// -1 because we don't need to download the local head
|
// -1 because we don't need to download the local head
|
||||||
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
|
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
|
||||||
|
|||||||
@ -41,7 +41,11 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: ExecInput,
|
input: ExecInput,
|
||||||
) -> Result<ExecOutput, StageError> {
|
) -> Result<ExecOutput, StageError> {
|
||||||
let range = input.next_block_range_with_threshold(self.commit_threshold);
|
if input.target_reached() {
|
||||||
|
return Ok(ExecOutput::done(input.checkpoint()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
let mut stage_checkpoint = stage_checkpoint(
|
let mut stage_checkpoint = stage_checkpoint(
|
||||||
tx,
|
tx,
|
||||||
@ -62,6 +66,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(*range.end())
|
checkpoint: StageCheckpoint::new(*range.end())
|
||||||
.with_index_history_stage_checkpoint(stage_checkpoint),
|
.with_index_history_stage_checkpoint(stage_checkpoint),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +76,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (range, unwind_progress) =
|
let (range, unwind_progress, _) =
|
||||||
input.unwind_block_range_with_threshold(self.commit_threshold);
|
input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
let changesets = tx.unwind_account_history_indices(range)?;
|
let changesets = tx.unwind_account_history_indices(range)?;
|
||||||
@ -217,9 +222,9 @@ mod tests {
|
|||||||
progress: EntitiesCheckpoint { processed: 2, total: 2 }
|
progress: EntitiesCheckpoint { processed: 2, total: 2 }
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
done: true
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert!(out.is_done(input));
|
|
||||||
tx.commit().unwrap();
|
tx.commit().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -44,7 +44,11 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: ExecInput,
|
input: ExecInput,
|
||||||
) -> Result<ExecOutput, StageError> {
|
) -> Result<ExecOutput, StageError> {
|
||||||
let range = input.next_block_range_with_threshold(self.commit_threshold);
|
if input.target_reached() {
|
||||||
|
return Ok(ExecOutput::done(input.checkpoint()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
let mut stage_checkpoint = stage_checkpoint(
|
let mut stage_checkpoint = stage_checkpoint(
|
||||||
tx,
|
tx,
|
||||||
@ -64,6 +68,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(*range.end())
|
checkpoint: StageCheckpoint::new(*range.end())
|
||||||
.with_index_history_stage_checkpoint(stage_checkpoint),
|
.with_index_history_stage_checkpoint(stage_checkpoint),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,7 +78,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (range, unwind_progress) =
|
let (range, unwind_progress, _) =
|
||||||
input.unwind_block_range_with_threshold(self.commit_threshold);
|
input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
|
let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
|
||||||
@ -228,10 +233,10 @@ mod tests {
|
|||||||
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
|
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
|
||||||
progress: EntitiesCheckpoint { processed: 2, total: 2 }
|
progress: EntitiesCheckpoint { processed: 2, total: 2 }
|
||||||
}
|
}
|
||||||
)
|
),
|
||||||
|
done: true
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert!(out.is_done(input));
|
|
||||||
tx.commit().unwrap();
|
tx.commit().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -149,7 +149,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
|
|||||||
let threshold = match self {
|
let threshold = match self {
|
||||||
MerkleStage::Unwind => {
|
MerkleStage::Unwind => {
|
||||||
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
|
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
|
||||||
return Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) })
|
return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
|
||||||
}
|
}
|
||||||
MerkleStage::Execution { clean_threshold } => *clean_threshold,
|
MerkleStage::Execution { clean_threshold } => *clean_threshold,
|
||||||
#[cfg(any(test, feature = "test-utils"))]
|
#[cfg(any(test, feature = "test-utils"))]
|
||||||
@ -227,6 +227,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
|
|||||||
checkpoint: input
|
checkpoint: input
|
||||||
.checkpoint()
|
.checkpoint()
|
||||||
.with_entities_stage_checkpoint(entities_checkpoint),
|
.with_entities_stage_checkpoint(entities_checkpoint),
|
||||||
|
done: false,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
|
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
|
||||||
@ -266,6 +267,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(to_block)
|
checkpoint: StageCheckpoint::new(to_block)
|
||||||
.with_entities_stage_checkpoint(entities_checkpoint),
|
.with_entities_stage_checkpoint(entities_checkpoint),
|
||||||
|
done: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,8 +328,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::test_utils::{
|
use crate::test_utils::{
|
||||||
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
|
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
|
||||||
UnwindStageTestRunner,
|
TestTransaction, UnwindStageTestRunner,
|
||||||
};
|
};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use reth_db::{
|
use reth_db::{
|
||||||
@ -344,6 +346,8 @@ mod tests {
|
|||||||
use reth_trie::test_utils::{state_root, state_root_prehashed};
|
use reth_trie::test_utils::{state_root, state_root_prehashed};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
stage_test_suite_ext!(MerkleTestRunner, merkle);
|
||||||
|
|
||||||
/// Execute from genesis so as to merkelize whole state
|
/// Execute from genesis so as to merkelize whole state
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn execute_clean_merkle() {
|
async fn execute_clean_merkle() {
|
||||||
@ -372,7 +376,8 @@ mod tests {
|
|||||||
processed,
|
processed,
|
||||||
total
|
total
|
||||||
}))
|
}))
|
||||||
}
|
},
|
||||||
|
done: true
|
||||||
}) if block_number == previous_stage && processed == total &&
|
}) if block_number == previous_stage && processed == total &&
|
||||||
total == (
|
total == (
|
||||||
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
|
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
|
||||||
@ -411,7 +416,8 @@ mod tests {
|
|||||||
processed,
|
processed,
|
||||||
total
|
total
|
||||||
}))
|
}))
|
||||||
}
|
},
|
||||||
|
done: true
|
||||||
}) if block_number == previous_stage && processed == total &&
|
}) if block_number == previous_stage && processed == total &&
|
||||||
total == (
|
total == (
|
||||||
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
|
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
|
||||||
|
|||||||
@ -59,7 +59,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: ExecInput,
|
input: ExecInput,
|
||||||
) -> Result<ExecOutput, StageError> {
|
) -> Result<ExecOutput, StageError> {
|
||||||
let (tx_range, block_range) =
|
if input.target_reached() {
|
||||||
|
return Ok(ExecOutput::done(input.checkpoint()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (tx_range, block_range, is_final_range) =
|
||||||
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
|
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
|
||||||
let end_block = *block_range.end();
|
let end_block = *block_range.end();
|
||||||
|
|
||||||
@ -69,6 +73,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
|||||||
return Ok(ExecOutput {
|
return Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(end_block)
|
checkpoint: StageCheckpoint::new(end_block)
|
||||||
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,6 +151,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(end_block)
|
checkpoint: StageCheckpoint::new(end_block)
|
||||||
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +161,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
// Lookup latest tx id that we should unwind to
|
// Lookup latest tx id that we should unwind to
|
||||||
let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num();
|
let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num();
|
||||||
@ -223,10 +229,12 @@ mod tests {
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::test_utils::{
|
use crate::test_utils::{
|
||||||
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
|
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
|
||||||
UnwindStageTestRunner,
|
TestTransaction, UnwindStageTestRunner,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery);
|
||||||
|
|
||||||
/// Execute a block range with a single transaction
|
/// Execute a block range with a single transaction
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn execute_single_transaction() {
|
async fn execute_single_transaction() {
|
||||||
@ -260,7 +268,7 @@ mod tests {
|
|||||||
processed: 1,
|
processed: 1,
|
||||||
total: 1
|
total: 1
|
||||||
}))
|
}))
|
||||||
}}) if block_number == previous_stage
|
}, done: true }) if block_number == previous_stage
|
||||||
);
|
);
|
||||||
|
|
||||||
// Validate the stage execution
|
// Validate the stage execution
|
||||||
@ -299,17 +307,17 @@ mod tests {
|
|||||||
.unwrap_or(previous_stage);
|
.unwrap_or(previous_stage);
|
||||||
assert_matches!(result, Ok(_));
|
assert_matches!(result, Ok(_));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.as_ref().unwrap(),
|
result.unwrap(),
|
||||||
&ExecOutput {
|
ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
|
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
|
||||||
EntitiesCheckpoint {
|
EntitiesCheckpoint {
|
||||||
processed: runner.tx.table::<tables::TxSenders>().unwrap().len() as u64,
|
processed: runner.tx.table::<tables::TxSenders>().unwrap().len() as u64,
|
||||||
total: total_transactions
|
total: total_transactions
|
||||||
}
|
}
|
||||||
)
|
),
|
||||||
|
done: false
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert!(!result.unwrap().is_done(first_input));
|
|
||||||
|
|
||||||
// Execute second time to completion
|
// Execute second time to completion
|
||||||
runner.set_threshold(u64::MAX);
|
runner.set_threshold(u64::MAX);
|
||||||
@ -324,7 +332,8 @@ mod tests {
|
|||||||
&ExecOutput {
|
&ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
|
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
|
||||||
EntitiesCheckpoint { processed: total_transactions, total: total_transactions }
|
EntitiesCheckpoint { processed: total_transactions, total: total_transactions }
|
||||||
)
|
),
|
||||||
|
done: true
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -54,7 +54,11 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: ExecInput,
|
input: ExecInput,
|
||||||
) -> Result<ExecOutput, StageError> {
|
) -> Result<ExecOutput, StageError> {
|
||||||
let range = input.next_block_range_with_threshold(self.commit_threshold);
|
if input.target_reached() {
|
||||||
|
return Ok(ExecOutput::done(input.checkpoint()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
|
||||||
let (start_block, end_block) = range.clone().into_inner();
|
let (start_block, end_block) = range.clone().into_inner();
|
||||||
|
|
||||||
debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync");
|
debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync");
|
||||||
@ -86,6 +90,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(end_block)
|
checkpoint: StageCheckpoint::new(end_block)
|
||||||
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +100,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
|
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
|
||||||
|
|
||||||
@ -127,10 +132,12 @@ mod tests {
|
|||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::test_utils::{
|
use crate::test_utils::{
|
||||||
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
|
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
|
||||||
UnwindStageTestRunner,
|
TestTransaction, UnwindStageTestRunner,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty);
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn execute_with_intermediate_commit() {
|
async fn execute_with_intermediate_commit() {
|
||||||
let threshold = 50;
|
let threshold = 50;
|
||||||
@ -158,10 +165,9 @@ mod tests {
|
|||||||
processed,
|
processed,
|
||||||
total
|
total
|
||||||
}))
|
}))
|
||||||
}}) if block_number == expected_progress && processed == 1 + threshold &&
|
}, done: false }) if block_number == expected_progress && processed == 1 + threshold &&
|
||||||
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
|
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
assert!(!result.unwrap().is_done(first_input));
|
|
||||||
|
|
||||||
// Execute second time
|
// Execute second time
|
||||||
let second_input = ExecInput {
|
let second_input = ExecInput {
|
||||||
@ -177,7 +183,7 @@ mod tests {
|
|||||||
processed,
|
processed,
|
||||||
total
|
total
|
||||||
}))
|
}))
|
||||||
}}) if block_number == previous_stage && processed == total &&
|
}, done: true }) if block_number == previous_stage && processed == total &&
|
||||||
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
|
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -55,7 +55,11 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: ExecInput,
|
input: ExecInput,
|
||||||
) -> Result<ExecOutput, StageError> {
|
) -> Result<ExecOutput, StageError> {
|
||||||
let (tx_range, block_range) =
|
if input.target_reached() {
|
||||||
|
return Ok(ExecOutput::done(input.checkpoint()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let (tx_range, block_range, is_final_range) =
|
||||||
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
|
input.next_block_range_with_transaction_threshold(tx, self.commit_threshold)?;
|
||||||
let end_block = *block_range.end();
|
let end_block = *block_range.end();
|
||||||
|
|
||||||
@ -135,6 +139,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
|
|||||||
Ok(ExecOutput {
|
Ok(ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(end_block)
|
checkpoint: StageCheckpoint::new(end_block)
|
||||||
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
|
||||||
|
done: is_final_range,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,7 +149,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
|
|||||||
tx: &mut Transaction<'_, DB>,
|
tx: &mut Transaction<'_, DB>,
|
||||||
input: UnwindInput,
|
input: UnwindInput,
|
||||||
) -> Result<UnwindOutput, StageError> {
|
) -> Result<UnwindOutput, StageError> {
|
||||||
let (range, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
|
||||||
|
|
||||||
// Cursors to unwind tx hash to number
|
// Cursors to unwind tx hash to number
|
||||||
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
|
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
|
||||||
@ -187,13 +192,16 @@ fn stage_checkpoint<DB: Database>(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::test_utils::{
|
use crate::test_utils::{
|
||||||
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
|
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
|
||||||
UnwindStageTestRunner,
|
TestTransaction, UnwindStageTestRunner,
|
||||||
};
|
};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
|
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
|
||||||
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
|
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
|
||||||
|
|
||||||
|
// Implement stage test suite.
|
||||||
|
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn execute_single_transaction_lookup() {
|
async fn execute_single_transaction_lookup() {
|
||||||
let (previous_stage, stage_progress) = (500, 100);
|
let (previous_stage, stage_progress) = (500, 100);
|
||||||
@ -226,7 +234,7 @@ mod tests {
|
|||||||
processed,
|
processed,
|
||||||
total
|
total
|
||||||
}))
|
}))
|
||||||
}}) if block_number == previous_stage && processed == total &&
|
}, done: true }) if block_number == previous_stage && processed == total &&
|
||||||
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
|
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -265,17 +273,17 @@ mod tests {
|
|||||||
.unwrap_or(previous_stage);
|
.unwrap_or(previous_stage);
|
||||||
assert_matches!(result, Ok(_));
|
assert_matches!(result, Ok(_));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.as_ref().unwrap(),
|
result.unwrap(),
|
||||||
&ExecOutput {
|
ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
|
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
|
||||||
EntitiesCheckpoint {
|
EntitiesCheckpoint {
|
||||||
processed: runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64,
|
processed: runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64,
|
||||||
total: total_txs
|
total: total_txs
|
||||||
}
|
}
|
||||||
)
|
),
|
||||||
|
done: false
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert!(!result.unwrap().is_done(first_input));
|
|
||||||
|
|
||||||
// Execute second time to completion
|
// Execute second time to completion
|
||||||
runner.set_threshold(u64::MAX);
|
runner.set_threshold(u64::MAX);
|
||||||
@ -290,7 +298,8 @@ mod tests {
|
|||||||
&ExecOutput {
|
&ExecOutput {
|
||||||
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
|
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
|
||||||
EntitiesCheckpoint { processed: total_txs, total: total_txs }
|
EntitiesCheckpoint { processed: total_txs, total: total_txs }
|
||||||
)
|
),
|
||||||
|
done: true
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -42,8 +42,8 @@ macro_rules! stage_test_suite {
|
|||||||
let result = rx.await.unwrap();
|
let result = rx.await.unwrap();
|
||||||
assert_matches::assert_matches!(
|
assert_matches::assert_matches!(
|
||||||
result,
|
result,
|
||||||
Ok(ref output @ ExecOutput { checkpoint })
|
Ok(ExecOutput { done, checkpoint })
|
||||||
if output.is_done(input) && checkpoint.block_number == previous_stage
|
if done && checkpoint.block_number == previous_stage
|
||||||
);
|
);
|
||||||
|
|
||||||
// Validate the stage execution
|
// Validate the stage execution
|
||||||
@ -94,8 +94,8 @@ macro_rules! stage_test_suite {
|
|||||||
let result = rx.await.unwrap();
|
let result = rx.await.unwrap();
|
||||||
assert_matches::assert_matches!(
|
assert_matches::assert_matches!(
|
||||||
result,
|
result,
|
||||||
Ok(ref output @ ExecOutput { checkpoint })
|
Ok(ExecOutput { done, checkpoint })
|
||||||
if output.is_done(execute_input) && checkpoint.block_number == previous_stage
|
if done && checkpoint.block_number == previous_stage
|
||||||
);
|
);
|
||||||
assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation");
|
assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation");
|
||||||
|
|
||||||
@ -113,8 +113,7 @@ macro_rules! stage_test_suite {
|
|||||||
// Assert the successful unwind result
|
// Assert the successful unwind result
|
||||||
assert_matches::assert_matches!(
|
assert_matches::assert_matches!(
|
||||||
rx,
|
rx,
|
||||||
Ok(output @ UnwindOutput { checkpoint })
|
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to
|
||||||
if output.is_done(unwind_input) && checkpoint.block_number == unwind_input.unwind_to
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Validate the stage unwind
|
// Validate the stage unwind
|
||||||
@ -124,4 +123,46 @@ macro_rules! stage_test_suite {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// `execute_already_reached_target` is not suitable for the headers stage thus
|
||||||
|
// included in the test suite extension
|
||||||
|
macro_rules! stage_test_suite_ext {
|
||||||
|
($runner:ident, $name:ident) => {
|
||||||
|
crate::test_utils::stage_test_suite!($runner, $name);
|
||||||
|
|
||||||
|
paste::item! {
|
||||||
|
/// Check that the execution is short-circuited if the target was already reached.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn [< execute_already_reached_target_ $name>] () {
|
||||||
|
let stage_progress = 1000;
|
||||||
|
|
||||||
|
// Set up the runner
|
||||||
|
let mut runner = $runner::default();
|
||||||
|
let input = crate::stage::ExecInput {
|
||||||
|
target: Some(stage_progress),
|
||||||
|
checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)),
|
||||||
|
};
|
||||||
|
let seed = runner.seed_execution(input).expect("failed to seed");
|
||||||
|
|
||||||
|
// Run stage execution
|
||||||
|
let rx = runner.execute(input);
|
||||||
|
|
||||||
|
// Run `after_execution` hook
|
||||||
|
runner.after_execution(seed).await.expect("failed to run after execution hook");
|
||||||
|
|
||||||
|
// Assert the successful result
|
||||||
|
let result = rx.await.unwrap();
|
||||||
|
assert_matches::assert_matches!(
|
||||||
|
result,
|
||||||
|
Ok(ExecOutput { done, checkpoint })
|
||||||
|
if done && checkpoint.block_number == stage_progress
|
||||||
|
);
|
||||||
|
|
||||||
|
// Validate the stage execution
|
||||||
|
assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) use stage_test_suite;
|
pub(crate) use stage_test_suite;
|
||||||
|
pub(crate) use stage_test_suite_ext;
|
||||||
|
|||||||
@ -1,46 +1,19 @@
|
|||||||
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
|
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
|
||||||
use reth_db::database::Database;
|
use reth_db::database::Database;
|
||||||
use reth_primitives::stage::{StageCheckpoint, StageId};
|
use reth_primitives::stage::StageId;
|
||||||
use reth_provider::Transaction;
|
use reth_provider::Transaction;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TestStage {
|
pub struct TestStage {
|
||||||
id: StageId,
|
id: StageId,
|
||||||
checkpoint: Option<StageCheckpoint>,
|
|
||||||
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
|
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
|
||||||
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
|
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestStage {
|
impl TestStage {
|
||||||
pub fn new(id: StageId) -> Self {
|
pub fn new(id: StageId) -> Self {
|
||||||
Self {
|
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
|
||||||
id,
|
|
||||||
checkpoint: None,
|
|
||||||
exec_outputs: VecDeque::new(),
|
|
||||||
unwind_outputs: VecDeque::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_checkpoint<DB: Database>(
|
|
||||||
mut self,
|
|
||||||
checkpoint: Option<StageCheckpoint>,
|
|
||||||
db: DB,
|
|
||||||
) -> Self {
|
|
||||||
let mut tx = Transaction::new(&db).expect("initialize transaction");
|
|
||||||
|
|
||||||
if let Some(checkpoint) = checkpoint {
|
|
||||||
tx.save_stage_checkpoint(self.id, checkpoint)
|
|
||||||
.unwrap_or_else(|_| panic!("save stage {} checkpoint", self.id))
|
|
||||||
} else {
|
|
||||||
tx.delete_stage_checkpoint(self.id)
|
|
||||||
.unwrap_or_else(|_| panic!("delete stage {} checkpoint", self.id))
|
|
||||||
}
|
|
||||||
|
|
||||||
tx.commit().expect("commit transaction");
|
|
||||||
|
|
||||||
self.checkpoint = checkpoint;
|
|
||||||
self
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
|
pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
|
||||||
|
|||||||
@ -1323,12 +1323,6 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete stage checkpoint.
|
|
||||||
pub fn delete_stage_checkpoint(&self, id: StageId) -> Result<(), DbError> {
|
|
||||||
self.delete::<tables::SyncStage>(id.to_string(), None)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return full table as Vec
|
/// Return full table as Vec
|
||||||
pub fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DbError>
|
pub fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DbError>
|
||||||
where
|
where
|
||||||
|
|||||||
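Note on the restored interface: across the hunks above, `ExecOutput` regains an explicit `done: bool` field and an `ExecOutput::done(checkpoint)` constructor, the block-range helpers regain a trailing `is_final_range` element (ignored with `_` where unused), and stages short-circuit on `input.target_reached()`. The sketch below is a minimal, self-contained illustration of that control flow: only the `done` field, the `ExecOutput::done` constructor, and the `checkpoint.block_number` access are taken from the diff; the simplified `StageCheckpoint`, the `execute` function, and the numbers are assumptions for demonstration, not reth's actual implementation.

// Illustrative sketch only, assuming a reduced StageCheckpoint.
#[derive(Clone, Copy, Debug)]
struct StageCheckpoint {
    block_number: u64,
}

#[derive(Debug)]
struct ExecOutput {
    checkpoint: StageCheckpoint,
    done: bool,
}

impl ExecOutput {
    // Restored-style helper: mark a stage as finished at the given checkpoint.
    fn done(checkpoint: StageCheckpoint) -> Self {
        Self { checkpoint, done: true }
    }
}

// Hypothetical stage step: advance at most `threshold` blocks per call and
// report `done: false` until the target is reached.
fn execute(checkpoint: StageCheckpoint, target: u64, threshold: u64) -> ExecOutput {
    if checkpoint.block_number >= target {
        return ExecOutput::done(checkpoint);
    }
    let next = (checkpoint.block_number + threshold).min(target);
    ExecOutput { checkpoint: StageCheckpoint { block_number: next }, done: next == target }
}

fn main() {
    // Driver loop in the style the revert reinstates: keep executing the
    // stage until it reports `done == true`.
    let mut checkpoint = StageCheckpoint { block_number: 0 };
    let target = 250;
    loop {
        let output = execute(checkpoint, target, 100);
        checkpoint = output.checkpoint;
        if output.done {
            break;
        }
    }
    assert_eq!(checkpoint.block_number, target);
}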