mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: preload cachedreads with tip state (#5804)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -5651,6 +5651,7 @@ dependencies = [
|
||||
"eyre",
|
||||
"fdlimit",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"human_bytes",
|
||||
"humantime",
|
||||
"hyper",
|
||||
@ -6312,8 +6313,10 @@ dependencies = [
|
||||
"reth-interfaces",
|
||||
"reth-metrics",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
"reth-rpc-types",
|
||||
"reth-rpc-types-compat",
|
||||
"reth-tasks",
|
||||
"reth-transaction-pool",
|
||||
"revm",
|
||||
"revm-primitives",
|
||||
|
||||
@ -112,6 +112,7 @@ const-str = "0.5.6"
|
||||
boyer-moore-magiclen = "0.2.16"
|
||||
itertools.workspace = true
|
||||
rayon.workspace = true
|
||||
futures-util.workspace = true
|
||||
|
||||
[target.'cfg(not(windows))'.dependencies]
|
||||
jemallocator = { version = "0.5.0", optional = true }
|
||||
|
||||
@ -7,6 +7,7 @@ use crate::cli::{
|
||||
use clap::Args;
|
||||
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
|
||||
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
|
||||
use reth_provider::CanonStateSubscriptions;
|
||||
use reth_tasks::TaskSpawner;
|
||||
use std::{fmt, marker::PhantomData};
|
||||
|
||||
@ -161,7 +162,10 @@ pub trait RethNodeCommandConfig: fmt::Debug {
|
||||
components.chain_spec(),
|
||||
payload_builder,
|
||||
);
|
||||
let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator);
|
||||
let (payload_service, payload_builder) = PayloadBuilderService::new(
|
||||
payload_generator,
|
||||
components.events().canonical_state_stream(),
|
||||
);
|
||||
|
||||
components
|
||||
.task_executor()
|
||||
|
||||
@ -26,7 +26,7 @@ use reth_primitives::{
|
||||
fs::{self},
|
||||
ChainSpec,
|
||||
};
|
||||
use reth_provider::{providers::BlockchainProvider, ProviderFactory};
|
||||
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
|
||||
use reth_revm::EvmProcessorFactory;
|
||||
use reth_rpc_types::{
|
||||
engine::{CancunPayloadFields, ForkchoiceState, PayloadAttributes},
|
||||
@ -175,7 +175,8 @@ impl Command {
|
||||
self.chain.clone(),
|
||||
payload_builder,
|
||||
);
|
||||
let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator);
|
||||
let (payload_service, payload_builder) =
|
||||
PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream());
|
||||
ctx.task_executor.spawn_critical("payload builder service", Box::pin(payload_service));
|
||||
|
||||
// Configure the consensus engine
|
||||
|
||||
@ -10,6 +10,30 @@
|
||||
use alloy_rlp::Encodable;
|
||||
use futures_core::ready;
|
||||
use futures_util::FutureExt;
|
||||
use reth_interfaces::RethResult;
|
||||
use reth_payload_builder::{
|
||||
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
|
||||
PayloadBuilderAttributes, PayloadId, PayloadJob, PayloadJobGenerator,
|
||||
};
|
||||
use reth_primitives::{
|
||||
bytes::BytesMut,
|
||||
constants::{
|
||||
BEACON_NONCE, EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, EMPTY_WITHDRAWALS,
|
||||
ETHEREUM_BLOCK_GAS_LIMIT, RETH_CLIENT_VERSION, SLOT_DURATION,
|
||||
},
|
||||
proofs, Block, BlockNumberOrTag, Bytes, ChainSpec, Header, Receipts, SealedBlock, Withdrawal,
|
||||
B256, EMPTY_OMMER_ROOT_HASH, U256,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError,
|
||||
StateProviderFactory,
|
||||
};
|
||||
use reth_revm::{
|
||||
database::StateProviderDatabase,
|
||||
state_change::{apply_beacon_root_contract_call, post_block_withdrawals_balance_increments},
|
||||
};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use revm::{
|
||||
db::states::bundle_state::BundleRetention,
|
||||
primitives::{BlockEnv, CfgEnv, Env},
|
||||
@ -28,30 +52,6 @@ use tokio::{
|
||||
};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use reth_interfaces::RethResult;
|
||||
use reth_payload_builder::{
|
||||
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
|
||||
PayloadBuilderAttributes, PayloadId, PayloadJob, PayloadJobGenerator,
|
||||
};
|
||||
use reth_primitives::{
|
||||
bytes::BytesMut,
|
||||
constants::{
|
||||
BEACON_NONCE, EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, EMPTY_WITHDRAWALS,
|
||||
ETHEREUM_BLOCK_GAS_LIMIT, RETH_CLIENT_VERSION, SLOT_DURATION,
|
||||
},
|
||||
proofs, Block, BlockNumberOrTag, Bytes, ChainSpec, Header, Receipts, SealedBlock, Withdrawal,
|
||||
B256, EMPTY_OMMER_ROOT_HASH, U256,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory,
|
||||
};
|
||||
use reth_revm::{
|
||||
database::StateProviderDatabase,
|
||||
state_change::{apply_beacon_root_contract_call, post_block_withdrawals_balance_increments},
|
||||
};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
|
||||
use crate::metrics::PayloadBuilderMetrics;
|
||||
|
||||
mod metrics;
|
||||
@ -75,6 +75,8 @@ pub struct BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
|
||||
///
|
||||
/// See [PayloadBuilder]
|
||||
builder: Builder,
|
||||
/// Stored cached_reads for new payload jobs.
|
||||
pre_cached: Option<PrecachedState>,
|
||||
}
|
||||
|
||||
// === impl BasicPayloadJobGenerator ===
|
||||
@ -97,6 +99,7 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
|
||||
config,
|
||||
chain_spec,
|
||||
builder,
|
||||
pre_cached: None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,6 +126,22 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
|
||||
fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
|
||||
tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
|
||||
}
|
||||
|
||||
/// Returns a reference to the tasks type
|
||||
pub fn tasks(&self) -> &Tasks {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
/// Returns the pre-cached reads for the given parent block if it matches the cached state's
|
||||
/// block.
|
||||
fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
|
||||
let pre_cached = self.pre_cached.as_ref()?;
|
||||
if pre_cached.block == parent {
|
||||
Some(pre_cached.cached.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl BasicPayloadJobGenerator ===
|
||||
@ -167,6 +186,8 @@ where
|
||||
let until = self.job_deadline(config.attributes.timestamp);
|
||||
let deadline = Box::pin(tokio::time::sleep_until(until));
|
||||
|
||||
let cached_reads = self.maybe_pre_cached(config.parent_block.hash());
|
||||
|
||||
Ok(BasicPayloadJob {
|
||||
config,
|
||||
client: self.client.clone(),
|
||||
@ -176,12 +197,43 @@ where
|
||||
interval: tokio::time::interval(self.config.interval),
|
||||
best_payload: None,
|
||||
pending_block: None,
|
||||
cached_reads: None,
|
||||
cached_reads,
|
||||
payload_task_guard: self.payload_task_guard.clone(),
|
||||
metrics: Default::default(),
|
||||
builder: self.builder.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn on_new_state(&mut self, new_state: CanonStateNotification) {
|
||||
if let Some(committed) = new_state.committed() {
|
||||
let mut cached = CachedReads::default();
|
||||
|
||||
// extract the state from the notification and put it into the cache
|
||||
let new_state = committed.state();
|
||||
for (addr, acc) in new_state.bundle_accounts_iter() {
|
||||
if let Some(info) = acc.info.clone() {
|
||||
// we want pre cache existing accounts and their storage
|
||||
// this only includes changed accounts and storage but is better than nothing
|
||||
let storage =
|
||||
acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
|
||||
cached.insert_account(addr, info, storage);
|
||||
}
|
||||
}
|
||||
|
||||
self.pre_cached = Some(PrecachedState { block: committed.tip().hash, cached });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-filled [CachedReads] for a specific block.
|
||||
///
|
||||
/// This is extracted from the [CanonStateNotification] for the tip block.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PrecachedState {
|
||||
/// The block for which the state is pre-cached.
|
||||
pub block: B256,
|
||||
/// Cached state for the block.
|
||||
pub cached: CachedReads,
|
||||
}
|
||||
|
||||
/// Restricts how many generator tasks can be executed at once.
|
||||
|
||||
@ -18,6 +18,8 @@ reth-rpc-types.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-interfaces.workspace = true
|
||||
reth-rpc-types-compat.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
|
||||
# ethereum
|
||||
alloy-rlp.workspace = true
|
||||
|
||||
@ -50,6 +50,16 @@ impl CachedReads {
|
||||
fn as_db_mut<DB>(&mut self, db: DB) -> CachedReadsDbMut<'_, DB> {
|
||||
CachedReadsDbMut { cached: self, db }
|
||||
}
|
||||
|
||||
/// Inserts an account info into the cache.
|
||||
pub fn insert_account(
|
||||
&mut self,
|
||||
address: Address,
|
||||
info: AccountInfo,
|
||||
storage: HashMap<U256, U256>,
|
||||
) {
|
||||
self.accounts.insert(address, CachedAccount { info: Some(info), storage });
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@ -7,7 +7,8 @@ use crate::{
|
||||
error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator,
|
||||
BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob,
|
||||
};
|
||||
use futures_util::{future::FutureExt, StreamExt};
|
||||
use futures_util::{future::FutureExt, Stream, StreamExt};
|
||||
use reth_provider::CanonStateNotification;
|
||||
use reth_rpc_types::engine::PayloadId;
|
||||
use std::{
|
||||
fmt,
|
||||
@ -160,7 +161,7 @@ impl PayloadBuilderHandle {
|
||||
/// does know nothing about how to build them, it just drives their jobs to completion.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct PayloadBuilderService<Gen>
|
||||
pub struct PayloadBuilderService<Gen, St>
|
||||
where
|
||||
Gen: PayloadJobGenerator,
|
||||
{
|
||||
@ -174,17 +175,22 @@ where
|
||||
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
|
||||
/// Metrics for the payload builder service
|
||||
metrics: PayloadBuilderServiceMetrics,
|
||||
/// Chain events notification stream
|
||||
chain_events: St,
|
||||
}
|
||||
|
||||
// === impl PayloadBuilderService ===
|
||||
|
||||
impl<Gen> PayloadBuilderService<Gen>
|
||||
impl<Gen, St> PayloadBuilderService<Gen, St>
|
||||
where
|
||||
Gen: PayloadJobGenerator,
|
||||
{
|
||||
/// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact
|
||||
/// with it.
|
||||
pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) {
|
||||
///
|
||||
/// This also takes a stream of chain events that will be forwarded to the generator to apply
|
||||
/// additional logic when new state is committed. See also [PayloadJobGenerator::on_new_state].
|
||||
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) {
|
||||
let (service_tx, command_rx) = mpsc::unbounded_channel();
|
||||
let service = Self {
|
||||
generator,
|
||||
@ -192,7 +198,9 @@ where
|
||||
service_tx,
|
||||
command_rx: UnboundedReceiverStream::new(command_rx),
|
||||
metrics: Default::default(),
|
||||
chain_events,
|
||||
};
|
||||
|
||||
let handle = service.handle();
|
||||
(service, handle)
|
||||
}
|
||||
@ -271,17 +279,22 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Gen> Future for PayloadBuilderService<Gen>
|
||||
impl<Gen, St> Future for PayloadBuilderService<Gen, St>
|
||||
where
|
||||
Gen: PayloadJobGenerator + Unpin + 'static,
|
||||
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
|
||||
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
loop {
|
||||
// notify the generator of new chain events
|
||||
while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
|
||||
this.generator.on_new_state(new_head);
|
||||
}
|
||||
|
||||
// we poll all jobs first, so we always have the latest payload that we can report if
|
||||
// requests
|
||||
// we don't care about the order of the jobs, so we can just swap_remove them
|
||||
|
||||
@ -6,6 +6,7 @@ use crate::{
|
||||
PayloadJobGenerator,
|
||||
};
|
||||
use reth_primitives::{Block, U256};
|
||||
use reth_provider::CanonStateNotification;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
@ -14,9 +15,14 @@ use std::{
|
||||
};
|
||||
|
||||
/// Creates a new [PayloadBuilderService] for testing purposes.
|
||||
pub fn test_payload_service(
|
||||
) -> (PayloadBuilderService<TestPayloadJobGenerator>, PayloadBuilderHandle) {
|
||||
PayloadBuilderService::new(Default::default())
|
||||
pub fn test_payload_service() -> (
|
||||
PayloadBuilderService<
|
||||
TestPayloadJobGenerator,
|
||||
futures_util::stream::Empty<CanonStateNotification>,
|
||||
>,
|
||||
PayloadBuilderHandle,
|
||||
) {
|
||||
PayloadBuilderService::new(Default::default(), futures_util::stream::empty())
|
||||
}
|
||||
|
||||
/// Creates a new [PayloadBuilderService] for testing purposes and spawns it in the background.
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
//! Trait abstractions used by the payload crate.
|
||||
|
||||
use reth_provider::CanonStateNotification;
|
||||
|
||||
use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes};
|
||||
use std::{future::Future, sync::Arc};
|
||||
|
||||
@ -80,4 +82,12 @@ pub trait PayloadJobGenerator: Send + Sync {
|
||||
&self,
|
||||
attr: PayloadBuilderAttributes,
|
||||
) -> Result<Self::Job, PayloadBuilderError>;
|
||||
|
||||
/// Handles new chain state events
|
||||
///
|
||||
/// This is intended for any logic that needs to be run when the chain state changes or used to
|
||||
/// use the in memory state for the head block.
|
||||
fn on_new_state(&mut self, new_state: CanonStateNotification) {
|
||||
let _ = new_state;
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,7 +16,10 @@ use reth_trie::{
|
||||
updates::TrieUpdates,
|
||||
StateRoot, StateRootError,
|
||||
};
|
||||
use revm::{db::states::BundleState, primitives::AccountInfo};
|
||||
use revm::{
|
||||
db::{states::BundleState, BundleAccount},
|
||||
primitives::AccountInfo,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub use revm::db::states::OriginalValuesKnown;
|
||||
@ -110,6 +113,11 @@ impl BundleStateWithReceipts {
|
||||
self.bundle.state().iter().map(|(a, acc)| (*a, acc.info.as_ref()))
|
||||
}
|
||||
|
||||
/// Return iterator over all [BundleAccount]s in the bundle
|
||||
pub fn bundle_accounts_iter(&self) -> impl Iterator<Item = (Address, &BundleAccount)> {
|
||||
self.bundle.state().iter().map(|(a, acc)| (*a, acc))
|
||||
}
|
||||
|
||||
/// Get account if account is known.
|
||||
pub fn account(&self, address: &Address) -> Option<Option<Account>> {
|
||||
self.bundle.account(address).map(|a| a.info.clone().map(into_reth_acc))
|
||||
|
||||
Reference in New Issue
Block a user