mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: canonical state for local engine (#11245)
This commit is contained in:
@ -11,6 +11,7 @@ exclude.workspace = true
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-beacon-consensus.workspace = true
|
||||
reth-chain-state.workspace = true
|
||||
reth-engine-tree.workspace = true
|
||||
reth-node-types.workspace = true
|
||||
reth-payload-builder.workspace = true
|
||||
|
||||
@ -7,8 +7,9 @@
|
||||
//! building at a fixed interval.
|
||||
|
||||
use crate::miner::MiningMode;
|
||||
use alloy_primitives::B256;
|
||||
use eyre::eyre;
|
||||
use reth_beacon_consensus::EngineNodeTypes;
|
||||
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
|
||||
use reth_engine_tree::persistence::PersistenceHandle;
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_payload_primitives::{
|
||||
@ -17,12 +18,12 @@ use reth_payload_primitives::{
|
||||
use reth_provider::ProviderFactory;
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::MetricEventsSender;
|
||||
use std::fmt::Formatter;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::debug;
|
||||
|
||||
/// Provides a local dev service engine that can be used to drive the
|
||||
/// chain forward.
|
||||
#[derive(Debug)]
|
||||
pub struct LocalEngineService<N, B>
|
||||
where
|
||||
N: EngineNodeTypes,
|
||||
@ -32,30 +33,14 @@ where
|
||||
payload_builder: PayloadBuilderHandle<N::Engine>,
|
||||
/// The payload attribute builder for the engine
|
||||
payload_attributes_builder: B,
|
||||
/// Keep track of the Canonical chain state that isn't persisted on disk yet
|
||||
canonical_in_memory_state: CanonicalInMemoryState,
|
||||
/// A handle to the persistence layer
|
||||
persistence_handle: PersistenceHandle,
|
||||
/// The hash of the current head
|
||||
head: B256,
|
||||
/// The mining mode for the engine
|
||||
mode: MiningMode,
|
||||
}
|
||||
|
||||
impl<N, B> std::fmt::Debug for LocalEngineService<N, B>
|
||||
where
|
||||
N: EngineNodeTypes,
|
||||
B: PayloadAttributesBuilder<PayloadAttributes = <N::Engine as PayloadTypes>::PayloadAttributes>,
|
||||
{
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("LocalEngineService")
|
||||
.field("payload_builder", &self.payload_builder)
|
||||
.field("payload_attributes_builder", &self.payload_attributes_builder)
|
||||
.field("persistence_handle", &self.persistence_handle)
|
||||
.field("head", &self.head)
|
||||
.field("mode", &self.mode)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, B> LocalEngineService<N, B>
|
||||
where
|
||||
N: EngineNodeTypes,
|
||||
@ -67,14 +52,20 @@ where
|
||||
payload_attributes_builder: B,
|
||||
provider: ProviderFactory<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
canonical_in_memory_state: CanonicalInMemoryState,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
head: B256,
|
||||
mode: MiningMode,
|
||||
) -> Self {
|
||||
let persistence_handle =
|
||||
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
|
||||
Self { payload_builder, payload_attributes_builder, persistence_handle, head, mode }
|
||||
Self {
|
||||
payload_builder,
|
||||
payload_attributes_builder,
|
||||
canonical_in_memory_state,
|
||||
persistence_handle,
|
||||
mode,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the [`LocalEngineService`] on a tokio green thread. The service will poll the payload
|
||||
@ -86,8 +77,8 @@ where
|
||||
payload_attributes_builder: B,
|
||||
provider: ProviderFactory<N>,
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
canonical_in_memory_state: CanonicalInMemoryState,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
head: B256,
|
||||
mode: MiningMode,
|
||||
) {
|
||||
let engine = Self::new(
|
||||
@ -95,8 +86,8 @@ where
|
||||
payload_attributes_builder,
|
||||
provider,
|
||||
pruner,
|
||||
canonical_in_memory_state,
|
||||
sync_metrics_tx,
|
||||
head,
|
||||
mode,
|
||||
);
|
||||
|
||||
@ -112,26 +103,29 @@ where
|
||||
(&mut self.mode).await;
|
||||
|
||||
// Start a new payload building job
|
||||
let new_head = self.build_and_save_payload().await;
|
||||
let executed_block = self.build_and_save_payload().await;
|
||||
|
||||
if new_head.is_err() {
|
||||
debug!(target: "local_engine", err = ?new_head.unwrap_err(), "failed payload building");
|
||||
if executed_block.is_err() {
|
||||
debug!(target: "local_engine", err = ?executed_block.unwrap_err(), "failed payload building");
|
||||
continue
|
||||
}
|
||||
let block = executed_block.expect("not error");
|
||||
|
||||
// Update the head
|
||||
self.head = new_head.expect("not error");
|
||||
let res = self.update_canonical_in_memory_state(block);
|
||||
if res.is_err() {
|
||||
debug!(target: "local_engine", err = ?res.unwrap_err(), "failed canonical state update");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a payload by initiating a new payload job via the [`PayloadBuilderHandle`],
|
||||
/// saving the execution outcome to persistence and returning the current head of the
|
||||
/// chain.
|
||||
async fn build_and_save_payload(&self) -> eyre::Result<B256> {
|
||||
/// saving the execution outcome to persistence and returning the executed block.
|
||||
async fn build_and_save_payload(&self) -> eyre::Result<ExecutedBlock> {
|
||||
let payload_attributes = self.payload_attributes_builder.build()?;
|
||||
let parent = self.canonical_in_memory_state.get_canonical_head().hash();
|
||||
let payload_builder_attributes =
|
||||
<N::Engine as PayloadTypes>::PayloadBuilderAttributes::try_new(
|
||||
self.head,
|
||||
parent,
|
||||
payload_attributes,
|
||||
)
|
||||
.map_err(|_| eyre::eyre!("failed to fetch payload attributes"))?;
|
||||
@ -142,22 +136,38 @@ where
|
||||
.await?
|
||||
.await?;
|
||||
|
||||
let block = payload.executed_block().map(|block| vec![block]).unwrap_or_default();
|
||||
let executed_block =
|
||||
payload.executed_block().ok_or_else(|| eyre!("missing executed block"))?;
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let _ = self.persistence_handle.save_blocks(block, tx);
|
||||
let _ = self.persistence_handle.save_blocks(vec![executed_block.clone()], tx);
|
||||
|
||||
// Wait for the persistence_handle to complete
|
||||
let new_head = rx.await?.ok_or_else(|| eyre::eyre!("missing new head"))?;
|
||||
let _ = rx.await?.ok_or_else(|| eyre!("missing new head"))?;
|
||||
|
||||
Ok(new_head.hash)
|
||||
Ok(executed_block)
|
||||
}
|
||||
|
||||
/// Update the canonical in memory state and send notification for a new canon state to
|
||||
/// all the listeners.
|
||||
fn update_canonical_in_memory_state(&self, executed_block: ExecutedBlock) -> eyre::Result<()> {
|
||||
let chain = NewCanonicalChain::Commit { new: vec![executed_block] };
|
||||
let tip = chain.tip().header.clone();
|
||||
let notification = chain.to_chain_notification();
|
||||
|
||||
// Update the tracked in-memory state with the new chain
|
||||
self.canonical_in_memory_state.update_chain(chain);
|
||||
self.canonical_in_memory_state.set_canonical_head(tip);
|
||||
|
||||
// Sends an event to all active listeners about the new canonical chain
|
||||
self.canonical_in_memory_state.notify_canon_state(notification);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::B256;
|
||||
use reth_chainspec::MAINNET;
|
||||
use reth_config::PruneConfig;
|
||||
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
|
||||
@ -201,20 +211,20 @@ mod tests {
|
||||
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
|
||||
create_test_rw_db(),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
StaticFileProvider::read_write(static_dir_path)?,
|
||||
);
|
||||
let pruner = PrunerBuilder::new(PruneConfig::default())
|
||||
.build_with_provider_factory(provider.clone());
|
||||
|
||||
// Create an empty canonical in memory state
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::empty();
|
||||
|
||||
// Start the payload builder service
|
||||
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
|
||||
|
||||
// Sync metric channel
|
||||
let (sync_metrics_tx, _) = unbounded_channel();
|
||||
|
||||
// Get the attributes for start of block building
|
||||
let genesis_hash = B256::random();
|
||||
|
||||
// Launch the LocalEngineService in interval mode
|
||||
let period = Duration::from_secs(1);
|
||||
LocalEngineService::spawn_new(
|
||||
@ -222,13 +232,17 @@ mod tests {
|
||||
TestPayloadAttributesBuilder,
|
||||
provider.clone(),
|
||||
pruner,
|
||||
canonical_in_memory_state,
|
||||
sync_metrics_tx,
|
||||
genesis_hash,
|
||||
MiningMode::interval(period),
|
||||
);
|
||||
|
||||
// Check that we have no block for now
|
||||
let block = provider.block_by_number(0)?;
|
||||
assert!(block.is_none());
|
||||
|
||||
// Wait 4 intervals
|
||||
tokio::time::sleep(4 * period).await;
|
||||
tokio::time::sleep(2 * period).await;
|
||||
|
||||
// Assert a block has been build
|
||||
let block = provider.block_by_number(0)?;
|
||||
@ -246,11 +260,14 @@ mod tests {
|
||||
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
|
||||
create_test_rw_db(),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
StaticFileProvider::read_write(static_dir_path)?,
|
||||
);
|
||||
let pruner = PrunerBuilder::new(PruneConfig::default())
|
||||
.build_with_provider_factory(provider.clone());
|
||||
|
||||
// Create an empty canonical in memory state
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::empty();
|
||||
|
||||
// Start the payload builder service
|
||||
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
|
||||
|
||||
@ -260,17 +277,14 @@ mod tests {
|
||||
// Sync metric channel
|
||||
let (sync_metrics_tx, _) = unbounded_channel();
|
||||
|
||||
// Get the attributes for start of block building
|
||||
let genesis_hash = B256::random();
|
||||
|
||||
// Launch the LocalEngineService in instant mode
|
||||
LocalEngineService::spawn_new(
|
||||
payload_handle,
|
||||
TestPayloadAttributesBuilder,
|
||||
provider.clone(),
|
||||
pruner,
|
||||
canonical_in_memory_state,
|
||||
sync_metrics_tx,
|
||||
genesis_hash,
|
||||
MiningMode::instant(pool.clone()),
|
||||
);
|
||||
|
||||
@ -295,4 +309,54 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_canonical_chain_subscription() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
// Start the provider and the pruner
|
||||
let (_, static_dir_path) = create_test_static_files_dir();
|
||||
let provider = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
|
||||
create_test_rw_db(),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path)?,
|
||||
);
|
||||
let pruner = PrunerBuilder::new(PruneConfig::default())
|
||||
.build_with_provider_factory(provider.clone());
|
||||
|
||||
// Create an empty canonical in memory state
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::empty();
|
||||
let mut notifications = canonical_in_memory_state.subscribe_canon_state();
|
||||
|
||||
// Start the payload builder service
|
||||
let payload_handle = spawn_test_payload_service::<EthEngineTypes>();
|
||||
|
||||
// Start a transaction pool
|
||||
let pool = testing_pool();
|
||||
|
||||
// Sync metric channel
|
||||
let (sync_metrics_tx, _) = unbounded_channel();
|
||||
|
||||
// Launch the LocalEngineService in instant mode
|
||||
LocalEngineService::spawn_new(
|
||||
payload_handle,
|
||||
TestPayloadAttributesBuilder,
|
||||
provider.clone(),
|
||||
pruner,
|
||||
canonical_in_memory_state,
|
||||
sync_metrics_tx,
|
||||
MiningMode::instant(pool.clone()),
|
||||
);
|
||||
|
||||
// Add a transaction to the pool
|
||||
let transaction = MockTransaction::legacy().with_gas_price(10);
|
||||
pool.add_transaction(Default::default(), transaction).await?;
|
||||
|
||||
// Check a notification is received for block 0
|
||||
let res = notifications.recv().await?;
|
||||
|
||||
assert_eq!(res.tip().number, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user