fix auto-seal consensus duplicate notification (#8548)

This commit is contained in:
frisitano
2024-06-03 16:10:27 +04:00
committed by GitHub
parent f9959b0cc8
commit 063807b3ae
3 changed files with 8 additions and 58 deletions

View File

@ -27,8 +27,7 @@ use reth_primitives::{
Withdrawals, B256, U256,
};
use reth_provider::{
BlockReaderIdExt, BundleStateWithReceipts, CanonStateNotificationSender, StateProviderFactory,
StateRootProvider,
BlockReaderIdExt, BundleStateWithReceipts, StateProviderFactory, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_transaction_pool::TransactionPool;
@ -107,7 +106,6 @@ pub struct AutoSealBuilder<Client, Pool, Engine: EngineTypes, EvmConfig> {
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
evm_config: EvmConfig,
}
@ -125,7 +123,6 @@ where
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
mode: MiningMode,
evm_config: EvmConfig,
) -> Self {
@ -142,7 +139,6 @@ where
pool,
mode,
to_engine,
canon_state_notification,
evm_config,
}
}
@ -158,22 +154,12 @@ where
pub fn build(
self,
) -> (AutoSealConsensus, AutoSealClient, MiningTask<Client, Pool, EvmConfig, Engine>) {
let Self {
client,
consensus,
pool,
mode,
storage,
to_engine,
canon_state_notification,
evm_config,
} = self;
let Self { client, consensus, pool, mode, storage, to_engine, evm_config } = self;
let auto_client = AutoSealClient::new(storage.clone());
let task = MiningTask::new(
Arc::clone(&consensus.chain_spec),
mode,
to_engine,
canon_state_notification,
storage,
client,
pool,

View File

@ -3,10 +3,8 @@ use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus};
use reth_engine_primitives::EngineTypes;
use reth_evm::execute::BlockExecutorProvider;
use reth_primitives::{
Block, ChainSpec, IntoRecoveredTransaction, Requests, SealedBlockWithSenders, Withdrawals,
};
use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory};
use reth_primitives::{ChainSpec, IntoRecoveredTransaction, Requests, Withdrawals};
use reth_provider::{CanonChainTracker, StateProviderFactory};
use reth_rpc_types::engine::ForkchoiceState;
use reth_stages_api::PipelineEvent;
use reth_tokio_util::EventStream;
@ -39,8 +37,6 @@ pub struct MiningTask<Client, Pool: TransactionPool, Executor, Engine: EngineTyp
queued: VecDeque<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>,
// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
/// Used to notify consumers of new blocks
canon_state_notification: CanonStateNotificationSender,
/// The pipeline events to listen on
pipe_line_events: Option<EventStream<PipelineEvent>>,
/// The type used for block execution
@ -58,7 +54,6 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
storage: Storage,
client: Client,
pool: Pool,
@ -72,7 +67,6 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
storage,
pool,
to_engine,
canon_state_notification,
queued: Default::default(),
pipe_line_events: None,
block_executor,
@ -120,7 +114,6 @@ where
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let events = this.pipe_line_events.take();
let canon_state_notification = this.canon_state_notification.clone();
let executor = this.block_executor.clone();
// Create the mining future that creates a block, notifies the engine that drives
@ -128,14 +121,13 @@ where
this.insert_task = Some(Box::pin(async move {
let mut storage = storage.write().await;
let (transactions, senders): (Vec<_>, Vec<_>) = transactions
let transactions: Vec<_> = transactions
.into_iter()
.map(|tx| {
let recovered = tx.to_recovered_transaction();
let signer = recovered.signer();
(recovered.into_signed(), signer)
recovered.into_signed()
})
.unzip();
.collect();
let ommers = vec![];
// todo(onbjerg): these two dont respect chainspec
let withdrawals = Some(Withdrawals::default());
@ -150,7 +142,7 @@ where
chain_spec,
&executor,
) {
Ok((new_header, bundle_state)) => {
Ok((new_header, _bundle_state)) => {
// clear all transactions from pool
pool.remove_transactions(
transactions.iter().map(|tx| tx.hash()).collect(),
@ -198,36 +190,10 @@ where
}
}
// seal the block
let block = Block {
header: new_header.clone().unseal(),
body: transactions,
ommers,
withdrawals,
requests,
};
let sealed_block = block.seal_slow();
let sealed_block_with_senders =
SealedBlockWithSenders::new(sealed_block, senders)
.expect("senders are valid");
// update canon chain for rpc
client.set_canonical_head(new_header.clone());
client.set_safe(new_header.clone());
client.set_finalized(new_header.clone());
debug!(target: "consensus::auto", header=?sealed_block_with_senders.hash(), "sending block notification");
let chain = Arc::new(Chain::new(
vec![sealed_block_with_senders],
bundle_state,
None,
));
// send block notification
let _ = canon_state_notification
.send(reth_provider::CanonStateNotification::Commit { new: chain });
}
Err(err) => {
warn!(target: "consensus::auto", %err, "failed to execute block")