refactor(stages): input target reached & output done checks (#3119)

This commit is contained in:
Alexey Shekhirin
2023-06-13 16:02:48 +04:00
committed by GitHub
parent 0561675bb9
commit 6752d624a1
26 changed files with 332 additions and 392 deletions

View File

@ -120,30 +120,22 @@ impl Command {
let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&mut provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
account_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = account_hashing_stage.execute(&mut provider_rw, input).await?;
account_hashing_done = output.is_done(input);
}
let mut storage_hashing_done = false;
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&mut provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = storage_hashing_stage.execute(&mut provider_rw, input).await?;
storage_hashing_done = output.is_done(input);
}
let incremental_result = merkle_stage
@ -173,7 +165,7 @@ impl Command {
loop {
let clean_result = merkle_stage.execute(&mut provider_rw, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
if clean_result.unwrap().is_done(clean_input) {
break
}
}

View File

@ -72,7 +72,8 @@ impl NodeState {
pipeline_position,
pipeline_total,
stage_id,
result: ExecOutput { checkpoint, done },
result: ExecOutput { checkpoint },
done,
} => {
self.current_checkpoint = checkpoint;

View File

@ -77,16 +77,11 @@ async fn dry_run<DB: Database>(
let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input);
}
info!(target: "reth::cli", "Success.");

View File

@ -76,16 +76,11 @@ async fn dry_run<DB: Database>(
let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut provider, exec_input).await?.is_done(exec_input);
}
info!(target: "reth::cli", "Success.");

View File

@ -119,20 +119,17 @@ async fn dry_run<DB: Database>(
let mut provider = shareable_db.provider_rw()?;
let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = MerkleStage::Execution {
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating
* from
* scratch */
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
}
.execute(
&mut provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.execute(&mut provider, exec_input)
.await?
.done;
.is_done(exec_input);
}
info!(target: "reth::cli", "Success.");

View File

@ -20,7 +20,7 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, ExecOutput, PipelineError, Stage, UnwindInput,
ExecInput, PipelineError, Stage, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
@ -238,10 +238,13 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut provider_rw, input).await?
{
input.checkpoint = Some(stage_progress);
loop {
let result = exec_stage.execute(&mut provider_rw, input).await?;
if result.is_done(input) {
break
}
input.checkpoint = Some(result.checkpoint);
if self.commit {
provider_rw.commit()?;

View File

@ -1369,6 +1369,7 @@ mod tests {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
max_block: Option<BlockNumber>,
) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
@ -1380,10 +1381,13 @@ mod tests {
// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(H256::default());
let pipeline = Pipeline::builder()
let mut pipeline_builder = Pipeline::builder()
.add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx)
.build(db.clone(), chain_spec.clone());
.with_tip_sender(tip_tx);
if let Some(max_block) = max_block {
pipeline_builder = pipeline_builder.with_max_block(max_block);
}
let pipeline = pipeline_builder.build(db.clone(), chain_spec.clone());
// Setup blockchain tree
let externals =
@ -1403,7 +1407,7 @@ mod tests {
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
None,
max_block,
false,
payload_builder,
None,
@ -1438,6 +1442,7 @@ mod tests {
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let res = spawn_consensus_engine(consensus_engine);
@ -1467,6 +1472,7 @@ mod tests {
chain_spec.clone(),
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let mut rx = spawn_consensus_engine(consensus_engine);
@ -1506,10 +1512,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }),
Err(StageError::ChannelClosed),
]),
Vec::default(),
Some(2),
);
let rx = spawn_consensus_engine(consensus_engine);
@ -1522,7 +1529,9 @@ mod tests {
assert_matches!(
rx.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(
Err(BeaconConsensusEngineError::Pipeline(n))
) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}
@ -1536,15 +1545,12 @@ mod tests {
.paris_activated()
.build(),
);
let (mut consensus_engine, env) = setup_consensus_engine(
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]),
Vec::default(),
Some(max_block),
);
consensus_engine.sync.set_max_block(max_block);
let rx = spawn_consensus_engine(consensus_engine);
let _ = env
@ -1584,11 +1590,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -1615,11 +1619,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1664,10 +1666,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1712,11 +1715,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1750,10 +1751,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1803,10 +1805,11 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1849,11 +1852,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let mut engine_rx = spawn_consensus_engine(consensus_engine);
@ -1882,11 +1883,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1932,11 +1931,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::default(),
None,
);
let genesis = random_block(0, None, None, Some(0));
@ -1989,11 +1986,9 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec.clone(),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
Vec::from([exec_result2]),
None,
);
insert_blocks(

View File

@ -83,12 +83,6 @@ where
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
}
/// Sets the max block value for testing
#[cfg(test)]
pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
self.max_block = Some(block);
}
/// Cancels all full block requests that are in progress.
pub(crate) fn clear_full_block_requests(&mut self) {
self.inflight_full_block_requests.clear();

View File

@ -31,6 +31,8 @@ pub enum PipelineEvent {
stage_id: StageId,
/// The result of executing the stage.
result: ExecOutput,
/// Stage completed executing the whole block range
done: bool,
},
/// Emitted when a stage is about to be unwound.
Unwinding {
@ -45,6 +47,8 @@ pub enum PipelineEvent {
stage_id: StageId,
/// The result of unwinding the stage.
result: UnwindOutput,
/// Stage completed unwinding the whole block range
done: bool,
},
/// Emitted when a stage encounters an error either during execution or unwinding.
Error {

View File

@ -262,8 +262,10 @@ where
continue
}
let mut done = UnwindInput { checkpoint, unwind_to: to, bad_block }.target_reached();
debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind");
while checkpoint.block_number > to {
while !done {
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });
@ -271,6 +273,7 @@ where
match output {
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
done = unwind_output.is_done(input);
info!(
target: "sync::pipeline",
stage = %stage_id,
@ -287,8 +290,11 @@ where
);
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
self.listeners.notify(PipelineEvent::Unwound {
stage_id,
result: unwind_output,
done,
});
provider_rw.commit()?;
provider_rw =
@ -349,11 +355,18 @@ where
checkpoint: prev_checkpoint,
});
match stage
.execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.await
{
Ok(out @ ExecOutput { checkpoint, done }) => {
let input = ExecInput { target, checkpoint: prev_checkpoint };
let result = if input.target_reached() {
Ok(ExecOutput { checkpoint: input.checkpoint() })
} else {
stage
.execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.await
};
match result {
Ok(out @ ExecOutput { checkpoint }) => {
let done = out.is_done(input);
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
info!(
@ -372,6 +385,7 @@ where
pipeline_total: total_stages,
stage_id,
result: out.clone(),
done,
});
// TODO: Make the commit interval configurable
@ -470,7 +484,10 @@ impl<DB: Database> std::fmt::Debug for Pipeline<DB> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::TestStage, UnwindOutput};
use crate::{
test_utils::{TestStage, TestTransaction},
UnwindOutput,
};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{
@ -509,19 +526,23 @@ mod tests {
/// Runs a simple pipeline.
#[tokio::test]
async fn run_pipeline() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let tx = TestTransaction::default();
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
.with_checkpoint(Some(StageCheckpoint::new(10)), tx.inner()),
)
.add_stage(
TestStage::new(StageId::Other("C"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
)
.with_max_block(10)
.build(db, MAINNET.clone());
.build(tx.inner_raw(), MAINNET.clone());
let events = pipeline.events();
// Run pipeline
@ -535,27 +556,30 @@ mod tests {
vec![
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
pipeline_total: 3,
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
pipeline_total: 3,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(20) },
done: true,
},
PipelineEvent::Skipped { stage_id: StageId::Other("B") },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
pipeline_position: 3,
pipeline_total: 3,
stage_id: StageId::Other("C"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
pipeline_position: 3,
pipeline_total: 3,
stage_id: StageId::Other("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true,
},
]
);
@ -569,17 +593,17 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId::Other("C"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20) }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.with_max_block(10)
@ -610,7 +634,8 @@ mod tests {
pipeline_position: 1,
pipeline_total: 3,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(100) },
done: true
},
PipelineEvent::Running {
pipeline_position: 2,
@ -622,7 +647,8 @@ mod tests {
pipeline_position: 2,
pipeline_total: 3,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true
},
PipelineEvent::Running {
pipeline_position: 3,
@ -634,7 +660,8 @@ mod tests {
pipeline_position: 3,
pipeline_total: 3,
stage_id: StageId::Other("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(20) },
done: true
},
// Unwinding
PipelineEvent::Unwinding {
@ -648,6 +675,7 @@ mod tests {
PipelineEvent::Unwound {
stage_id: StageId::Other("C"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
done: true
},
PipelineEvent::Unwinding {
stage_id: StageId::Other("B"),
@ -660,6 +688,7 @@ mod tests {
PipelineEvent::Unwound {
stage_id: StageId::Other("B"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
done: true
},
PipelineEvent::Unwinding {
stage_id: StageId::Other("A"),
@ -672,6 +701,7 @@ mod tests {
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
done: true
},
]
);
@ -685,12 +715,12 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100) }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
)
.with_max_block(10)
.build(db, MAINNET.clone());
@ -720,7 +750,8 @@ mod tests {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(100) },
done: true
},
PipelineEvent::Running {
pipeline_position: 2,
@ -732,7 +763,8 @@ mod tests {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true
},
// Unwinding
// Nothing to unwind in stage "B"
@ -748,6 +780,7 @@ mod tests {
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
done: true
},
]
);
@ -772,9 +805,9 @@ mod tests {
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
@ -783,7 +816,7 @@ mod tests {
error: consensus::ConsensusError::BaseFeeMissing,
}))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
)
.with_max_block(10)
.build(db, MAINNET.clone());
@ -808,7 +841,8 @@ mod tests {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true
},
PipelineEvent::Running {
pipeline_position: 2,
@ -828,6 +862,7 @@ mod tests {
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
done: true
},
PipelineEvent::Running {
pipeline_position: 1,
@ -839,7 +874,8 @@ mod tests {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true
},
PipelineEvent::Running {
pipeline_position: 2,
@ -851,7 +887,8 @@ mod tests {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
result: ExecOutput { checkpoint: StageCheckpoint::new(10) },
done: true
},
]
);
@ -861,17 +898,17 @@ mod tests {
#[tokio::test]
async fn pipeline_error_handling() {
// Non-fatal
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline = Pipeline::builder()
.add_stage(
TestStage::new(StageId::Other("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(db, MAINNET.clone());
let result = pipeline.run().await;
assert_matches!(result, Ok(()));
// let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
// let mut pipeline = Pipeline::builder()
// .add_stage(
// TestStage::new(StageId::Other("NonFatal"))
// .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
// .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10) })),
// )
// .with_max_block(10)
// .build(db, MAINNET.clone());
// let result = pipeline.run().await;
// assert_matches!(result, Ok(()));
// Fatal
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
@ -879,6 +916,7 @@ mod tests {
.add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
)))
.with_max_block(1)
.build(db, MAINNET.clone());
let result = pipeline.run().await;
assert_matches!(

View File

@ -10,6 +10,7 @@ use std::{
cmp::{max, min},
ops::RangeInclusive,
};
use tracing::warn;
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
@ -35,7 +36,7 @@ impl ExecInput {
/// Returns `true` if the target block number has already been reached.
pub fn target_reached(&self) -> bool {
self.checkpoint().block_number >= self.target()
ExecOutput { checkpoint: self.checkpoint.unwrap_or_default() }.is_done(*self)
}
/// Return the target block number or default.
@ -45,8 +46,7 @@ impl ExecInput {
/// Return next block range that needs to be executed.
pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
let (range, _) = self.next_block_range_with_threshold(u64::MAX);
range
self.next_block_range_with_threshold(u64::MAX)
}
/// Return true if this is the first block range to execute.
@ -55,19 +55,15 @@ impl ExecInput {
}
/// Return the next block range to execute.
/// Return pair of the block range and if this is final block range.
pub fn next_block_range_with_threshold(
&self,
threshold: u64,
) -> (RangeInclusive<BlockNumber>, bool) {
/// Return pair of the block range.
pub fn next_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive<BlockNumber> {
let current_block = self.checkpoint();
let start = current_block.block_number + 1;
let target = self.target();
let end = min(target, current_block.block_number.saturating_add(threshold));
let is_final_range = end == target;
(start..=end, is_final_range)
start..=end
}
/// Return the next block range determined the number of transactions within it.
@ -77,7 +73,7 @@ impl ExecInput {
&self,
provider: &DatabaseProviderRW<'_, DB>,
tx_threshold: u64,
) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>), StageError> {
let start_block = self.next_block();
let start_block_body = provider
.tx_ref()
@ -100,8 +96,7 @@ impl ExecInput {
break
}
}
let is_final_range = end_block_number >= target_block;
Ok((first_tx_number..=last_tx_number, start_block..=end_block_number, is_final_range))
Ok((first_tx_number..=last_tx_number, start_block..=end_block_number))
}
}
@ -117,6 +112,11 @@ pub struct UnwindInput {
}
impl UnwindInput {
/// Returns `true` if the target block number has already been reached.
pub fn target_reached(&self) -> bool {
UnwindOutput { checkpoint: self.checkpoint }.is_done(*self)
}
/// Return next block range that needs to be unwound.
pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
self.unwind_block_range_with_threshold(u64::MAX).0
@ -126,7 +126,7 @@ impl UnwindInput {
pub fn unwind_block_range_with_threshold(
&self,
threshold: u64,
) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
) -> (RangeInclusive<BlockNumber>, BlockNumber) {
// +1 is to skip the block we're unwinding to
let mut start = self.unwind_to + 1;
let end = self.checkpoint;
@ -135,8 +135,7 @@ impl UnwindInput {
let unwind_to = start - 1;
let is_final_range = unwind_to == self.unwind_to;
(start..=end.block_number, unwind_to, is_final_range)
(start..=end.block_number, unwind_to)
}
}
@ -145,14 +144,16 @@ impl UnwindInput {
pub struct ExecOutput {
/// How far the stage got.
pub checkpoint: StageCheckpoint,
/// Whether or not the stage is done.
pub done: bool,
}
impl ExecOutput {
/// Mark the stage as done, checkpointing at the given place.
pub fn done(checkpoint: StageCheckpoint) -> Self {
Self { checkpoint, done: true }
/// Returns `true` if the target block number has already been reached,
/// i.e. `checkpoint.block_number >= target`.
pub fn is_done(&self, input: ExecInput) -> bool {
if self.checkpoint.block_number > input.target() {
warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the execution target");
}
self.checkpoint.block_number >= input.target()
}
}
@ -163,6 +164,17 @@ pub struct UnwindOutput {
pub checkpoint: StageCheckpoint,
}
impl UnwindOutput {
/// Returns `true` if the target block number has already been reached,
/// i.e. `checkpoint.block_number <= unwind_to`.
pub fn is_done(&self, input: UnwindInput) -> bool {
if self.checkpoint.block_number < input.unwind_to {
warn!(target: "sync::pipeline", ?input, output = ?self, "Checkpoint is beyond the unwind target");
}
self.checkpoint.block_number <= input.unwind_to
}
}
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing

View File

@ -70,10 +70,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let range = input.next_block_range();
// Update the header range on the downloader
self.downloader.set_download_range(range.clone())?;
@ -152,11 +148,9 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let done = highest_block == to_block;
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(highest_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done,
})
}
@ -232,15 +226,11 @@ fn stage_checkpoint<DB: Database>(
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
};
use crate::test_utils::{ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner};
use assert_matches::assert_matches;
use reth_primitives::stage::StageUnitCheckpoint;
use test_utils::*;
stage_test_suite_ext!(BodyTestRunner, body);
/// Checks that the stage downloads at most `batch_size` blocks.
#[tokio::test]
async fn partial_body_download() {
@ -273,7 +263,7 @@ mod tests {
processed, // 1 seeded block body + batch size
total // seeded headers
}))
}, done: false }) if block_number < 200 &&
}}) if block_number < 200 &&
processed == 1 + batch_size && total == previous_stage
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
@ -310,8 +300,7 @@ mod tests {
processed,
total
}))
},
done: true
}
}) if processed == total && total == previous_stage
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
@ -346,7 +335,7 @@ mod tests {
processed,
total
}))
}, done: false }) if block_number >= 10 &&
}}) if block_number >= 10 &&
processed == 1 + batch_size && total == previous_stage
);
let first_run_checkpoint = first_run.unwrap().checkpoint;
@ -366,7 +355,7 @@ mod tests {
processed,
total
}))
}, done: true }) if block_number > first_run_checkpoint.block_number &&
}}) if block_number > first_run_checkpoint.block_number &&
processed == total && total == previous_stage
);
assert_matches!(
@ -406,7 +395,7 @@ mod tests {
processed,
total
}))
}, done: true }) if block_number == previous_stage &&
}}) if block_number == previous_stage &&
processed == total && total == previous_stage
);
let checkpoint = output.unwrap().checkpoint;

