fix: use --syncmode=execution-layer from op-node for optimistic pipeline sync (#7552)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
joshieDo
2024-05-07 21:16:04 +01:00
committed by GitHub
parent 7c4d37b270
commit 9bd74fda9e
19 changed files with 365 additions and 111 deletions

View File

@ -2,11 +2,7 @@
use clap::Parser;
use reth::cli::Cli;
use reth_node_builder::NodeHandle;
use reth_node_optimism::{
args::RollupArgs, rpc::SequencerClient, OptimismEngineTypes, OptimismNode,
};
use reth_provider::BlockReaderIdExt;
use reth_node_optimism::{args::RollupArgs, rpc::SequencerClient, OptimismNode};
use std::sync::Arc;
// We use jemalloc for performance reasons
@ -27,7 +23,7 @@ fn main() {
}
if let Err(err) = Cli::<RollupArgs>::parse().run(|builder, rollup_args| async move {
let NodeHandle { node, node_exit_future } = builder
let handle = builder
.node(OptimismNode::new(rollup_args.clone()))
.extend_rpc_modules(move |ctx| {
// register sequencer tx forwarder
@ -42,29 +38,7 @@ fn main() {
.launch()
.await?;
// If `enable_genesis_walkback` is set to true, the rollup client will need to
// perform the derivation pipeline from genesis, validating the data dir.
// When set to false, set the finalized, safe, and unsafe head block hashes
// on the rollup client using a fork choice update. This prevents the rollup
// client from performing the derivation pipeline from genesis, and instead
// starts syncing from the current tip in the DB.
if node.chain_spec().is_optimism() && !rollup_args.enable_genesis_walkback {
let client = node.rpc_server_handles.auth.http_client();
if let Ok(Some(head)) = node.provider.latest_header() {
reth_rpc_api::EngineApiClient::<OptimismEngineTypes>::fork_choice_updated_v2(
&client,
reth_rpc_types::engine::ForkchoiceState {
head_block_hash: head.hash(),
safe_block_hash: head.hash(),
finalized_block_hash: head.hash(),
},
None,
)
.await?;
}
}
node_exit_future.await
handle.node_exit_future.await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);

View File

@ -104,13 +104,13 @@ impl BlockBuffer {
removed
}
/// Discard all blocks that precede finalized block number from the buffer.
pub fn remove_old_blocks(&mut self, finalized_number: BlockNumber) {
/// Discard all blocks that precede block number from the buffer.
pub fn remove_old_blocks(&mut self, block_number: BlockNumber) {
let mut block_hashes_to_remove = Vec::new();
// discard all blocks that are before the finalized number.
while let Some(entry) = self.earliest_blocks.first_entry() {
if *entry.key() > finalized_number {
if *entry.key() > block_number {
break
}
let block_hashes = entry.remove();

View File

@ -19,13 +19,14 @@ use reth_interfaces::{
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256,
};
use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ChainSpecProvider, DisplayBlocksChain, HeaderProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
@ -783,6 +784,11 @@ where
Ok(InsertPayloadOk::Inserted(status))
}
/// Discard all blocks that precede block number from the buffer.
pub fn remove_old_blocks(&mut self, block: BlockNumber) {
self.state.buffered_blocks.remove_old_blocks(block);
}
/// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
pub fn finalize_block(&mut self, finalized_block: BlockNumber) {
// remove blocks
@ -797,7 +803,7 @@ where
}
}
// clean block buffer.
self.state.buffered_blocks.remove_old_blocks(finalized_block);
self.remove_old_blocks(finalized_block);
}
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
@ -817,6 +823,16 @@ where
) -> RethResult<()> {
self.finalize_block(last_finalized_block);
let last_canonical_hashes = self.update_block_hashes()?;
self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;
Ok(())
}
/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them and removes all chains.
pub fn update_block_hashes(&mut self) -> RethResult<BTreeMap<BlockNumber, B256>> {
let last_canonical_hashes = self
.externals
.fetch_latest_canonical_hashes(self.config.num_of_canonical_hashes() as usize)?;
@ -831,9 +847,22 @@ where
}
}
self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;
Ok(last_canonical_hashes)
}
Ok(())
/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them, removes all chains and clears all buffered
/// blocks before the tip.
pub fn update_block_hashes_and_clear_buffered(
&mut self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
let chain = self.update_block_hashes()?;
if let Some((block, _)) = chain.last_key_value() {
self.remove_old_blocks(*block);
}
Ok(chain)
}
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
@ -1220,6 +1249,28 @@ where
&self,
revert_until: BlockNumber,
) -> Result<Option<Chain>, CanonicalError> {
// This should only happen when an optimistic sync target was re-orged.
//
// Static files generally contain finalized data. The blockchain tree only deals
// with unfinalized data. The only scenario where canonical reverts go past the highest
// static file is when an optimistic sync occured and unfinalized data was written to
// static files.
if self
.externals
.provider_factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::Headers)
.unwrap_or_default() >
revert_until
{
trace!(
target: "blockchain_tree",
"Reverting optimistic canonical chain to block {}",
revert_until
);
return Err(CanonicalError::OptimisticTargetRevert(revert_until))
}
// read data that is needed for new sidechain
let provider_rw = self.externals.provider_factory.provider_rw()?;

View File

@ -68,6 +68,12 @@ impl BlockchainTreeEngine for NoopBlockchainTree {
fn make_canonical(&self, block_hash: BlockHash) -> Result<CanonicalOutcome, CanonicalError> {
Err(BlockchainTreeError::BlockHashNotFoundInChain { block_hash }.into())
}
fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
Ok(BTreeMap::new())
}
}
impl BlockchainTreeViewer for NoopBlockchainTree {

View File

@ -83,6 +83,15 @@ where
res
}
fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
let mut tree = self.tree.write();
let res = tree.update_block_hashes_and_clear_buffered();
tree.update_chains_metrics();
res
}
fn connect_buffered_blocks_to_canonical_hashes(&self) -> RethResult<()> {
trace!(target: "blockchain_tree", "Connecting buffered blocks to canonical hashes");
let mut tree = self.tree.write();

View File

@ -130,10 +130,16 @@ impl EngineHooksController {
args: EngineHookContext,
db_write_active: bool,
) -> Poll<Result<PolledHook, EngineHookError>> {
// Hook with DB write access level is not allowed to run due to already running hook with DB
// write access level or active DB write according to passed argument
// Hook with DB write access level is not allowed to run due to any of the following
// reasons:
// - An already running hook with DB write access level
// - Active DB write according to passed argument
// - Missing a finalized block number. We might be on an optimistic sync scenario where we
// cannot skip the FCU with the finalized hash, otherwise CL might misbehave.
if hook.db_access_level().is_read_write() &&
(self.active_db_write_hook.is_some() || db_write_active)
(self.active_db_write_hook.is_some() ||
db_write_active ||
args.finalized_block_number.is_none())
{
return Poll::Pending
}

View File

@ -15,8 +15,9 @@ use reth_interfaces::{
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock,
SealedHeader, B256,
constants::EPOCH_SLOTS,
stage::{PipelineTarget, StageId},
BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader, B256,
};
use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
@ -316,7 +317,7 @@ where
};
if let Some(target) = maybe_pipeline_target {
this.sync.set_pipeline_sync_target(target);
this.sync.set_pipeline_sync_target(target.into());
}
Ok((this, handle))
@ -668,6 +669,21 @@ where
// threshold
return Some(state.finalized_block_hash)
}
// OPTIMISTIC SYNCING
//
// It can happen when the node is doing an
// optimistic sync, where the CL has no knowledge of the finalized hash,
// but is expecting the EL to sync as high
// as possible before finalizing.
//
// This usually doesn't happen on ETH mainnet since CLs use the more
// secure checkpoint syncing.
//
// However, optimism chains will do this. The risk of a reorg is however
// low.
debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target.");
return Some(state.head_block_hash)
}
Ok(Some(_)) => {
// we're fully synced to the finalized block
@ -981,6 +997,10 @@ where
// so we should not warn the user, since this will result in us attempting to sync
// to a new target and is considered normal operation during sync
}
CanonicalError::OptimisticTargetRevert(block_number) => {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
return PayloadStatus::from_status(PayloadStatusEnum::Syncing)
}
_ => {
warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
// TODO(mattsse) better error handling before attempting to sync (FCU could be
@ -1011,7 +1031,7 @@ where
if self.pipeline_run_threshold == 0 {
// use the pipeline to sync to the target
trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head");
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
} else {
// trigger a full block download for missing hash, or the parent of its lowest buffered
// ancestor
@ -1361,7 +1381,7 @@ where
) {
// we don't have the block yet and the distance exceeds the allowed
// threshold
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
// we can exit early here because the pipeline will take care of syncing
return
}
@ -1445,6 +1465,8 @@ where
// TODO: do not ignore this
let _ = self.blockchain.make_canonical(*target_hash.as_ref());
}
} else if let Some(block_number) = err.optimistic_revert_block_number() {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
}
Err((target.head_block_hash, err))
@ -1506,13 +1528,7 @@ where
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
let max_block = ctrl.block_number().unwrap_or_default();
let max_header = self.blockchain.sealed_header(max_block)
.inspect_err(|error| {
error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
})?
.ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
self.blockchain.set_canonical_head(max_header);
self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
}
let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
@ -1525,6 +1541,14 @@ where
}
};
if sync_target_state.finalized_block_hash.is_zero() {
self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
self.blockchain.update_block_hashes_and_clear_buffered()?;
self.blockchain.connect_buffered_blocks_to_canonical_hashes()?;
// We are on an optimistic syncing process, better to wait for the next FCU to handle
return Ok(())
}
// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
// This can arise if we buffer the forkchoice head, and if the head is an
@ -1580,7 +1604,7 @@ where
// the tree update from executing too many blocks and blocking.
if let Some(target) = pipeline_target {
// run the pipeline to the target since the distance is sufficient
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
} else if let Some(number) =
self.blockchain.block_number(sync_target_state.finalized_block_hash)?
{
@ -1592,12 +1616,23 @@ where
} else {
// We don't have the finalized block in the database, so we need to
// trigger another pipeline run.
self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash);
self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into());
}
Ok(())
}
fn set_canonical_head(&self, max_block: BlockNumber) -> RethResult<()> {
let max_header = self.blockchain.sealed_header(max_block)
.inspect_err(|error| {
error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
})?
.ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
self.blockchain.set_canonical_head(max_header);
Ok(())
}
fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
error!(
@ -1746,16 +1781,20 @@ where
Err(BeaconOnNewPayloadError::Internal(Box::new(error.clone())));
let _ = tx.send(response);
return Err(RethError::Canonical(error))
} else if error.optimistic_revert_block_number().is_some() {
// engine already set the pipeline unwind target on
// `try_make_sync_target_canonical`
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
} else {
// If we could not make the sync target block canonical,
// we should return the error as an invalid payload status.
PayloadStatus::new(
PayloadStatusEnum::Invalid { validation_error: error.to_string() },
// TODO: return a proper latest valid hash
// See: <https://github.com/paradigmxyz/reth/issues/7146>
self.forkchoice_state_tracker.last_valid_head(),
)
}
// If we could not make the sync target block canonical,
// we should return the error as an invalid payload status.
PayloadStatus::new(
PayloadStatusEnum::Invalid { validation_error: error.to_string() },
// TODO: return a proper latest valid hash
// See: <https://github.com/paradigmxyz/reth/issues/7146>
self.forkchoice_state_tracker.last_valid_head(),
)
}
};

