fix: use different cancel wrapper for prewarm tasks (#14221)

This commit is contained in:
Dan Cline
2025-02-04 16:06:50 -05:00
committed by GitHub
parent ad503a08fa
commit a9ae060452
5 changed files with 107 additions and 19 deletions

View File

@ -33,7 +33,7 @@ use reth_provider::{
StageCheckpointReader, StateProviderFactory,
};
use reth_revm::{
cached::CachedReads, cancelled::Cancelled, database::StateProviderDatabase,
cached::CachedReads, cancelled::CancelOnDrop, database::StateProviderDatabase,
primitives::KzgSettings,
};
use reth_stages::StageId;
@ -220,7 +220,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
transaction_pool,
CachedReads::default(),
payload_config,
Cancelled::default(),
CancelOnDrop::default(),
None,
);

View File

@ -51,7 +51,7 @@ use reth_provider::{
ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
};
use reth_revm::{cancelled::Cancelled, database::StateProviderDatabase};
use reth_revm::{cancelled::ManualCancel, database::StateProviderDatabase};
use reth_stages_api::ControlFlow;
use reth_trie::{
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState,
@ -2375,7 +2375,7 @@ where
self.is_descendant_of_persisting_blocks(block.header());
// Atomic bool for letting the prewarm tasks know when to stop
let cancel_execution = Cancelled::default();
let cancel_execution = ManualCancel::default();
let (state_root_handle, state_root_task_config, state_root_sender, state_hook) =
if is_descendant_of_persisting_blocks && self.config.use_state_root_task() {
@ -2465,7 +2465,7 @@ where
trace!(target: "engine::tree", elapsed = ?execution_time, number=?block_num_hash.number, "Executed block");
// Ensure that prewarm tasks don't send proof messages after state root sender is dropped
drop(cancel_execution);
cancel_execution.cancel();
if let Err(err) = self.consensus.validate_block_post_execution(
&block,
@ -2638,7 +2638,7 @@ where
caches: ProviderCaches,
cache_metrics: CachedStateMetrics,
state_root_sender: Option<Sender<StateRootMessage>>,
cancel_execution: Cancelled,
cancel_execution: ManualCancel,
) -> Result<(), InsertBlockErrorKind> {
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
trace!(target: "engine::tree", parent=%block.parent_hash(), "Could not get state provider for prewarm");

View File

@ -44,7 +44,7 @@ use reth_provider::{
StateRootProvider, StorageRootProvider,
};
use reth_revm::{
cancelled::Cancelled, database::StateProviderDatabase, witness::ExecutionWitnessRecord,
cancelled::CancelOnDrop, database::StateProviderDatabase, witness::ExecutionWitnessRecord,
};
use reth_transaction_pool::{
pool::BestPayloadTransactions, BestTransactionsAttributes, PoolTransaction, TransactionPool,
@ -648,7 +648,7 @@ pub struct OpPayloadBuilderCtx<EvmConfig: ConfigureEvmEnv, N: NodePrimitives> {
/// Evm Settings
pub evm_env: EvmEnv<EvmConfig::Spec>,
/// Marker to check whether the job has been cancelled.
pub cancel: Cancelled,
pub cancel: CancelOnDrop,
/// The currently best payload.
pub best_payload: Option<OpBuiltPayload<N>>,
/// Receipt builder.

View File

@ -22,7 +22,7 @@ use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKin
use reth_primitives::{NodePrimitives, SealedHeader};
use reth_primitives_traits::proofs;
use reth_provider::{BlockReaderIdExt, CanonStateNotification, StateProviderFactory};
use reth_revm::{cached::CachedReads, cancelled::Cancelled};
use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use revm::{Database, State};
@ -356,7 +356,7 @@ where
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let pool = self.pool.clone();
let cancel = Cancelled::default();
let cancel = CancelOnDrop::default();
let _cancel = cancel.clone();
let guard = self.payload_task_guard.clone();
let payload_config = self.config.clone();
@ -501,7 +501,7 @@ where
pool: self.pool.clone(),
cached_reads: self.cached_reads.take().unwrap_or_default(),
config: self.config.clone(),
cancel: Cancelled::default(),
cancel: CancelOnDrop::default(),
best_payload: None,
};
@ -657,7 +657,7 @@ where
#[derive(Debug)]
pub struct PendingPayload<P> {
/// The marker to cancel the job on drop
_cancel: Cancelled,
_cancel: CancelOnDrop,
/// The channel to send the result to.
payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
}
@ -665,7 +665,7 @@ pub struct PendingPayload<P> {
impl<P> PendingPayload<P> {
/// Constructs a `PendingPayload` future.
pub const fn new(
cancel: Cancelled,
cancel: CancelOnDrop,
payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
) -> Self {
Self { _cancel: cancel, payload }
@ -818,7 +818,7 @@ pub struct BuildArguments<Pool, Client, Attributes, Payload> {
/// How to configure the payload.
pub config: PayloadConfig<Attributes>,
/// A marker that can be used to cancel the job.
pub cancel: Cancelled,
pub cancel: CancelOnDrop,
/// The best payload achieved so far.
pub best_payload: Option<Payload>,
}
@ -830,7 +830,7 @@ impl<Pool, Client, Attributes, Payload> BuildArguments<Pool, Client, Attributes,
pool: Pool,
cached_reads: CachedReads,
config: PayloadConfig<Attributes>,
cancel: Cancelled,
cancel: CancelOnDrop,
best_payload: Option<Payload>,
) -> Self {
Self { client, pool, cached_reads, config, cancel, best_payload }

View File

@ -4,20 +4,108 @@ use core::sync::atomic::AtomicBool;
/// A marker that can be used to cancel execution.
///
/// If dropped, it will set the `cancelled` flag to true.
///
/// This is most useful when a payload job needs to be cancelled.
#[derive(Default, Clone, Debug)]
pub struct Cancelled(Arc<AtomicBool>);
pub struct CancelOnDrop(Arc<AtomicBool>);
// === impl Cancelled ===
// === impl CancelOnDrop ===
impl Cancelled {
impl CancelOnDrop {
/// Returns true if the job was cancelled.
pub fn is_cancelled(&self) -> bool {
self.0.load(core::sync::atomic::Ordering::Relaxed)
}
}
impl Drop for Cancelled {
impl Drop for CancelOnDrop {
fn drop(&mut self) {
self.0.store(true, core::sync::atomic::Ordering::Relaxed);
}
}
/// A marker that can be used to cancel execution.
///
/// If dropped, it will NOT set the `cancelled` flag to true.
/// If `cancel` is called, the `cancelled` flag will be set to true.
///
/// This is useful in prewarming, when an external signal is received to cancel many prewarming
/// tasks.
#[derive(Default, Clone, Debug)]
pub struct ManualCancel(Arc<AtomicBool>);
// === impl ManualCancel ===
impl ManualCancel {
/// Returns true if the job was cancelled.
pub fn is_cancelled(&self) -> bool {
self.0.load(core::sync::atomic::Ordering::Relaxed)
}
/// Drops the [`ManualCancel`], setting the cancelled flag to true.
pub fn cancel(self) {
self.0.store(true, core::sync::atomic::Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_cancelled() {
let c = CancelOnDrop::default();
assert!(!c.is_cancelled());
}
#[test]
fn test_default_cancel_task() {
let c = ManualCancel::default();
assert!(!c.is_cancelled());
}
#[test]
fn test_set_cancel_task() {
let c = ManualCancel::default();
assert!(!c.is_cancelled());
let c2 = c.clone();
let c3 = c.clone();
c.cancel();
assert!(c3.is_cancelled());
assert!(c2.is_cancelled());
}
#[test]
fn test_cancel_task_multiple_threads() {
let c = ManualCancel::default();
let cloned_cancel = c.clone();
// we want to make sure that:
// * we can spawn tasks that do things
// * those tasks can run to completion and the flag remains unset unless we call cancel
let mut handles = vec![];
for _ in 0..10 {
let c = c.clone();
let handle = std::thread::spawn(move || {
for _ in 0..1000 {
if c.is_cancelled() {
return;
}
}
});
handles.push(handle);
}
// wait for all the threads to finish
for handle in handles {
handle.join().unwrap();
}
// check that the flag is still unset
assert!(!c.is_cancelled());
// cancel and check that the flag is set
c.cancel();
assert!(cloned_cancel.is_cancelled());
}
}