View File

@ -143,10 +143,6 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let start_block = input.next_block();
let max_block = input.target();
@ -199,11 +195,9 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
state.write_to_db(provider.tx_ref())?;
trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state");
let done = stage_progress == max_block;
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(stage_progress)
.with_execution_stage_checkpoint(stage_checkpoint),
done,
})
}
}
@ -345,7 +339,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
let mut account_changeset = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
let mut storage_changeset = tx.cursor_dup_write::<tables::StorageChangeSet>()?;
let (range, unwind_to, _) =
let (range, unwind_to) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
@ -669,8 +663,7 @@ mod tests {
total
}
}))
},
done: true
}
} if processed == total && total == block.gas_used);
let mut provider = db.provider_rw().unwrap();
let tx = provider.tx_mut();

View File

@ -21,7 +21,7 @@ impl<DB: Database> Stage<DB> for FinishStage {
_provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) })
}
async fn unwind(
@ -37,14 +37,12 @@ impl<DB: Database> Stage<DB> for FinishStage {
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
use reth_interfaces::test_utils::generators::{random_header, random_header_range};
use reth_primitives::SealedHeader;
stage_test_suite_ext!(FinishTestRunner, finish);
#[derive(Default)]
struct FinishTestRunner {
tx: TestTransaction,
@ -89,7 +87,7 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
assert!(output.done, "stage should always be done");
assert!(output.is_done(input), "stage should always be done");
assert_eq!(
output.checkpoint.block_number,
input.target(),

View File

@ -135,10 +135,6 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (from_block, to_block) = input.next_block_range().into_inner();
// if there are more blocks then threshold it is faster to go over Plain state and hash all
@ -236,7 +232,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
},
);
return Ok(ExecOutput { checkpoint, done: false })
return Ok(ExecOutput { checkpoint })
}
} else {
// Aggregate all transition changesets and make a list of accounts that have been
@ -258,7 +254,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
Ok(ExecOutput { checkpoint })
}
/// Unwind the stage.
@ -267,7 +263,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
let (range, unwind_progress) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Aggregate all transition changesets and make a list of accounts that have been changed.
@ -297,15 +293,11 @@ fn stage_checkpoint_progress<DB: Database>(
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
};
use crate::test_utils::{ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner};
use assert_matches::assert_matches;
use reth_primitives::{stage::StageUnitCheckpoint, Account, U256};
use test_utils::*;
stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
#[tokio::test]
async fn execute_clean_account_hashing() {
let (previous_stage, stage_progress) = (20, 10);
@ -335,8 +327,7 @@ mod tests {
},
..
})),
},
done: true,
}
}) if block_number == previous_stage &&
processed == total &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
@ -393,8 +384,7 @@ mod tests {
progress: EntitiesCheckpoint { processed: 5, total }
}
))
},
done: false
}
}) if address == fifth_address &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
);
@ -420,8 +410,7 @@ mod tests {
progress: EntitiesCheckpoint { processed, total }
}
))
},
done: true
}
}) if processed == total &&
total == runner.tx.table::<tables::PlainAccountState>().unwrap().len() as u64
);