View File

@ -11,7 +11,7 @@ use reth_interfaces::p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
headers::client::HeadersClient,
};
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256};
use reth_primitives::{stage::PipelineTarget, BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
@ -44,7 +44,7 @@ where
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<DB>,
/// Pending target block for the pipeline to sync
pending_pipeline_target: Option<B256>,
pending_pipeline_target: Option<PipelineTarget>,
/// In-flight full block requests in progress.
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
@ -216,8 +216,12 @@ where
/// Sets a new target to sync the pipeline to.
///
/// But ensures the target is not the zero hash.
pub(crate) fn set_pipeline_sync_target(&mut self, target: B256) {
if target.is_zero() {
pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
if target.sync_target().is_some_and(|target| target.is_zero()) {
trace!(
target: "consensus::engine::sync",
"Pipeline target cannot be zero hash."
);
// precaution to never sync to the zero hash
return
}
@ -384,7 +388,7 @@ pub(crate) enum EngineSyncEvent {
/// Pipeline started syncing
///
/// This is none if the pipeline is triggered without a specific target.
PipelineStarted(Option<B256>),
PipelineStarted(Option<PipelineTarget>),
/// Pipeline finished
///
/// If this is returned, the pipeline is idle.
@ -590,7 +594,7 @@ mod tests {
.build(pipeline, chain_spec);
let tip = client.highest_block().expect("there should be blocks here");
sync_controller.set_pipeline_sync_target(tip.hash());
sync_controller.set_pipeline_sync_target(tip.hash().into());
let sync_future = poll_fn(|cx| sync_controller.poll(cx));
let next_event = poll!(sync_future);
@ -598,7 +602,7 @@ mod tests {
// can assert that the first event here is PipelineStarted because we set the sync target,
// and we should get Ready because the pipeline should be spawned immediately
assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
assert_eq!(target, tip.hash());
assert_eq!(target.sync_target().unwrap(), tip.hash());
});
// the next event should be the pipeline finishing in a good state

View File

@ -63,7 +63,7 @@ impl<E: EngineTypes + 'static> EngineApiTestContext<E> {
)
.await?;
assert!(submission.status == expected_status);
assert_eq!(submission.status, expected_status);
Ok(submission.latest_valid_hash.unwrap_or_default())
}

View File

@ -5,7 +5,6 @@ use crate::{
use alloy_rpc_types::BlockNumberOrTag;
use eyre::Ok;
use futures_util::Future;
use reth::{
api::{BuiltPayload, EngineTypes, FullNodeComponents, PayloadBuilderAttributes},
@ -171,10 +170,7 @@ where
if check {
if let Some(latest_block) = self.inner.provider.block_by_number(number)? {
if latest_block.hash_slow() != expected_block_hash {
// TODO: only if its awaiting a reorg
continue
}
assert_eq!(latest_block.hash_slow(), expected_block_hash);
break
}
if wait_finish_checkpoint {
@ -185,8 +181,22 @@ where
Ok(())
}
pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
loop {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
if checkpoint.block_number == number {
break
}
}
}
Ok(())
}
/// Asserts that a new block has been added to the blockchain
/// and the tx has been included in the block
/// and the tx has been included in the block.
///
/// Does NOT work for pipeline since there's no stream notification!
pub async fn assert_new_block(
&mut self,
tip_tx_hash: B256,

View File

@ -4,7 +4,8 @@ use alloy_signer_wallet::{coins_bip39::English, LocalWallet, MnemonicBuilder};
/// One of the accounts of the genesis allocations.
pub struct Wallet {
pub inner: LocalWallet,
chain_id: u64,
pub inner_nonce: u64,
pub chain_id: u64,
amount: usize,
derivation_path: Option<String>,
}
@ -13,7 +14,7 @@ impl Wallet {
/// Creates a new account from one of the secret/pubkeys of the genesis allocations (test.json)
pub fn new(amount: usize) -> Self {
let inner = MnemonicBuilder::<English>::default().phrase(TEST_MNEMONIC).build().unwrap();
Self { inner, chain_id: 1, amount, derivation_path: None }
Self { inner, chain_id: 1, amount, derivation_path: None, inner_nonce: 0 }
}
/// Sets chain id

View File

@ -67,6 +67,9 @@ pub enum CanonicalError {
/// Error indicating a transaction failed to commit during execution.
#[error("transaction error on commit: {0}")]
CanonicalCommit(String),
/// Error indicating that a previous optimistic sync target was re-orged
#[error("transaction error on revert: {0}")]
OptimisticTargetRevert(BlockNumber),
}
impl CanonicalError {
@ -83,6 +86,15 @@ impl CanonicalError {
CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain { .. })
)
}
/// Returns `Some(BlockNumber)` if the underlying error matches
/// [CanonicalError::OptimisticTargetRevert].
pub fn optimistic_revert_block_number(&self) -> Option<BlockNumber> {
match self {
CanonicalError::OptimisticTargetRevert(block_number) => Some(*block_number),
_ => None,
}
}
}
/// Error thrown when inserting a block failed because the block is considered invalid.
@ -316,7 +328,8 @@ impl InsertBlockErrorKind {
InsertBlockErrorKind::Canonical(err) => match err {
CanonicalError::BlockchainTree(_) |
CanonicalError::CanonicalCommit(_) |
CanonicalError::CanonicalRevert(_) => false,
CanonicalError::CanonicalRevert(_) |
CanonicalError::OptimisticTargetRevert(_) => false,
CanonicalError::Validation(_) => true,
CanonicalError::Provider(_) => false,
},

View File

@ -78,6 +78,13 @@ pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync {
last_finalized_block: BlockNumber,
) -> RethResult<()>;
/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them, removes all chains and clears all buffered
/// blocks before the tip.
fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>>;
/// Reads the last `N` canonical hashes from the database and updates the block indices of the
/// tree by attempting to connect the buffered blocks to canonical hashes.
///

View File

@ -1,41 +1,89 @@
use crate::utils::{advance_chain, setup};
use reth::primitives::BASE_MAINNET;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet::Wallet};
use reth_primitives::ChainId;
use reth_interfaces::blockchain_tree::error::BlockchainTreeError;
use reth_rpc_types::engine::PayloadStatusEnum;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::test]
async fn can_sync() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let chain_id: ChainId = BASE_MAINNET.chain.into();
let (mut nodes, _tasks, wallet) = setup(3).await?;
let wallet = Arc::new(Mutex::new(wallet));
let (mut nodes, _tasks, _wallet) = setup(2).await?;
let second_node = nodes.pop().unwrap();
let third_node = nodes.pop().unwrap();
let mut second_node = nodes.pop().unwrap();
let mut first_node = nodes.pop().unwrap();
let tip: usize = 300;
let tip: usize = 90;
let tip_index: usize = tip - 1;
let reorg_depth = 2;
let wallet = Wallet::default();
// On first node, create a chain up to block number 300a
let canonical_payload_chain = advance_chain(tip, &mut first_node, |nonce: u64| {
let wallet = wallet.inner.clone();
Box::pin(async move {
TransactionTestContext::optimism_l1_block_info_tx(chain_id, wallet, nonce).await
})
})
.await?;
// On first node, create a chain up to block number 90a
let canonical_payload_chain = advance_chain(tip, &mut first_node, wallet.clone()).await?;
let canonical_chain =
canonical_payload_chain.iter().map(|p| p.0.block().hash()).collect::<Vec<_>>();
// On second node, sync up to block number 300a
// On second node, sync optimistically up to block number 88a
second_node
.engine_api
.update_forkchoice(canonical_chain[tip_index], canonical_chain[tip_index])
.update_optimistic_forkchoice(canonical_chain[tip_index - reorg_depth])
.await?;
second_node.wait_block(tip as u64, canonical_chain[tip_index], true).await?;
second_node
.wait_block((tip - reorg_depth) as u64, canonical_chain[tip_index - reorg_depth], true)
.await?;
// On third node, sync optimistically up to block number 90a
third_node.engine_api.update_optimistic_forkchoice(canonical_chain[tip_index]).await?;
third_node.wait_block(tip as u64, canonical_chain[tip_index], true).await?;
// On second node, create a side chain: 88a -> 89b -> 90b
wallet.lock().await.inner_nonce -= reorg_depth as u64;
second_node.payload.timestamp = first_node.payload.timestamp - reorg_depth as u64; // TODO: probably want to make it node agnostic
let side_payload_chain = advance_chain(reorg_depth, &mut second_node, wallet.clone()).await?;
let side_chain = side_payload_chain.iter().map(|p| p.0.block().hash()).collect::<Vec<_>>();
// Creates fork chain by submitting 89b payload.
// By returning Valid here, op-node will finally return a finalized hash
let _ = third_node
.engine_api
.submit_payload(
side_payload_chain[0].0.clone(),
side_payload_chain[0].1.clone(),
PayloadStatusEnum::Valid,
Default::default(),
)
.await;
// It will issue a pipeline reorg to 88a, and then make 89b canonical AND finalized.
third_node.engine_api.update_forkchoice(side_chain[0], side_chain[0]).await?;
// Make sure we have the updated block
third_node.wait_unwind((tip - reorg_depth) as u64).await?;
third_node
.wait_block(
side_payload_chain[0].0.block().number,
side_payload_chain[0].0.block().hash(),
true,
)
.await?;
// Make sure that trying to submit 89a again will result in an invalid payload status, since 89b
// has been set as finalized.
let _ = third_node
.engine_api
.submit_payload(
canonical_payload_chain[tip_index - reorg_depth + 1].0.clone(),
canonical_payload_chain[tip_index - reorg_depth + 1].1.clone(),
PayloadStatusEnum::Invalid {
validation_error: BlockchainTreeError::PendingBlockIsFinalized {
last_finalized: (tip - reorg_depth) as u64 + 1,
}
.to_string(),
},
Default::default(),
)
.await;
Ok(())
}

View File

@ -1,9 +1,10 @@
use reth::{primitives::Bytes, rpc::types::engine::PayloadAttributes, tasks::TaskManager};
use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType};
use reth::{rpc::types::engine::PayloadAttributes, tasks::TaskManager};
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet::Wallet, NodeHelperType};
use reth_node_optimism::{OptimismBuiltPayload, OptimismNode, OptimismPayloadBuilderAttributes};
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_primitives::{Address, ChainSpecBuilder, Genesis, B256, BASE_MAINNET};
use std::{future::Future, pin::Pin, sync::Arc};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Optimism Node Helper type
pub(crate) type OpNode = NodeHelperType<OptimismNode>;
@ -24,12 +25,30 @@ pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec<OpNode>, TaskMa
.await
}
/// Advance the chain with sequential payloads returning them in the end.
pub(crate) async fn advance_chain(
length: usize,
node: &mut OpNode,
tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
wallet: Arc<Mutex<Wallet>>,
) -> eyre::Result<Vec<(OptimismBuiltPayload, OptimismPayloadBuilderAttributes)>> {
node.advance(length as u64, tx_generator, optimism_payload_attributes).await
node.advance(
length as u64,
|_| {
let wallet = wallet.clone();
Box::pin(async move {
let mut wallet = wallet.lock().await;
let tx_fut = TransactionTestContext::optimism_l1_block_info_tx(
wallet.chain_id,
wallet.inner.clone(),
wallet.inner_nonce,
);
wallet.inner_nonce += 1;
tx_fut.await
})
},
optimism_payload_attributes,
)
.await
}
/// Helper function to create a new eth payload attributes

