diff --git a/bin/reth/src/commands/debug_cmd/build_block.rs b/bin/reth/src/commands/debug_cmd/build_block.rs index e95df6289..84a800236 100644 --- a/bin/reth/src/commands/debug_cmd/build_block.rs +++ b/bin/reth/src/commands/debug_cmd/build_block.rs @@ -33,7 +33,7 @@ use reth_provider::{ StageCheckpointReader, StateProviderFactory, }; use reth_revm::{ - cached::CachedReads, cancelled::Cancelled, database::StateProviderDatabase, + cached::CachedReads, cancelled::CancelOnDrop, database::StateProviderDatabase, primitives::KzgSettings, }; use reth_stages::StageId; @@ -220,7 +220,7 @@ impl> Command { transaction_pool, CachedReads::default(), payload_config, - Cancelled::default(), + CancelOnDrop::default(), None, ); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 22034fd1f..bd5e1572a 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -51,7 +51,7 @@ use reth_provider::{ ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, }; -use reth_revm::{cancelled::Cancelled, database::StateProviderDatabase}; +use reth_revm::{cancelled::ManualCancel, database::StateProviderDatabase}; use reth_stages_api::ControlFlow; use reth_trie::{ trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState, @@ -2375,7 +2375,7 @@ where self.is_descendant_of_persisting_blocks(block.header()); // Atomic bool for letting the prewarm tasks know when to stop - let cancel_execution = Cancelled::default(); + let cancel_execution = ManualCancel::default(); let (state_root_handle, state_root_task_config, state_root_sender, state_hook) = if is_descendant_of_persisting_blocks && self.config.use_state_root_task() { @@ -2465,7 +2465,7 @@ where trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block"); // Ensure that prewarm tasks don't send proof messages after state root sender is dropped - drop(cancel_execution); + cancel_execution.cancel(); if let Err(err) = self.consensus.validate_block_post_execution( &block, @@ -2638,7 +2638,7 @@ where caches: ProviderCaches, cache_metrics: CachedStateMetrics, state_root_sender: Option>, - cancel_execution: Cancelled, + cancel_execution: ManualCancel, ) -> Result<(), InsertBlockErrorKind> { let Some(state_provider) = self.state_provider(block.parent_hash())? else { trace!(target: "engine::tree", parent=%block.parent_hash(), "Could not get state provider for prewarm"); diff --git a/crates/optimism/payload/src/builder.rs b/crates/optimism/payload/src/builder.rs index 3730e4499..cbb97bf9e 100644 --- a/crates/optimism/payload/src/builder.rs +++ b/crates/optimism/payload/src/builder.rs @@ -44,7 +44,7 @@ use reth_provider::{ StateRootProvider, StorageRootProvider, }; use reth_revm::{ - cancelled::Cancelled, database::StateProviderDatabase, witness::ExecutionWitnessRecord, + cancelled::CancelOnDrop, database::StateProviderDatabase, witness::ExecutionWitnessRecord, }; use reth_transaction_pool::{ pool::BestPayloadTransactions, BestTransactionsAttributes, PoolTransaction, TransactionPool, @@ -648,7 +648,7 @@ pub struct OpPayloadBuilderCtx { /// Evm Settings pub evm_env: EvmEnv, /// Marker to check whether the job has been cancelled. - pub cancel: Cancelled, + pub cancel: CancelOnDrop, /// The currently best payload. pub best_payload: Option>, /// Receipt builder. diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index a30a9d07e..15790ad52 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -22,7 +22,7 @@ use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKin use reth_primitives::{NodePrimitives, SealedHeader}; use reth_primitives_traits::proofs; use reth_provider::{BlockReaderIdExt, CanonStateNotification, StateProviderFactory}; -use reth_revm::{cached::CachedReads, cancelled::Cancelled}; +use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; use revm::{Database, State}; @@ -356,7 +356,7 @@ where let (tx, rx) = oneshot::channel(); let client = self.client.clone(); let pool = self.pool.clone(); - let cancel = Cancelled::default(); + let cancel = CancelOnDrop::default(); let _cancel = cancel.clone(); let guard = self.payload_task_guard.clone(); let payload_config = self.config.clone(); @@ -501,7 +501,7 @@ where pool: self.pool.clone(), cached_reads: self.cached_reads.take().unwrap_or_default(), config: self.config.clone(), - cancel: Cancelled::default(), + cancel: CancelOnDrop::default(), best_payload: None, }; @@ -657,7 +657,7 @@ where #[derive(Debug)] pub struct PendingPayload

{ /// The marker to cancel the job on drop - _cancel: Cancelled, + _cancel: CancelOnDrop, /// The channel to send the result to. payload: oneshot::Receiver, PayloadBuilderError>>, } @@ -665,7 +665,7 @@ pub struct PendingPayload

{ impl

PendingPayload

{ /// Constructs a `PendingPayload` future. pub const fn new( - cancel: Cancelled, + cancel: CancelOnDrop, payload: oneshot::Receiver, PayloadBuilderError>>, ) -> Self { Self { _cancel: cancel, payload } @@ -818,7 +818,7 @@ pub struct BuildArguments { /// How to configure the payload. pub config: PayloadConfig, /// A marker that can be used to cancel the job. - pub cancel: Cancelled, + pub cancel: CancelOnDrop, /// The best payload achieved so far. pub best_payload: Option, } @@ -830,7 +830,7 @@ impl BuildArguments, - cancel: Cancelled, + cancel: CancelOnDrop, best_payload: Option, ) -> Self { Self { client, pool, cached_reads, config, cancel, best_payload } diff --git a/crates/revm/src/cancelled.rs b/crates/revm/src/cancelled.rs index d9f06be38..b692d2db7 100644 --- a/crates/revm/src/cancelled.rs +++ b/crates/revm/src/cancelled.rs @@ -4,20 +4,108 @@ use core::sync::atomic::AtomicBool; /// A marker that can be used to cancel execution. /// /// If dropped, it will set the `cancelled` flag to true. +/// +/// This is most useful when a payload job needs to be cancelled. #[derive(Default, Clone, Debug)] -pub struct Cancelled(Arc); +pub struct CancelOnDrop(Arc); -// === impl Cancelled === +// === impl CancelOnDrop === -impl Cancelled { +impl CancelOnDrop { /// Returns true if the job was cancelled. pub fn is_cancelled(&self) -> bool { self.0.load(core::sync::atomic::Ordering::Relaxed) } } -impl Drop for Cancelled { +impl Drop for CancelOnDrop { fn drop(&mut self) { self.0.store(true, core::sync::atomic::Ordering::Relaxed); } } + +/// A marker that can be used to cancel execution. +/// +/// If dropped, it will NOT set the `cancelled` flag to true. +/// If `cancel` is called, the `cancelled` flag will be set to true. +/// +/// This is useful in prewarming, when an external signal is received to cancel many prewarming +/// tasks. +#[derive(Default, Clone, Debug)] +pub struct ManualCancel(Arc); + +// === impl ManualCancel === + +impl ManualCancel { + /// Returns true if the job was cancelled. + pub fn is_cancelled(&self) -> bool { + self.0.load(core::sync::atomic::Ordering::Relaxed) + } + + /// Drops the [`ManualCancel`], setting the cancelled flag to true. + pub fn cancel(self) { + self.0.store(true, core::sync::atomic::Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_cancelled() { + let c = CancelOnDrop::default(); + assert!(!c.is_cancelled()); + } + + #[test] + fn test_default_cancel_task() { + let c = ManualCancel::default(); + assert!(!c.is_cancelled()); + } + + #[test] + fn test_set_cancel_task() { + let c = ManualCancel::default(); + assert!(!c.is_cancelled()); + let c2 = c.clone(); + let c3 = c.clone(); + c.cancel(); + assert!(c3.is_cancelled()); + assert!(c2.is_cancelled()); + } + + #[test] + fn test_cancel_task_multiple_threads() { + let c = ManualCancel::default(); + let cloned_cancel = c.clone(); + + // we want to make sure that: + // * we can spawn tasks that do things + // * those tasks can run to completion and the flag remains unset unless we call cancel + let mut handles = vec![]; + for _ in 0..10 { + let c = c.clone(); + let handle = std::thread::spawn(move || { + for _ in 0..1000 { + if c.is_cancelled() { + return; + } + } + }); + handles.push(handle); + } + + // wait for all the threads to finish + for handle in handles { + handle.join().unwrap(); + } + + // check that the flag is still unset + assert!(!c.is_cancelled()); + + // cancel and check that the flag is set + c.cancel(); + assert!(cloned_cancel.is_cancelled()); + } +}