View File

@ -58,9 +58,6 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref();
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (from_block, to_block) = input.next_block_range().into_inner();
@ -166,7 +163,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
},
);
return Ok(ExecOutput { checkpoint, done: false })
return Ok(ExecOutput { checkpoint })
}
} else {
// Aggregate all changesets and and make list of storages that have been
@ -188,7 +185,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
..Default::default()
});
Ok(ExecOutput { checkpoint, done: true })
Ok(ExecOutput { checkpoint })
}
/// Unwind the stage.
@ -197,7 +194,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
let (range, unwind_progress) =
input.unwind_block_range_with_threshold(self.commit_threshold);
provider.unwind_storage_hashing(BlockNumberAddress::range(range))?;
@ -227,8 +224,8 @@ fn stage_checkpoint_progress<DB: Database>(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_db::{
@ -243,8 +240,6 @@ mod tests {
stage::StageUnitCheckpoint, Address, SealedBlock, StorageEntry, H256, U256,
};
stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
/// Execute with low clean threshold so as to hash whole storage
#[tokio::test]
async fn execute_clean_storage_hashing() {
@ -268,10 +263,8 @@ mod tests {
runner.seed_execution(input).expect("failed to seed execution");
loop {
if let Ok(result @ ExecOutput { checkpoint, done }) =
runner.execute(input).await.unwrap()
{
if !done {
if let Ok(result @ ExecOutput { checkpoint }) = runner.execute(input).await.unwrap() {
if !result.is_done(input) {
let previous_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint())
@ -361,8 +354,7 @@ mod tests {
total
}
}))
},
done: false
}
}) if address == progress_address && storage == progress_key &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);
@ -407,8 +399,7 @@ mod tests {
}
}
))
},
done: false
}
}) if address == progress_address && storage == progress_key &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);
@ -439,8 +430,7 @@ mod tests {
}
}
))
},
done: true
}
}) if processed == total &&
total == runner.tx.table::<tables::PlainStorageState>().unwrap().len() as u64
);