View File

@ -1,6 +1,7 @@
//! Staged sync primitives.
mod id;
use crate::{BlockHash, BlockNumber};
pub use id::StageId;
mod checkpoints;
@ -9,3 +10,46 @@ pub use checkpoints::{
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
};
/// Direction and target block for pipeline operations.
#[derive(Debug, Clone, Copy)]
pub enum PipelineTarget {
/// Target for forward synchronization, indicating a block hash to sync to.
Sync(BlockHash),
/// Target for backward unwinding, indicating a block number to unwind to.
Unwind(BlockNumber),
}
impl PipelineTarget {
/// Returns the target block hash for forward synchronization, if applicable.
///
/// # Returns
///
/// - `Some(BlockHash)`: The target block hash for forward synchronization.
/// - `None`: If the target is for backward unwinding.
pub fn sync_target(self) -> Option<BlockHash> {
match self {
PipelineTarget::Sync(hash) => Some(hash),
PipelineTarget::Unwind(_) => None,
}
}
/// Returns the target block number for backward unwinding, if applicable.
///
/// # Returns
///
/// - `Some(BlockNumber)`: The target block number for backward unwinding.
/// - `None`: If the target is for forward synchronization.
pub fn unwind_target(self) -> Option<BlockNumber> {
match self {
PipelineTarget::Sync(_) => None,
PipelineTarget::Unwind(number) => Some(number),
}
}
}
impl From<BlockHash> for PipelineTarget {
fn from(hash: BlockHash) -> Self {
Self::Sync(hash)
}
}

