diff --git a/Cargo.lock b/Cargo.lock index cb6dd8315..d77922209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4437,6 +4437,7 @@ dependencies = [ "num_cpus", "proptest", "reth-auto-seal-consensus", + "reth-basic-payload-builder", "reth-beacon-consensus", "reth-db", "reth-discv4", @@ -4943,7 +4944,6 @@ version = "0.1.0" dependencies = [ "futures-core", "futures-util", - "parking_lot 0.12.1", "reth-consensus-common", "reth-interfaces", "reth-primitives", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index c588cb03b..f007b8525 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -32,6 +32,7 @@ reth-tracing = { path = "../../crates/tracing" } reth-tasks = { path = "../../crates/tasks" } reth-net-nat = { path = "../../crates/net/nat" } reth-payload-builder = { path = "../../crates/payload/builder" } +reth-basic-payload-builder = { path = "../../crates/payload/basic" } reth-discv4 = { path = "../../crates/net/discv4" } # crypto diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 197ac5a8b..abc480435 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -13,6 +13,7 @@ use eyre::Context; use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, FutureExt, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; +use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; use reth_db::{ database::Database, @@ -38,7 +39,6 @@ use reth_interfaces::{ }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; -use reth_payload_builder::TestPayloadStore; use reth_primitives::{BlockHashOrNumber, Chain, ChainSpec, Head, Header, SealedHeader, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_revm::Factory; @@ -72,6 +72,7 @@ use tracing::*; use crate::dirs::MaybePlatformPath; use reth_interfaces::p2p::headers::client::HeadersClient; +use reth_payload_builder::PayloadBuilderService; use reth_stages::stages::{MERKLE_EXECUTION, MERKLE_UNWIND}; pub mod events; @@ -291,8 +292,19 @@ impl Command { ctx.task_executor .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); - // TODO: change to non-test or rename this component eventually - let test_payload_store = TestPayloadStore::default(); + // configure the payload builder + let payload_generator = BasicPayloadJobGenerator::new( + shareable_db.clone(), + transaction_pool.clone(), + ctx.task_executor.clone(), + // TODO use extradata from args + BasicPayloadJobGeneratorConfig::default(), + Arc::clone(&self.chain), + ); + let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); + + debug!(target: "reth::cli", "Spawning payload builder service"); + ctx.task_executor.spawn_critical("payload builder service", payload_service); let beacon_consensus_engine = BeaconConsensusEngine::new( Arc::clone(&db), @@ -301,7 +313,7 @@ impl Command { blockchain_tree.clone(), consensus_engine_rx, self.debug.max_block, - test_payload_store, + payload_builder, ); info!(target: "reth::cli", "Consensus engine initialized"); diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 3db7f1f2b..be60e932e 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -30,6 +30,7 @@ metrics = "0.20.1" [dev-dependencies] # reth +reth-payload-builder = { path = "../../payload/builder", features = ["test-utils"] } reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } reth-stages = { path = "../../stages", features = ["test-utils"] } reth-executor = { path = "../../executor", features = ["test-utils"] } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 0b1051c86..0219f20f8 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -9,7 +9,7 @@ use reth_interfaces::{ Error, }; use reth_metrics_derive::Metrics; -use reth_payload_builder::PayloadStore; +use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{BlockNumber, Header, SealedBlock, H256}; use reth_rpc_types::engine::{ EngineRpcError, ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated, @@ -66,13 +66,12 @@ struct Metrics { /// /// If the future is polled more than once. Leads to undefined state. #[must_use = "Future does nothing unless polled"] -pub struct BeaconConsensusEngine +pub struct BeaconConsensusEngine where DB: Database, TS: TaskSpawner, U: SyncStateUpdater, BT: BlockchainTreeEngine, - P: PayloadStore, { /// The database handle. db: Arc, @@ -95,18 +94,17 @@ where /// purposes. max_block: Option, /// The payload store. - payload_store: P, + payload_builder: PayloadBuilderHandle, /// Consensus engine metrics. metrics: Metrics, } -impl BeaconConsensusEngine +impl BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner, U: SyncStateUpdater + 'static, BT: BlockchainTreeEngine + 'static, - P: PayloadStore + 'static, { /// Create new instance of the [BeaconConsensusEngine]. /// @@ -119,7 +117,7 @@ where blockchain_tree: BT, message_rx: UnboundedReceiver, max_block: Option, - payload_store: P, + payload_builder: PayloadBuilderHandle, ) -> Self { Self { db, @@ -130,7 +128,7 @@ where forkchoice_state: None, next_action: BeaconEngineAction::None, max_block, - payload_store, + payload_builder, metrics: Metrics::default(), } } @@ -256,7 +254,9 @@ where // if payloadAttributes is not null and the forkchoice state has been // updated successfully. The build process is specified in the Payload // building section. - let payload_id = self.payload_store.new_payload(header.parent_hash, attrs)?; + let attributes = PayloadBuilderAttributes::new(header.parent_hash, attrs); + // TODO(mattsse) this needs to be handled asynchronously + let payload_id = self.payload_builder.send_new_payload(attributes); // Client software MUST respond to this method call in the following way: // { @@ -279,7 +279,7 @@ where /// Called to receive the execution payload associated with a payload build process. pub fn on_get_payload( &self, - payload_id: PayloadId, + _payload_id: PayloadId, ) -> Result { // TODO: Client software SHOULD stop the updating process when either a call to // engine_getPayload with the build process's payloadId is made or SECONDS_PER_SLOT (12s in @@ -287,10 +287,12 @@ where // timestamp parameter. // for now just return the output from the payload store - match self.payload_store.get_execution_payload(payload_id) { - Some(payload) => Ok(payload), - None => Err(EngineRpcError::UnknownPayload.into()), - } + // match self.payload_builder.get_execution_payload(payload_id) { + // Some(payload) => Ok(payload), + // None => Err(EngineRpcError::UnknownPayload.into()), + // } + + todo!() } /// When the Consensus layer receives a new block via the consensus gossip protocol, @@ -432,13 +434,12 @@ where /// local forkchoice state, it will launch the pipeline to sync to the head hash. /// While the pipeline is syncing, the consensus engine will keep processing messages from the /// receiver and forwarding them to the blockchain tree. -impl Future for BeaconConsensusEngine +impl Future for BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner + Unpin, U: SyncStateUpdater + Unpin + 'static, BT: BlockchainTreeEngine + Unpin + 'static, - P: PayloadStore + Unpin + 'static, { type Output = Result<(), BeaconEngineError>; @@ -602,7 +603,7 @@ mod tests { test_utils::TestExecutorFactory, }; use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus}; - use reth_payload_builder::TestPayloadStore; + use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::Transaction; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; @@ -619,7 +620,6 @@ mod tests { TokioTaskExecutor, NoopSyncStateUpdate, ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, - TestPayloadStore, >; struct TestEnv { @@ -670,7 +670,8 @@ mod tests { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); let consensus = TestConsensus::default(); - let payload_store = TestPayloadStore::default(); + let payload_builder = spawn_test_payload_service(); + let executor_factory = TestExecutorFactory::new(chain_spec.clone()); executor_factory.extend(executor_results); @@ -699,7 +700,7 @@ mod tests { tree, sync_rx, None, - payload_store, + payload_builder, ), TestEnv::new(db, tip_rx, sync_tx), ) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 306aa37ae..dcd6a28df 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -45,17 +45,6 @@ use tokio::{ }; use tracing::trace; -// TODO move to common since commonly used - -/// Settings for how to generate a block -#[derive(Debug, Clone)] -pub struct BlockConfig { - /// Data to include in the block's extra data field. - extradata: Bytes, - /// Target gas ceiling for mined blocks, defaults to 30_000_000 gas. - max_gas_limit: u64, -} - /// The [PayloadJobGenerator] that creates [BasicPayloadJob]s. pub struct BasicPayloadJobGenerator { /// The client that can interact with the chain. @@ -66,8 +55,6 @@ pub struct BasicPayloadJobGenerator { executor: Tasks, /// The configuration for the job generator. config: BasicPayloadJobGeneratorConfig, - /// The configuration for how to create a block. - block_config: BlockConfig, /// Restricts how many generator tasks can be executed at once. payload_task_guard: PayloadTaskGuard, /// The chain spec. @@ -83,7 +70,6 @@ impl BasicPayloadJobGenerator { pool: Pool, executor: Tasks, config: BasicPayloadJobGeneratorConfig, - block_config: BlockConfig, chain_spec: Arc, ) -> Self { Self { @@ -92,7 +78,6 @@ impl BasicPayloadJobGenerator { executor, payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks), config, - block_config, chain_spec, } } @@ -128,7 +113,7 @@ where initialized_block_env, initialized_cfg, parent_block: Arc::new(parent_block), - extra_data: self.block_config.extradata.clone(), + extra_data: self.config.extradata.clone(), attributes, chain_spec: Arc::clone(&self.chain_spec), }; @@ -167,6 +152,10 @@ impl PayloadTaskGuard { /// Settings for the [BasicPayloadJobGenerator]. #[derive(Debug, Clone)] pub struct BasicPayloadJobGeneratorConfig { + /// Data to include in the block's extra data field. + extradata: Bytes, + /// Target gas ceiling for mined blocks, defaults to 30_000_000 gas. + max_gas_limit: u64, /// The interval at which the job should build a new payload after the last. interval: Duration, /// The deadline when this job should resolve. @@ -200,11 +189,30 @@ impl BasicPayloadJobGeneratorConfig { self.max_payload_tasks = max_payload_tasks; self } + + /// Sets the data to include in the block's extra data field. + /// + /// Defaults to the current client version. + pub fn extradata(mut self, extradata: Bytes) -> Self { + self.extradata = extradata; + self + } + + /// Sets the target gas ceiling for mined blocks. + /// + /// Defaults to 30_000_000 gas. + pub fn max_gas_limit(mut self, max_gas_limit: u64) -> Self { + self.max_gas_limit = max_gas_limit; + self + } } impl Default for BasicPayloadJobGeneratorConfig { fn default() -> Self { + // TODO: use default rlp client version as extradata Self { + extradata: Default::default(), + max_gas_limit: 30_000_000, interval: Duration::from_secs(1), // 12s slot time deadline: SLOT_DURATION, diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 4e94b8e08..380493c50 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -28,5 +28,8 @@ futures-core = "0.3" ## misc thiserror = "1.0" sha2 = { version = "0.10", default-features = false } -parking_lot = "0.12" tracing = "0.1.37" + + +[features] +test-utils = [] \ No newline at end of file diff --git a/crates/payload/builder/src/lib.rs b/crates/payload/builder/src/lib.rs index 56023888c..ec3e0a8f2 100644 --- a/crates/payload/builder/src/lib.rs +++ b/crates/payload/builder/src/lib.rs @@ -14,72 +14,18 @@ //! //! It Defines the abstractions to create and update payloads: //! - [PayloadJobGenerator]: a type that knows how to create new jobs for creating payloads based -//! on [PayloadAttributes]. +//! on [PayloadAttributes](reth_rpc_types::engine::PayloadAttributes). //! - [PayloadJob]: a type that can yields (better) payloads over time. pub mod error; mod payload; mod service; mod traits; + +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; + pub use payload::{BuiltPayload, PayloadBuilderAttributes}; pub use reth_rpc_types::engine::PayloadId; -pub use service::{PayloadBuilderHandle, PayloadBuilderService, PayloadStore as PayloadStore2}; +pub use service::{PayloadBuilderHandle, PayloadBuilderService, PayloadStore}; pub use traits::{PayloadJob, PayloadJobGenerator}; - -use crate::error::PayloadBuilderError; -use parking_lot::Mutex; -use reth_primitives::{H256, U256}; -use reth_rpc_types::engine::{ExecutionPayloadEnvelope, PayloadAttributes}; -use std::{collections::HashMap, sync::Arc}; - -/// A type that has access to all locally built payloads and can create new ones. -/// This type is intended to by used by the engine API. -pub trait PayloadStore: Send + Sync { - /// Returns true if the payload store contains the given payload. - fn contains(&self, payload_id: PayloadId) -> bool; - - /// Returns the current [ExecutionPayloadEnvelope] associated with the [PayloadId]. - /// - /// Returns `None` if the payload is not yet built, See [PayloadStore::new_payload]. - fn get_execution_payload(&self, payload_id: PayloadId) -> Option; - - /// Builds and stores a new payload using the given attributes. - /// - /// Returns an error if the payload could not be built. - // TODO: does this require async? - fn new_payload( - &self, - parent: H256, - attributes: PayloadAttributes, - ) -> Result; -} - -/// A simple in-memory payload store. -#[derive(Debug, Default)] -pub struct TestPayloadStore { - payloads: Arc>>, -} - -impl PayloadStore for TestPayloadStore { - fn contains(&self, payload_id: PayloadId) -> bool { - self.payloads.lock().contains_key(&payload_id) - } - - fn get_execution_payload(&self, _payload_id: PayloadId) -> Option { - // TODO requires conversion - None - } - - fn new_payload( - &self, - parent: H256, - attributes: PayloadAttributes, - ) -> Result { - let attr = PayloadBuilderAttributes::new(parent, attributes); - let payload_id = attr.payload_id(); - self.payloads - .lock() - .insert(payload_id, BuiltPayload::new(payload_id, Default::default(), U256::ZERO)); - Ok(payload_id) - } -} diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index d69051163..23a9dfec1 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -53,6 +53,16 @@ impl PayloadBuilderHandle { rx.await.ok()? } + /// Sends a message to the service to start building a new payload for the given payload. + /// + /// This is the same as [PayloadBuilderHandle::new_payload] but does not wait for the result. + pub fn send_new_payload(&self, attr: PayloadBuilderAttributes) -> PayloadId { + let id = attr.payload_id(); + let (tx, _) = oneshot::channel(); + let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); + id + } + /// Starts building a new payload for the given payload attributes. /// /// Returns the identifier of the payload. diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs new file mode 100644 index 000000000..1b458c986 --- /dev/null +++ b/crates/payload/builder/src/test_utils.rs @@ -0,0 +1,66 @@ +//! Utils for testing purposes. + +use crate::{ + error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes, PayloadBuilderHandle, + PayloadBuilderService, PayloadJob, PayloadJobGenerator, +}; +use futures_core::Stream; +use reth_primitives::{Block, U256}; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +/// Creates a new [PayloadBuilderService] for testing purposes. +pub fn test_payload_service( +) -> (PayloadBuilderService, PayloadBuilderHandle) { + PayloadBuilderService::new(Default::default()) +} + +/// Creates a new [PayloadBuilderService] for testing purposes and spawns it in the background. +pub fn spawn_test_payload_service() -> PayloadBuilderHandle { + let (service, handle) = test_payload_service(); + tokio::spawn(service); + handle +} + +/// A [PayloadJobGenerator] for testing purposes +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct TestPayloadJobGenerator; + +impl PayloadJobGenerator for TestPayloadJobGenerator { + type Job = TestPayloadJob; + + fn new_payload_job( + &self, + attr: PayloadBuilderAttributes, + ) -> Result { + Ok(TestPayloadJob { attr }) + } +} + +/// A [PayloadJobGenerator] for testing purposes +#[derive(Debug)] +pub struct TestPayloadJob { + attr: PayloadBuilderAttributes, +} + +impl Stream for TestPayloadJob { + type Item = Result, PayloadBuilderError>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } +} + +impl PayloadJob for TestPayloadJob { + fn best_payload(&self) -> Arc { + Arc::new(BuiltPayload::new( + self.attr.payload_id(), + Block::default().seal_slow(), + U256::ZERO, + )) + } +}