View File

@ -210,7 +210,7 @@ where
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
return Ok(ExecOutput::done(current_checkpoint))
return Ok(ExecOutput { checkpoint: current_checkpoint })
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
@ -313,12 +313,10 @@ where
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(checkpoint)
.with_headers_stage_checkpoint(stage_checkpoint),
done: true,
})
} else {
Ok(ExecOutput {
checkpoint: current_checkpoint.with_headers_stage_checkpoint(stage_checkpoint),
done: false,
})
}
}
@ -591,7 +589,7 @@ mod tests {
total,
}
}))
}, done: true }) if block_number == tip.number &&
}}) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);
@ -687,7 +685,7 @@ mod tests {
total,
}
}))
}, done: false }) if block_number == checkpoint &&
}}) if block_number == checkpoint &&
from == checkpoint && to == previous_stage &&
processed == checkpoint + 500 && total == tip.number);
@ -710,7 +708,7 @@ mod tests {
total,
}
}))
}, done: true }) if block_number == tip.number &&
}}) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head
processed == checkpoint + headers.len() as u64 - 1 && total == tip.number);

View File

@ -38,11 +38,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let range = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(
provider,
@ -63,7 +59,6 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}
@ -73,7 +68,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
let (range, unwind_progress) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets = provider.unwind_account_history_indices(range)?;
@ -222,9 +217,9 @@ mod tests {
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
assert!(out.is_done(input));
provider.commit().unwrap();
}
@ -462,10 +457,10 @@ mod tests {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
)
}
);
assert!(!out.is_done(input));
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut provider, input).await.unwrap();
@ -477,10 +472,10 @@ mod tests {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
)
}
);
assert!(out.is_done(input));
provider.commit().unwrap();
}