View File

@ -39,7 +39,12 @@ pub enum EthApiError {
UnknownBlockNumber,
/// Thrown when querying for `finalized` or `safe` block before the merge transition is
/// finalized, <https://github.com/ethereum/execution-apis/blob/6d17705a875e52c26826124c2a8a15ed542aeca2/src/schemas/block.yaml#L109>
#[error("unknown block")]
///
/// op-node uses case sensitive string comparison to parse this error:
/// <https://github.com/ethereum-optimism/optimism/blob/0913776869f6cb2c1218497463d7377cf4de16de/op-service/sources/l2_client.go#L105>
///
/// TODO(#8045): Temporary, until a version of <https://github.com/ethereum-optimism/optimism/pull/10071> is pushed through that doesn't require this to figure out the EL sync status.
#[error("Unknown block")]
UnknownSafeOrFinalizedBlock,
/// Thrown when an unknown block or transaction index is encountered
#[error("unknown block or tx index")]

View File

@ -7,7 +7,7 @@ use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH,
stage::{StageCheckpoint, StageId},
stage::{PipelineTarget, StageCheckpoint, StageId},
static_file::HighestStaticFiles,
BlockNumber, B256,
};
@ -130,17 +130,31 @@ where
/// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
/// pipeline and its result as a future.
#[track_caller]
pub fn run_as_fut(mut self, tip: Option<B256>) -> PipelineFut<DB> {
pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<DB> {
// TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
// updating metrics.
let _ = self.register_metrics(); // ignore error
Box::pin(async move {
// NOTE: the tip should only be None if we are in continuous sync mode.
if let Some(tip) = tip {
self.set_tip(tip);
if let Some(target) = target {
match target {
PipelineTarget::Sync(tip) => self.set_tip(tip),
PipelineTarget::Unwind(target) => {
if let Err(err) = self.produce_static_files() {
return (self, Err(err.into()))
}
if let Err(err) = self.unwind(target, None) {
return (self, Err(err))
}
self.progress.update(target);
return (self, Ok(ControlFlow::Continue { block_number: target }))
}
}
}
let result = self.run_loop().await;
trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished");
trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
(self, result)
})
}

View File

@ -669,6 +669,10 @@ where
self.tree.finalize_block(finalized_block)
}
fn update_block_hashes_and_clear_buffered(&self) -> RethResult<BTreeMap<BlockNumber, B256>> {
self.tree.update_block_hashes_and_clear_buffered()
}
fn connect_buffered_blocks_to_canonical_hashes_and_finalize(
&self,
last_finalized_block: BlockNumber,