diff --git a/bin/reth/src/optimism.rs b/bin/reth/src/optimism.rs index a651314b8..581718797 100644 --- a/bin/reth/src/optimism.rs +++ b/bin/reth/src/optimism.rs @@ -2,11 +2,7 @@ use clap::Parser; use reth::cli::Cli; -use reth_node_builder::NodeHandle; -use reth_node_optimism::{ - args::RollupArgs, rpc::SequencerClient, OptimismEngineTypes, OptimismNode, -}; -use reth_provider::BlockReaderIdExt; +use reth_node_optimism::{args::RollupArgs, rpc::SequencerClient, OptimismNode}; use std::sync::Arc; // We use jemalloc for performance reasons @@ -27,7 +23,7 @@ fn main() { } if let Err(err) = Cli::::parse().run(|builder, rollup_args| async move { - let NodeHandle { node, node_exit_future } = builder + let handle = builder .node(OptimismNode::new(rollup_args.clone())) .extend_rpc_modules(move |ctx| { // register sequencer tx forwarder @@ -42,29 +38,7 @@ fn main() { .launch() .await?; - // If `enable_genesis_walkback` is set to true, the rollup client will need to - // perform the derivation pipeline from genesis, validating the data dir. - // When set to false, set the finalized, safe, and unsafe head block hashes - // on the rollup client using a fork choice update. This prevents the rollup - // client from performing the derivation pipeline from genesis, and instead - // starts syncing from the current tip in the DB. - if node.chain_spec().is_optimism() && !rollup_args.enable_genesis_walkback { - let client = node.rpc_server_handles.auth.http_client(); - if let Ok(Some(head)) = node.provider.latest_header() { - reth_rpc_api::EngineApiClient::::fork_choice_updated_v2( - &client, - reth_rpc_types::engine::ForkchoiceState { - head_block_hash: head.hash(), - safe_block_hash: head.hash(), - finalized_block_hash: head.hash(), - }, - None, - ) - .await?; - } - } - - node_exit_future.await + handle.node_exit_future.await }) { eprintln!("Error: {err:?}"); std::process::exit(1); diff --git a/crates/blockchain-tree/src/block_buffer.rs b/crates/blockchain-tree/src/block_buffer.rs index 23c6ca681..14e896337 100644 --- a/crates/blockchain-tree/src/block_buffer.rs +++ b/crates/blockchain-tree/src/block_buffer.rs @@ -104,13 +104,13 @@ impl BlockBuffer { removed } - /// Discard all blocks that precede finalized block number from the buffer. - pub fn remove_old_blocks(&mut self, finalized_number: BlockNumber) { + /// Discard all blocks that precede block number from the buffer. + pub fn remove_old_blocks(&mut self, block_number: BlockNumber) { let mut block_hashes_to_remove = Vec::new(); // discard all blocks that are before the finalized number. while let Some(entry) = self.earliest_blocks.first_entry() { - if *entry.key() > finalized_number { + if *entry.key() > block_number { break } let block_hashes = entry.remove(); diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 64d311549..689994471 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -19,13 +19,14 @@ use reth_interfaces::{ }; use reth_primitives::{ BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt, - SealedBlock, SealedBlockWithSenders, SealedHeader, U256, + SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256, }; use reth_provider::{ chain::{ChainSplit, ChainSplitTarget}, BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain, ChainSpecProvider, DisplayBlocksChain, HeaderProvider, ProviderError, + StaticFileProviderFactory, }; use reth_stages_api::{MetricEvent, MetricEventsSender}; use std::{ @@ -783,6 +784,11 @@ where Ok(InsertPayloadOk::Inserted(status)) } + /// Discard all blocks that precede block number from the buffer. + pub fn remove_old_blocks(&mut self, block: BlockNumber) { + self.state.buffered_blocks.remove_old_blocks(block); + } + /// Finalize blocks up until and including `finalized_block`, and remove them from the tree. pub fn finalize_block(&mut self, finalized_block: BlockNumber) { // remove blocks @@ -797,7 +803,7 @@ where } } // clean block buffer. - self.state.buffered_blocks.remove_old_blocks(finalized_block); + self.remove_old_blocks(finalized_block); } /// Reads the last `N` canonical hashes from the database and updates the block indices of the @@ -817,6 +823,16 @@ where ) -> RethResult<()> { self.finalize_block(last_finalized_block); + let last_canonical_hashes = self.update_block_hashes()?; + + self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?; + + Ok(()) + } + + /// Update all block hashes. iterate over present and new list of canonical hashes and compare + /// them. Remove all mismatches, disconnect them and removes all chains. + pub fn update_block_hashes(&mut self) -> RethResult> { let last_canonical_hashes = self .externals .fetch_latest_canonical_hashes(self.config.num_of_canonical_hashes() as usize)?; @@ -831,9 +847,22 @@ where } } - self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?; + Ok(last_canonical_hashes) + } - Ok(()) + /// Update all block hashes. iterate over present and new list of canonical hashes and compare + /// them. Remove all mismatches, disconnect them, removes all chains and clears all buffered + /// blocks before the tip. + pub fn update_block_hashes_and_clear_buffered( + &mut self, + ) -> RethResult> { + let chain = self.update_block_hashes()?; + + if let Some((block, _)) = chain.last_key_value() { + self.remove_old_blocks(*block); + } + + Ok(chain) } /// Reads the last `N` canonical hashes from the database and updates the block indices of the @@ -1220,6 +1249,28 @@ where &self, revert_until: BlockNumber, ) -> Result, CanonicalError> { + // This should only happen when an optimistic sync target was re-orged. + // + // Static files generally contain finalized data. The blockchain tree only deals + // with unfinalized data. The only scenario where canonical reverts go past the highest + // static file is when an optimistic sync occured and unfinalized data was written to + // static files. + if self + .externals + .provider_factory + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::Headers) + .unwrap_or_default() > + revert_until + { + trace!( + target: "blockchain_tree", + "Reverting optimistic canonical chain to block {}", + revert_until + ); + return Err(CanonicalError::OptimisticTargetRevert(revert_until)) + } + // read data that is needed for new sidechain let provider_rw = self.externals.provider_factory.provider_rw()?; diff --git a/crates/blockchain-tree/src/noop.rs b/crates/blockchain-tree/src/noop.rs index bb99f9b55..776a15325 100644 --- a/crates/blockchain-tree/src/noop.rs +++ b/crates/blockchain-tree/src/noop.rs @@ -68,6 +68,12 @@ impl BlockchainTreeEngine for NoopBlockchainTree { fn make_canonical(&self, block_hash: BlockHash) -> Result { Err(BlockchainTreeError::BlockHashNotFoundInChain { block_hash }.into()) } + + fn update_block_hashes_and_clear_buffered( + &self, + ) -> RethResult> { + Ok(BTreeMap::new()) + } } impl BlockchainTreeViewer for NoopBlockchainTree { diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index 061b49f4c..77cc53c2d 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -83,6 +83,15 @@ where res } + fn update_block_hashes_and_clear_buffered( + &self, + ) -> RethResult> { + let mut tree = self.tree.write(); + let res = tree.update_block_hashes_and_clear_buffered(); + tree.update_chains_metrics(); + res + } + fn connect_buffered_blocks_to_canonical_hashes(&self) -> RethResult<()> { trace!(target: "blockchain_tree", "Connecting buffered blocks to canonical hashes"); let mut tree = self.tree.write(); diff --git a/crates/consensus/beacon/src/engine/hooks/controller.rs b/crates/consensus/beacon/src/engine/hooks/controller.rs index 47085be00..7916928db 100644 --- a/crates/consensus/beacon/src/engine/hooks/controller.rs +++ b/crates/consensus/beacon/src/engine/hooks/controller.rs @@ -130,10 +130,16 @@ impl EngineHooksController { args: EngineHookContext, db_write_active: bool, ) -> Poll> { - // Hook with DB write access level is not allowed to run due to already running hook with DB - // write access level or active DB write according to passed argument + // Hook with DB write access level is not allowed to run due to any of the following + // reasons: + // - An already running hook with DB write access level + // - Active DB write according to passed argument + // - Missing a finalized block number. We might be on an optimistic sync scenario where we + // cannot skip the FCU with the finalized hash, otherwise CL might misbehave. if hook.db_access_level().is_read_write() && - (self.active_db_write_hook.is_some() || db_write_active) + (self.active_db_write_hook.is_some() || + db_write_active || + args.finalized_block_number.is_none()) { return Poll::Pending } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 4e3550cd3..1057457c7 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -15,8 +15,9 @@ use reth_interfaces::{ use reth_payload_builder::PayloadBuilderHandle; use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ - constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock, - SealedHeader, B256, + constants::EPOCH_SLOTS, + stage::{PipelineTarget, StageId}, + BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader, B256, }; use reth_provider::{ BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError, @@ -316,7 +317,7 @@ where }; if let Some(target) = maybe_pipeline_target { - this.sync.set_pipeline_sync_target(target); + this.sync.set_pipeline_sync_target(target.into()); } Ok((this, handle)) @@ -668,6 +669,21 @@ where // threshold return Some(state.finalized_block_hash) } + + // OPTIMISTIC SYNCING + // + // It can happen when the node is doing an + // optimistic sync, where the CL has no knowledge of the finalized hash, + // but is expecting the EL to sync as high + // as possible before finalizing. + // + // This usually doesn't happen on ETH mainnet since CLs use the more + // secure checkpoint syncing. + // + // However, optimism chains will do this. The risk of a reorg is however + // low. + debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target."); + return Some(state.head_block_hash) } Ok(Some(_)) => { // we're fully synced to the finalized block @@ -981,6 +997,10 @@ where // so we should not warn the user, since this will result in us attempting to sync // to a new target and is considered normal operation during sync } + CanonicalError::OptimisticTargetRevert(block_number) => { + self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number)); + return PayloadStatus::from_status(PayloadStatusEnum::Syncing) + } _ => { warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash"); // TODO(mattsse) better error handling before attempting to sync (FCU could be @@ -1011,7 +1031,7 @@ where if self.pipeline_run_threshold == 0 { // use the pipeline to sync to the target trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head"); - self.sync.set_pipeline_sync_target(target); + self.sync.set_pipeline_sync_target(target.into()); } else { // trigger a full block download for missing hash, or the parent of its lowest buffered // ancestor @@ -1361,7 +1381,7 @@ where ) { // we don't have the block yet and the distance exceeds the allowed // threshold - self.sync.set_pipeline_sync_target(target); + self.sync.set_pipeline_sync_target(target.into()); // we can exit early here because the pipeline will take care of syncing return } @@ -1445,6 +1465,8 @@ where // TODO: do not ignore this let _ = self.blockchain.make_canonical(*target_hash.as_ref()); } + } else if let Some(block_number) = err.optimistic_revert_block_number() { + self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number)); } Err((target.head_block_hash, err)) @@ -1506,13 +1528,7 @@ where // update the canon chain if continuous is enabled if self.sync.run_pipeline_continuously() { - let max_block = ctrl.block_number().unwrap_or_default(); - let max_header = self.blockchain.sealed_header(max_block) - .inspect_err(|error| { - error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync"); - })? - .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?; - self.blockchain.set_canonical_head(max_header); + self.set_canonical_head(ctrl.block_number().unwrap_or_default())?; } let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() { @@ -1525,6 +1541,14 @@ where } }; + if sync_target_state.finalized_block_hash.is_zero() { + self.set_canonical_head(ctrl.block_number().unwrap_or_default())?; + self.blockchain.update_block_hashes_and_clear_buffered()?; + self.blockchain.connect_buffered_blocks_to_canonical_hashes()?; + // We are on an optimistic syncing process, better to wait for the next FCU to handle + return Ok(()) + } + // Next, we check if we need to schedule another pipeline run or transition // to live sync via tree. // This can arise if we buffer the forkchoice head, and if the head is an @@ -1580,7 +1604,7 @@ where // the tree update from executing too many blocks and blocking. if let Some(target) = pipeline_target { // run the pipeline to the target since the distance is sufficient - self.sync.set_pipeline_sync_target(target); + self.sync.set_pipeline_sync_target(target.into()); } else if let Some(number) = self.blockchain.block_number(sync_target_state.finalized_block_hash)? { @@ -1592,12 +1616,23 @@ where } else { // We don't have the finalized block in the database, so we need to // trigger another pipeline run. - self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash); + self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into()); } Ok(()) } + fn set_canonical_head(&self, max_block: BlockNumber) -> RethResult<()> { + let max_header = self.blockchain.sealed_header(max_block) + .inspect_err(|error| { + error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync"); + })? + .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?; + self.blockchain.set_canonical_head(max_header); + + Ok(()) + } + fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> { if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event { error!( @@ -1746,16 +1781,20 @@ where Err(BeaconOnNewPayloadError::Internal(Box::new(error.clone()))); let _ = tx.send(response); return Err(RethError::Canonical(error)) + } else if error.optimistic_revert_block_number().is_some() { + // engine already set the pipeline unwind target on + // `try_make_sync_target_canonical` + PayloadStatus::from_status(PayloadStatusEnum::Syncing) + } else { + // If we could not make the sync target block canonical, + // we should return the error as an invalid payload status. + PayloadStatus::new( + PayloadStatusEnum::Invalid { validation_error: error.to_string() }, + // TODO: return a proper latest valid hash + // See: + self.forkchoice_state_tracker.last_valid_head(), + ) } - - // If we could not make the sync target block canonical, - // we should return the error as an invalid payload status. - PayloadStatus::new( - PayloadStatusEnum::Invalid { validation_error: error.to_string() }, - // TODO: return a proper latest valid hash - // See: - self.forkchoice_state_tracker.last_valid_head(), - ) } }; diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index fd78f461a..261b6874f 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -11,7 +11,7 @@ use reth_interfaces::p2p::{ full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, headers::client::HeadersClient, }; -use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256}; +use reth_primitives::{stage::PipelineTarget, BlockNumber, ChainSpec, SealedBlock, B256}; use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineWithResult}; use reth_tasks::TaskSpawner; use reth_tokio_util::EventListeners; @@ -44,7 +44,7 @@ where /// The pipeline is used for large ranges. pipeline_state: PipelineState, /// Pending target block for the pipeline to sync - pending_pipeline_target: Option, + pending_pipeline_target: Option, /// In-flight full block requests in progress. inflight_full_block_requests: Vec>, /// In-flight full block _range_ requests in progress. @@ -216,8 +216,12 @@ where /// Sets a new target to sync the pipeline to. /// /// But ensures the target is not the zero hash. - pub(crate) fn set_pipeline_sync_target(&mut self, target: B256) { - if target.is_zero() { + pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) { + if target.sync_target().is_some_and(|target| target.is_zero()) { + trace!( + target: "consensus::engine::sync", + "Pipeline target cannot be zero hash." + ); // precaution to never sync to the zero hash return } @@ -384,7 +388,7 @@ pub(crate) enum EngineSyncEvent { /// Pipeline started syncing /// /// This is none if the pipeline is triggered without a specific target. - PipelineStarted(Option), + PipelineStarted(Option), /// Pipeline finished /// /// If this is returned, the pipeline is idle. @@ -590,7 +594,7 @@ mod tests { .build(pipeline, chain_spec); let tip = client.highest_block().expect("there should be blocks here"); - sync_controller.set_pipeline_sync_target(tip.hash()); + sync_controller.set_pipeline_sync_target(tip.hash().into()); let sync_future = poll_fn(|cx| sync_controller.poll(cx)); let next_event = poll!(sync_future); @@ -598,7 +602,7 @@ mod tests { // can assert that the first event here is PipelineStarted because we set the sync target, // and we should get Ready because the pipeline should be spawned immediately assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => { - assert_eq!(target, tip.hash()); + assert_eq!(target.sync_target().unwrap(), tip.hash()); }); // the next event should be the pipeline finishing in a good state diff --git a/crates/e2e-test-utils/src/engine_api.rs b/crates/e2e-test-utils/src/engine_api.rs index 13b735aea..fefd7d6ff 100644 --- a/crates/e2e-test-utils/src/engine_api.rs +++ b/crates/e2e-test-utils/src/engine_api.rs @@ -63,7 +63,7 @@ impl EngineApiTestContext { ) .await?; - assert!(submission.status == expected_status); + assert_eq!(submission.status, expected_status); Ok(submission.latest_valid_hash.unwrap_or_default()) } diff --git a/crates/e2e-test-utils/src/node.rs b/crates/e2e-test-utils/src/node.rs index 668af6034..0ae20664a 100644 --- a/crates/e2e-test-utils/src/node.rs +++ b/crates/e2e-test-utils/src/node.rs @@ -5,7 +5,6 @@ use crate::{ use alloy_rpc_types::BlockNumberOrTag; use eyre::Ok; - use futures_util::Future; use reth::{ api::{BuiltPayload, EngineTypes, FullNodeComponents, PayloadBuilderAttributes}, @@ -171,10 +170,7 @@ where if check { if let Some(latest_block) = self.inner.provider.block_by_number(number)? { - if latest_block.hash_slow() != expected_block_hash { - // TODO: only if its awaiting a reorg - continue - } + assert_eq!(latest_block.hash_slow(), expected_block_hash); break } if wait_finish_checkpoint { @@ -185,8 +181,22 @@ where Ok(()) } + pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> { + loop { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? { + if checkpoint.block_number == number { + break + } + } + } + Ok(()) + } + /// Asserts that a new block has been added to the blockchain - /// and the tx has been included in the block + /// and the tx has been included in the block. + /// + /// Does NOT work for pipeline since there's no stream notification! pub async fn assert_new_block( &mut self, tip_tx_hash: B256, diff --git a/crates/e2e-test-utils/src/wallet.rs b/crates/e2e-test-utils/src/wallet.rs index d94dec2a0..e841e7cd7 100644 --- a/crates/e2e-test-utils/src/wallet.rs +++ b/crates/e2e-test-utils/src/wallet.rs @@ -4,7 +4,8 @@ use alloy_signer_wallet::{coins_bip39::English, LocalWallet, MnemonicBuilder}; /// One of the accounts of the genesis allocations. pub struct Wallet { pub inner: LocalWallet, - chain_id: u64, + pub inner_nonce: u64, + pub chain_id: u64, amount: usize, derivation_path: Option, } @@ -13,7 +14,7 @@ impl Wallet { /// Creates a new account from one of the secret/pubkeys of the genesis allocations (test.json) pub fn new(amount: usize) -> Self { let inner = MnemonicBuilder::::default().phrase(TEST_MNEMONIC).build().unwrap(); - Self { inner, chain_id: 1, amount, derivation_path: None } + Self { inner, chain_id: 1, amount, derivation_path: None, inner_nonce: 0 } } /// Sets chain id diff --git a/crates/interfaces/src/blockchain_tree/error.rs b/crates/interfaces/src/blockchain_tree/error.rs index b805c6ee8..e9cdb8714 100644 --- a/crates/interfaces/src/blockchain_tree/error.rs +++ b/crates/interfaces/src/blockchain_tree/error.rs @@ -67,6 +67,9 @@ pub enum CanonicalError { /// Error indicating a transaction failed to commit during execution. #[error("transaction error on commit: {0}")] CanonicalCommit(String), + /// Error indicating that a previous optimistic sync target was re-orged + #[error("transaction error on revert: {0}")] + OptimisticTargetRevert(BlockNumber), } impl CanonicalError { @@ -83,6 +86,15 @@ impl CanonicalError { CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain { .. }) ) } + + /// Returns `Some(BlockNumber)` if the underlying error matches + /// [CanonicalError::OptimisticTargetRevert]. + pub fn optimistic_revert_block_number(&self) -> Option { + match self { + CanonicalError::OptimisticTargetRevert(block_number) => Some(*block_number), + _ => None, + } + } } /// Error thrown when inserting a block failed because the block is considered invalid. @@ -316,7 +328,8 @@ impl InsertBlockErrorKind { InsertBlockErrorKind::Canonical(err) => match err { CanonicalError::BlockchainTree(_) | CanonicalError::CanonicalCommit(_) | - CanonicalError::CanonicalRevert(_) => false, + CanonicalError::CanonicalRevert(_) | + CanonicalError::OptimisticTargetRevert(_) => false, CanonicalError::Validation(_) => true, CanonicalError::Provider(_) => false, }, diff --git a/crates/interfaces/src/blockchain_tree/mod.rs b/crates/interfaces/src/blockchain_tree/mod.rs index d8ad667fc..7d2b50e41 100644 --- a/crates/interfaces/src/blockchain_tree/mod.rs +++ b/crates/interfaces/src/blockchain_tree/mod.rs @@ -78,6 +78,13 @@ pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync { last_finalized_block: BlockNumber, ) -> RethResult<()>; + /// Update all block hashes. iterate over present and new list of canonical hashes and compare + /// them. Remove all mismatches, disconnect them, removes all chains and clears all buffered + /// blocks before the tip. + fn update_block_hashes_and_clear_buffered( + &self, + ) -> RethResult>; + /// Reads the last `N` canonical hashes from the database and updates the block indices of the /// tree by attempting to connect the buffered blocks to canonical hashes. /// diff --git a/crates/optimism/node/tests/e2e/p2p.rs b/crates/optimism/node/tests/e2e/p2p.rs index a38fadf67..9e3741055 100644 --- a/crates/optimism/node/tests/e2e/p2p.rs +++ b/crates/optimism/node/tests/e2e/p2p.rs @@ -1,41 +1,89 @@ use crate::utils::{advance_chain, setup}; -use reth::primitives::BASE_MAINNET; -use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet::Wallet}; -use reth_primitives::ChainId; +use reth_interfaces::blockchain_tree::error::BlockchainTreeError; +use reth_rpc_types::engine::PayloadStatusEnum; +use std::sync::Arc; +use tokio::sync::Mutex; #[tokio::test] async fn can_sync() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let chain_id: ChainId = BASE_MAINNET.chain.into(); + let (mut nodes, _tasks, wallet) = setup(3).await?; + let wallet = Arc::new(Mutex::new(wallet)); - let (mut nodes, _tasks, _wallet) = setup(2).await?; - - let second_node = nodes.pop().unwrap(); + let third_node = nodes.pop().unwrap(); + let mut second_node = nodes.pop().unwrap(); let mut first_node = nodes.pop().unwrap(); - let tip: usize = 300; + let tip: usize = 90; let tip_index: usize = tip - 1; + let reorg_depth = 2; - let wallet = Wallet::default(); - - // On first node, create a chain up to block number 300a - let canonical_payload_chain = advance_chain(tip, &mut first_node, |nonce: u64| { - let wallet = wallet.inner.clone(); - Box::pin(async move { - TransactionTestContext::optimism_l1_block_info_tx(chain_id, wallet, nonce).await - }) - }) - .await?; + // On first node, create a chain up to block number 90a + let canonical_payload_chain = advance_chain(tip, &mut first_node, wallet.clone()).await?; let canonical_chain = canonical_payload_chain.iter().map(|p| p.0.block().hash()).collect::>(); - // On second node, sync up to block number 300a + // On second node, sync optimistically up to block number 88a second_node .engine_api - .update_forkchoice(canonical_chain[tip_index], canonical_chain[tip_index]) + .update_optimistic_forkchoice(canonical_chain[tip_index - reorg_depth]) .await?; - second_node.wait_block(tip as u64, canonical_chain[tip_index], true).await?; + second_node + .wait_block((tip - reorg_depth) as u64, canonical_chain[tip_index - reorg_depth], true) + .await?; + + // On third node, sync optimistically up to block number 90a + third_node.engine_api.update_optimistic_forkchoice(canonical_chain[tip_index]).await?; + third_node.wait_block(tip as u64, canonical_chain[tip_index], true).await?; + + // On second node, create a side chain: 88a -> 89b -> 90b + wallet.lock().await.inner_nonce -= reorg_depth as u64; + second_node.payload.timestamp = first_node.payload.timestamp - reorg_depth as u64; // TODO: probably want to make it node agnostic + let side_payload_chain = advance_chain(reorg_depth, &mut second_node, wallet.clone()).await?; + let side_chain = side_payload_chain.iter().map(|p| p.0.block().hash()).collect::>(); + + // Creates fork chain by submitting 89b payload. + // By returning Valid here, op-node will finally return a finalized hash + let _ = third_node + .engine_api + .submit_payload( + side_payload_chain[0].0.clone(), + side_payload_chain[0].1.clone(), + PayloadStatusEnum::Valid, + Default::default(), + ) + .await; + + // It will issue a pipeline reorg to 88a, and then make 89b canonical AND finalized. + third_node.engine_api.update_forkchoice(side_chain[0], side_chain[0]).await?; + + // Make sure we have the updated block + third_node.wait_unwind((tip - reorg_depth) as u64).await?; + third_node + .wait_block( + side_payload_chain[0].0.block().number, + side_payload_chain[0].0.block().hash(), + true, + ) + .await?; + + // Make sure that trying to submit 89a again will result in an invalid payload status, since 89b + // has been set as finalized. + let _ = third_node + .engine_api + .submit_payload( + canonical_payload_chain[tip_index - reorg_depth + 1].0.clone(), + canonical_payload_chain[tip_index - reorg_depth + 1].1.clone(), + PayloadStatusEnum::Invalid { + validation_error: BlockchainTreeError::PendingBlockIsFinalized { + last_finalized: (tip - reorg_depth) as u64 + 1, + } + .to_string(), + }, + Default::default(), + ) + .await; Ok(()) } diff --git a/crates/optimism/node/tests/e2e/utils.rs b/crates/optimism/node/tests/e2e/utils.rs index e86a7c654..ad19086ae 100644 --- a/crates/optimism/node/tests/e2e/utils.rs +++ b/crates/optimism/node/tests/e2e/utils.rs @@ -1,9 +1,10 @@ -use reth::{primitives::Bytes, rpc::types::engine::PayloadAttributes, tasks::TaskManager}; -use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType}; +use reth::{rpc::types::engine::PayloadAttributes, tasks::TaskManager}; +use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet::Wallet, NodeHelperType}; use reth_node_optimism::{OptimismBuiltPayload, OptimismNode, OptimismPayloadBuilderAttributes}; use reth_payload_builder::EthPayloadBuilderAttributes; use reth_primitives::{Address, ChainSpecBuilder, Genesis, B256, BASE_MAINNET}; -use std::{future::Future, pin::Pin, sync::Arc}; +use std::sync::Arc; +use tokio::sync::Mutex; /// Optimism Node Helper type pub(crate) type OpNode = NodeHelperType; @@ -24,12 +25,30 @@ pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec, TaskMa .await } +/// Advance the chain with sequential payloads returning them in the end. pub(crate) async fn advance_chain( length: usize, node: &mut OpNode, - tx_generator: impl Fn(u64) -> Pin>>, + wallet: Arc>, ) -> eyre::Result> { - node.advance(length as u64, tx_generator, optimism_payload_attributes).await + node.advance( + length as u64, + |_| { + let wallet = wallet.clone(); + Box::pin(async move { + let mut wallet = wallet.lock().await; + let tx_fut = TransactionTestContext::optimism_l1_block_info_tx( + wallet.chain_id, + wallet.inner.clone(), + wallet.inner_nonce, + ); + wallet.inner_nonce += 1; + tx_fut.await + }) + }, + optimism_payload_attributes, + ) + .await } /// Helper function to create a new eth payload attributes diff --git a/crates/primitives/src/stage/mod.rs b/crates/primitives/src/stage/mod.rs index ffe52554d..3c7c972bc 100644 --- a/crates/primitives/src/stage/mod.rs +++ b/crates/primitives/src/stage/mod.rs @@ -1,6 +1,7 @@ //! Staged sync primitives. mod id; +use crate::{BlockHash, BlockNumber}; pub use id::StageId; mod checkpoints; @@ -9,3 +10,46 @@ pub use checkpoints::{ HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint, StorageHashingCheckpoint, }; + +/// Direction and target block for pipeline operations. +#[derive(Debug, Clone, Copy)] +pub enum PipelineTarget { + /// Target for forward synchronization, indicating a block hash to sync to. + Sync(BlockHash), + /// Target for backward unwinding, indicating a block number to unwind to. + Unwind(BlockNumber), +} + +impl PipelineTarget { + /// Returns the target block hash for forward synchronization, if applicable. + /// + /// # Returns + /// + /// - `Some(BlockHash)`: The target block hash for forward synchronization. + /// - `None`: If the target is for backward unwinding. + pub fn sync_target(self) -> Option { + match self { + PipelineTarget::Sync(hash) => Some(hash), + PipelineTarget::Unwind(_) => None, + } + } + + /// Returns the target block number for backward unwinding, if applicable. + /// + /// # Returns + /// + /// - `Some(BlockNumber)`: The target block number for backward unwinding. + /// - `None`: If the target is for forward synchronization. + pub fn unwind_target(self) -> Option { + match self { + PipelineTarget::Sync(_) => None, + PipelineTarget::Unwind(number) => Some(number), + } + } +} + +impl From for PipelineTarget { + fn from(hash: BlockHash) -> Self { + Self::Sync(hash) + } +} diff --git a/crates/rpc/rpc/src/eth/error.rs b/crates/rpc/rpc/src/eth/error.rs index 305536aab..df2aef800 100644 --- a/crates/rpc/rpc/src/eth/error.rs +++ b/crates/rpc/rpc/src/eth/error.rs @@ -39,7 +39,12 @@ pub enum EthApiError { UnknownBlockNumber, /// Thrown when querying for `finalized` or `safe` block before the merge transition is /// finalized, - #[error("unknown block")] + /// + /// op-node uses case sensitive string comparison to parse this error: + /// + /// + /// TODO(#8045): Temporary, until a version of is pushed through that doesn't require this to figure out the EL sync status. + #[error("Unknown block")] UnknownSafeOrFinalizedBlock, /// Thrown when an unknown block or transaction index is encountered #[error("unknown block or tx index")] diff --git a/crates/stages-api/src/pipeline/mod.rs b/crates/stages-api/src/pipeline/mod.rs index 0cbd993c5..199cc41e6 100644 --- a/crates/stages-api/src/pipeline/mod.rs +++ b/crates/stages-api/src/pipeline/mod.rs @@ -7,7 +7,7 @@ use reth_db::database::Database; use reth_interfaces::RethResult; use reth_primitives::{ constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, - stage::{StageCheckpoint, StageId}, + stage::{PipelineTarget, StageCheckpoint, StageId}, static_file::HighestStaticFiles, BlockNumber, B256, }; @@ -130,17 +130,31 @@ where /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the /// pipeline and its result as a future. #[track_caller] - pub fn run_as_fut(mut self, tip: Option) -> PipelineFut { + pub fn run_as_fut(mut self, target: Option) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. let _ = self.register_metrics(); // ignore error Box::pin(async move { // NOTE: the tip should only be None if we are in continuous sync mode. - if let Some(tip) = tip { - self.set_tip(tip); + if let Some(target) = target { + match target { + PipelineTarget::Sync(tip) => self.set_tip(tip), + PipelineTarget::Unwind(target) => { + if let Err(err) = self.produce_static_files() { + return (self, Err(err.into())) + } + if let Err(err) = self.unwind(target, None) { + return (self, Err(err)) + } + self.progress.update(target); + + return (self, Ok(ControlFlow::Continue { block_number: target })) + } + } } + let result = self.run_loop().await; - trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished"); + trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished"); (self, result) }) } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index b0f43ba9f..bf94e32cf 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -669,6 +669,10 @@ where self.tree.finalize_block(finalized_block) } + fn update_block_hashes_and_clear_buffered(&self) -> RethResult> { + self.tree.update_block_hashes_and_clear_buffered() + } + fn connect_buffered_blocks_to_canonical_hashes_and_finalize( &self, last_finalized_block: BlockNumber,