View File

@ -41,11 +41,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let range = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(
provider,
@ -65,7 +61,6 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
done: is_final_range,
})
}
@ -75,7 +70,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, _) =
let (range, unwind_progress) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets =
@ -234,10 +229,10 @@ mod tests {
block_range: CheckpointBlockRange { from: input.next_block(), to: run_to },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
)
}
);
assert!(out.is_done(input));
provider.commit().unwrap();
}
@ -478,10 +473,10 @@ mod tests {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
)
}
);
assert!(!out.is_done(input));
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut provider, input).await.unwrap();
@ -493,10 +488,10 @@ mod tests {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
)
}
);
assert!(out.is_done(input));
provider.commit().unwrap();
}

View File

@ -144,7 +144,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let threshold = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
return Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) })
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(any(test, feature = "test-utils"))]
@ -226,7 +226,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
checkpoint: input
.checkpoint()
.with_entities_stage_checkpoint(entities_checkpoint),
done: false,
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
@ -267,7 +266,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(to_block)
.with_entities_stage_checkpoint(entities_checkpoint),
done: true,
})
}
@ -330,8 +328,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_db::{
@ -348,8 +346,6 @@ mod tests {
use reth_trie::test_utils::{state_root, state_root_prehashed};
use std::collections::BTreeMap;
stage_test_suite_ext!(MerkleTestRunner, merkle);
/// Execute from genesis so as to merkelize whole state
#[tokio::test]
async fn execute_clean_merkle() {
@ -378,8 +374,7 @@ mod tests {
processed,
total
}))
},
done: true
}
}) if block_number == previous_stage && processed == total &&
total == (
runner.tx.table::<tables::HashedAccount>().unwrap().len() +
@ -418,8 +413,7 @@ mod tests {
processed,
total
}))
},
done: true
}
}) if block_number == previous_stage && processed == total &&
total == (
runner.tx.table::<tables::HashedAccount>().unwrap().len() +

View File

@ -59,11 +59,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (tx_range, block_range, is_final_range) =
let (tx_range, block_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
@ -73,7 +69,6 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
return Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
})
}
@ -155,7 +150,6 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
})
}
@ -165,7 +159,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
// Lookup latest tx id that we should unwind to
let latest_tx_id = provider.block_body_indices(unwind_to)?.last_tx_num();
@ -233,12 +227,10 @@ mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery);
/// Execute a block range with a single transaction
#[tokio::test]
async fn execute_single_transaction() {
@ -272,7 +264,7 @@ mod tests {
processed: 1,
total: 1
}))
}, done: true }) if block_number == previous_stage
}}) if block_number == previous_stage
);
// Validate the stage execution
@ -311,17 +303,17 @@ mod tests {
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.tx.table::<tables::TxSenders>().unwrap().len() as u64,
total: total_transactions
}
),
done: false
)
}
);
assert!(!result.unwrap().is_done(first_input));
// Execute second time to completion
runner.set_threshold(u64::MAX);
@ -336,8 +328,7 @@ mod tests {
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_transactions, total: total_transactions }
),
done: true
)
}
);

View File

@ -55,11 +55,8 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let tx = provider.tx_ref();
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let range = input.next_block_range_with_threshold(self.commit_threshold);
let (start_block, end_block) = range.clone().into_inner();
debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync");
@ -91,7 +88,6 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
})
}
@ -101,7 +97,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
let (_, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
provider.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
@ -133,12 +129,10 @@ mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty);
#[tokio::test]
async fn execute_with_intermediate_commit() {
let threshold = 50;
@ -166,9 +160,10 @@ mod tests {
processed,
total
}))
}, done: false }) if block_number == expected_progress && processed == 1 + threshold &&
}}) if block_number == expected_progress && processed == 1 + threshold &&
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
);
assert!(!result.unwrap().is_done(first_input));
// Execute second time
let second_input = ExecInput {
@ -184,7 +179,7 @@ mod tests {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
}}) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Headers>().unwrap().len() as u64
);

View File

@ -54,10 +54,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
provider: &mut DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
}
let (tx_range, block_range, is_final_range) =
let (tx_range, block_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
@ -138,7 +135,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
})
}
@ -149,7 +145,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
let (range, unwind_to) = input.unwind_block_range_with_threshold(self.commit_threshold);
// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
@ -192,16 +188,13 @@ fn stage_checkpoint<DB: Database>(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
#[tokio::test]
async fn execute_single_transaction_lookup() {
let (previous_stage, stage_progress) = (500, 100);
@ -234,7 +227,7 @@ mod tests {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
}}) if block_number == previous_stage && processed == total &&
total == runner.tx.table::<tables::Transactions>().unwrap().len() as u64
);
@ -273,17 +266,17 @@ mod tests {
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.tx.table::<tables::TxHashNumber>().unwrap().len() as u64,
total: total_txs
}
),
done: false
)
}
);
assert!(!result.unwrap().is_done(first_input));
// Execute second time to completion
runner.set_threshold(u64::MAX);
@ -298,8 +291,7 @@ mod tests {
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_txs, total: total_txs }
),
done: true
)
}
);

View File

@ -42,8 +42,8 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == previous_stage
Ok(ref output @ ExecOutput { checkpoint })
if output.is_done(input) && checkpoint.block_number == previous_stage
);
// Validate the stage execution
@ -94,8 +94,8 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == previous_stage
Ok(ref output @ ExecOutput { checkpoint })
if output.is_done(execute_input) && checkpoint.block_number == previous_stage
);
assert_matches::assert_matches!(runner.validate_execution(execute_input, result.ok()),Ok(_), "execution validation");
@ -113,7 +113,8 @@ macro_rules! stage_test_suite {
// Assert the successful unwind result
assert_matches::assert_matches!(
rx,
Ok(UnwindOutput { checkpoint }) if checkpoint.block_number == unwind_input.unwind_to
Ok(output @ UnwindOutput { checkpoint })
if output.is_done(unwind_input) && checkpoint.block_number == unwind_input.unwind_to
);
// Validate the stage unwind
@ -123,46 +124,4 @@ macro_rules! stage_test_suite {
};
}
// `execute_already_reached_target` is not suitable for the headers stage thus
// included in the test suite extension
macro_rules! stage_test_suite_ext {
($runner:ident, $name:ident) => {
crate::test_utils::stage_test_suite!($runner, $name);
paste::item! {
/// Check that the execution is short-circuited if the target was already reached.
#[tokio::test]
async fn [< execute_already_reached_target_ $name>] () {
let stage_progress = 1000;
// Set up the runner
let mut runner = $runner::default();
let input = crate::stage::ExecInput {
target: Some(stage_progress),
checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)),
};
let seed = runner.seed_execution(input).expect("failed to seed");
// Run stage execution
let rx = runner.execute(input);
// Run `after_execution` hook
runner.after_execution(seed).await.expect("failed to run after execution hook");
// Assert the successful result
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, checkpoint })
if done && checkpoint.block_number == stage_progress
);
// Validate the stage execution
assert_matches::assert_matches!(runner.validate_execution(input, result.ok()),Ok(_), "execution validation");
}
}
};
}
pub(crate) use stage_test_suite;
pub(crate) use stage_test_suite_ext;

View File

@ -1,19 +1,49 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::StageId;
use reth_provider::DatabaseProviderRW;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
MAINNET,
};
use reth_provider::{DatabaseProviderRW, ShareableDatabase};
use std::collections::VecDeque;
#[derive(Debug)]
pub struct TestStage {
id: StageId,
checkpoint: Option<StageCheckpoint>,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}
impl TestStage {
pub fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
Self {
id,
checkpoint: None,
exec_outputs: VecDeque::new(),
unwind_outputs: VecDeque::new(),
}
}
pub fn with_checkpoint<DB: Database>(
mut self,
checkpoint: Option<StageCheckpoint>,
provider: DatabaseProviderRW<'_, DB>,
) -> Self {
if let Some(checkpoint) = checkpoint {
provider
.save_stage_checkpoint(self.id, checkpoint)
.unwrap_or_else(|_| panic!("save stage {} checkpoint", self.id))
} else {
provider
.delete_stage_checkpoint(self.id)
.unwrap_or_else(|_| panic!("delete stage {} checkpoint", self.id))
}
provider.commit().expect("provider commit");
self.checkpoint = checkpoint;
self
}
pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {

View File

@ -1108,6 +1108,12 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(())
}
/// Delete stage checkpoint.
pub fn delete_stage_checkpoint(&self, id: StageId) -> std::result::Result<(), DatabaseError> {
self.tx.delete::<tables::SyncStage>(id.to_string(), None)?;
Ok(())
}
/// Get stage checkpoint progress.
pub fn get_stage_checkpoint_progress(
&self,