feat: integrate NodeTypesWithDB (#10698)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
Arsenii Kulikov
2024-09-05 19:17:28 +04:00
committed by GitHub
parent 5df03fb3c3
commit 5ecc9d2348
99 changed files with 1171 additions and 1143 deletions

View File

@ -12,13 +12,11 @@ workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-ethereum-consensus.workspace = true
reth-blockchain-tree-api.workspace = true
reth-primitives.workspace = true
reth-stages-api.workspace = true
reth-errors.workspace = true
reth-db-api.workspace = true
reth-provider.workspace = true
reth-rpc-types.workspace = true
reth-tasks.workspace = true
@ -30,7 +28,7 @@ reth-static-file.workspace = true
reth-tokio-util.workspace = true
reth-engine-primitives.workspace = true
reth-network-p2p.workspace = true
reth-node-types.workspace = true
# async
tokio = { workspace = true, features = ["sync"] }
@ -55,6 +53,7 @@ reth-consensus = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-blockchain-tree = { workspace = true, features = ["test-utils"] }
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-evm = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
@ -67,6 +66,7 @@ reth-config.workspace = true
reth-testing-utils.workspace = true
reth-exex-types.workspace = true
reth-prune-types.workspace = true
reth-chainspec.workspace = true
alloy-genesis.workspace = true
assert_matches.workspace = true

View File

@ -6,14 +6,14 @@ use crate::{
};
use futures::FutureExt;
use metrics::Counter;
use reth_db_api::database::Database;
use reth_errors::{RethError, RethResult};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::BlockNumber;
use reth_provider::ProviderFactory;
use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::{
fmt,
fmt::{self, Debug},
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
@ -21,15 +21,15 @@ use tokio::sync::oneshot;
/// Manages pruning under the control of the engine.
///
/// This type controls the [Pruner].
pub struct PruneHook<DB> {
pub struct PruneHook<N: NodeTypesWithDB> {
/// The current state of the pruner.
pruner_state: PrunerState<DB>,
pruner_state: PrunerState<N>,
/// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>,
metrics: Metrics,
}
impl<DB: fmt::Debug> fmt::Debug for PruneHook<DB> {
impl<N: NodeTypesWithDB> fmt::Debug for PruneHook<N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PruneHook")
.field("pruner_state", &self.pruner_state)
@ -38,10 +38,10 @@ impl<DB: fmt::Debug> fmt::Debug for PruneHook<DB> {
}
}
impl<DB: Database + 'static> PruneHook<DB> {
impl<N: ProviderNodeTypes> PruneHook<N> {
/// Create a new instance
pub fn new(
pruner: Pruner<DB, ProviderFactory<DB>>,
pruner: Pruner<N::DB, ProviderFactory<N>>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
@ -117,7 +117,7 @@ impl<DB: Database + 'static> PruneHook<DB> {
}
}
impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
impl<N: ProviderNodeTypes> EngineHook for PruneHook<N> {
fn name(&self) -> &'static str {
"Prune"
}
@ -152,12 +152,23 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
#[derive(Debug)]
enum PrunerState<DB> {
enum PrunerState<N: NodeTypesWithDB> {
/// Pruner is idle.
Idle(Option<Pruner<DB, ProviderFactory<DB>>>),
Idle(Option<Pruner<N::DB, ProviderFactory<N>>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult<DB, ProviderFactory<DB>>>),
Running(oneshot::Receiver<PrunerWithResult<N::DB, ProviderFactory<N>>>),
}
impl<N> fmt::Debug for PrunerState<N>
where
N: NodeTypesWithDB<DB: Debug, ChainSpec: Debug>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Idle(f0) => f.debug_tuple("Idle").field(&f0).finish(),
Self::Running(f0) => f.debug_tuple("Running").field(&f0).finish(),
}
}
}
#[derive(reth_metrics::Metrics)]

View File

@ -5,9 +5,10 @@ use crate::{
hooks::EngineHookDBAccessLevel,
};
use futures::FutureExt;
use reth_db_api::database::Database;
use reth_errors::RethResult;
use reth_node_types::NodeTypesWithDB;
use reth_primitives::{static_file::HighestStaticFiles, BlockNumber};
use reth_provider::providers::ProviderNodeTypes;
use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
@ -18,17 +19,17 @@ use tracing::trace;
///
/// This type controls the [`StaticFileProducer`].
#[derive(Debug)]
pub struct StaticFileHook<DB> {
pub struct StaticFileHook<N: NodeTypesWithDB> {
/// The current state of the `static_file_producer`.
state: StaticFileProducerState<DB>,
state: StaticFileProducerState<N>,
/// The type that can spawn the `static_file_producer` task.
task_spawner: Box<dyn TaskSpawner>,
}
impl<DB: Database + 'static> StaticFileHook<DB> {
impl<N: ProviderNodeTypes> StaticFileHook<N> {
/// Create a new instance
pub fn new(
static_file_producer: StaticFileProducer<DB>,
static_file_producer: StaticFileProducer<N>,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
@ -126,7 +127,7 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
}
}
impl<DB: Database + 'static> EngineHook for StaticFileHook<DB> {
impl<N: ProviderNodeTypes> EngineHook for StaticFileHook<N> {
fn name(&self) -> &'static str {
"StaticFile"
}
@ -162,9 +163,9 @@ impl<DB: Database + 'static> EngineHook for StaticFileHook<DB> {
/// [`StaticFileProducerState::Idle`] means that the static file producer is currently idle.
/// [`StaticFileProducerState::Running`] means that the static file producer is currently running.
#[derive(Debug)]
enum StaticFileProducerState<DB> {
enum StaticFileProducerState<N: NodeTypesWithDB> {
/// [`StaticFileProducer`] is idle.
Idle(Option<StaticFileProducer<DB>>),
Idle(Option<StaticFileProducer<N>>),
/// [`StaticFileProducer`] is running and waiting for a response
Running(oneshot::Receiver<StaticFileProducerWithResult<DB>>),
Running(oneshot::Receiver<StaticFileProducerWithResult<N>>),
}

View File

@ -4,14 +4,13 @@ use reth_blockchain_tree_api::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
};
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineTypes, PayloadTypes};
use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
use reth_network_p2p::{
sync::{NetworkSyncUpdater, SyncState},
BlockClient,
};
use reth_node_types::NodeTypesWithDB;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
@ -20,8 +19,8 @@ use reth_primitives::{
B256,
};
use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
StageCheckpointReader,
providers::ProviderNodeTypes, BlockIdReader, BlockReader, BlockSource, CanonChainTracker,
ChainSpecProvider, ProviderError, StageCheckpointReader,
};
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
@ -168,40 +167,40 @@ type PendingForkchoiceUpdate<PayloadAttributes> =
/// If the future is polled more than once. Leads to undefined state.
#[must_use = "Future does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct BeaconConsensusEngine<DB, BT, Client, EngineT>
pub struct BeaconConsensusEngine<N, BT, Client>
where
DB: Database,
N: NodeTypesWithDB,
Client: BlockClient,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
+ CanonChainTracker
+ StageCheckpointReader,
EngineT: EngineTypes,
{
/// Controls syncing triggered by engine updates.
sync: EngineSyncController<DB, Client>,
sync: EngineSyncController<N, Client>,
/// The type we can use to query both the database and the blockchain tree.
blockchain: BT,
/// Used for emitting updates about whether the engine is syncing or not.
sync_state_updater: Box<dyn NetworkSyncUpdater>,
/// The Engine API message receiver.
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
/// A clone of the handle
handle: BeaconConsensusEngineHandle<EngineT>,
handle: BeaconConsensusEngineHandle<N::Engine>,
/// Tracks the received forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// The payload store.
payload_builder: PayloadBuilderHandle<EngineT>,
payload_builder: PayloadBuilderHandle<N::Engine>,
/// Validator for execution payloads
payload_validator: ExecutionPayloadValidator,
/// Current blockchain tree action.
blockchain_tree_action: Option<BlockchainTreeAction<EngineT>>,
blockchain_tree_action: Option<BlockchainTreeAction<N::Engine>>,
/// Pending forkchoice update.
/// It is recorded if we cannot process the forkchoice update because
/// a hook with database read-write access is active.
/// This is a temporary solution to always process missed FCUs.
pending_forkchoice_update: Option<PendingForkchoiceUpdate<EngineT::PayloadAttributes>>,
pending_forkchoice_update:
Option<PendingForkchoiceUpdate<<N::Engine as PayloadTypes>::PayloadAttributes>>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
@ -224,33 +223,32 @@ where
metrics: EngineMetrics,
}
impl<DB, BT, Client, EngineT> BeaconConsensusEngine<DB, BT, Client, EngineT>
impl<N, BT, Client> BeaconConsensusEngine<N, BT, Client>
where
DB: Database + Unpin + 'static,
N: ProviderNodeTypes,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
+ CanonChainTracker
+ StageCheckpointReader
+ ChainSpecProvider<ChainSpec = ChainSpec>
+ ChainSpecProvider<ChainSpec = N::ChainSpec>
+ 'static,
Client: BlockClient + 'static,
EngineT: EngineTypes + Unpin,
{
/// Create a new instance of the [`BeaconConsensusEngine`].
#[allow(clippy::too_many_arguments)]
pub fn new(
client: Client,
pipeline: Pipeline<DB>,
pipeline: Pipeline<N>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
payload_builder: PayloadBuilderHandle<EngineT>,
payload_builder: PayloadBuilderHandle<N::Engine>,
target: Option<B256>,
pipeline_run_threshold: u64,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
client,
@ -284,18 +282,18 @@ where
#[allow(clippy::too_many_arguments)]
pub fn with_channel(
client: Client,
pipeline: Pipeline<DB>,
pipeline: Pipeline<N>,
blockchain: BT,
task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>,
payload_builder: PayloadBuilderHandle<EngineT>,
payload_builder: PayloadBuilderHandle<N::Engine>,
target: Option<B256>,
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
let event_sender = EventSender::default();
let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
let sync = EngineSyncController::new(
@ -349,7 +347,7 @@ where
}
/// Set the next blockchain tree action.
fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<EngineT>) {
fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<N::Engine>) {
let previous_action = self.blockchain_tree_action.replace(action);
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
@ -391,7 +389,7 @@ where
fn on_forkchoice_updated_make_canonical_result(
&mut self,
state: ForkchoiceState,
mut attrs: Option<EngineT::PayloadAttributes>,
mut attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
make_canonical_result: Result<CanonicalOutcome, CanonicalError>,
elapsed: Duration,
) -> Result<OnForkChoiceUpdated, CanonicalError> {
@ -455,7 +453,7 @@ where
&self,
head: &BlockNumHash,
header: &SealedHeader,
attrs: &mut Option<EngineT::PayloadAttributes>,
attrs: &mut Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
) -> bool {
// On Optimism, the proposers are allowed to reorg their own chain at will.
#[cfg(feature = "optimism")]
@ -499,7 +497,7 @@ where
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
) {
self.metrics.forkchoice_updated_messages.increment(1);
@ -621,7 +619,7 @@ where
///
/// The [`BeaconConsensusEngineHandle`] can be used to interact with this
/// [`BeaconConsensusEngine`]
pub fn handle(&self) -> BeaconConsensusEngineHandle<EngineT> {
pub fn handle(&self) -> BeaconConsensusEngineHandle<N::Engine> {
self.handle.clone()
}
@ -1157,7 +1155,7 @@ where
/// return an error if the payload attributes are invalid.
fn process_payload_attributes(
&self,
attrs: EngineT::PayloadAttributes,
attrs: <N::Engine as PayloadTypes>::PayloadAttributes,
head: Header,
state: ForkchoiceState,
) -> OnForkChoiceUpdated {
@ -1174,7 +1172,7 @@ where
// forkchoiceState.headBlockHash and identified via buildProcessId value if
// payloadAttributes is not null and the forkchoice state has been updated successfully.
// The build process is specified in the Payload building section.
match <EngineT::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
match <<N:: Engine as PayloadTypes>::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
state.head_block_hash,
attrs,
) {
@ -1596,7 +1594,7 @@ where
/// so the state change should be handled accordingly.
fn on_blockchain_tree_action(
&mut self,
action: BlockchainTreeAction<EngineT>,
action: BlockchainTreeAction<N::Engine>,
) -> RethResult<EngineEventOutcome> {
match action {
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx } => {
@ -1789,19 +1787,18 @@ where
/// local forkchoice state, it will launch the pipeline to sync to the head hash.
/// While the pipeline is syncing, the consensus engine will keep processing messages from the
/// receiver and forwarding them to the blockchain tree.
impl<DB, BT, Client, EngineT> Future for BeaconConsensusEngine<DB, BT, Client, EngineT>
impl<N, BT, Client> Future for BeaconConsensusEngine<N, BT, Client>
where
DB: Database + Unpin + 'static,
N: ProviderNodeTypes,
Client: BlockClient + 'static,
BT: BlockchainTreeEngine
+ BlockReader
+ BlockIdReader
+ CanonChainTracker
+ StageCheckpointReader
+ ChainSpecProvider<ChainSpec = ChainSpec>
+ ChainSpecProvider<ChainSpec = N::ChainSpec>
+ Unpin
+ 'static,
EngineT: EngineTypes + Unpin,
{
type Output = Result<(), BeaconConsensusEngineError>;
@ -2156,8 +2153,8 @@ mod tests {
assert_matches!(rx.await, Ok(Ok(())));
}
fn insert_blocks<'a, DB: Database>(
provider_factory: ProviderFactory<DB>,
fn insert_blocks<'a, N: ProviderNodeTypes>(
provider_factory: ProviderFactory<N>,
mut blocks: impl Iterator<Item = &'a SealedBlock>,
) {
let provider = provider_factory.provider_rw().unwrap();
@ -2176,10 +2173,10 @@ mod tests {
mod fork_choice_updated {
use super::*;
use generators::BlockParams;
use reth_db::{tables, test_utils::create_test_static_files_dir};
use reth_db::{tables, test_utils::create_test_static_files_dir, Database};
use reth_db_api::transaction::DbTxMut;
use reth_primitives::U256;
use reth_provider::providers::StaticFileProvider;
use reth_provider::{providers::StaticFileProvider, test_utils::MockNodeTypesWithDB};
use reth_rpc_types::engine::ForkchoiceUpdateError;
use reth_testing_utils::generators::random_block;
@ -2248,8 +2245,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2316,8 +2313,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2347,8 +2344,8 @@ mod tests {
// Insert next head immediately after sending forkchoice update
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2403,8 +2400,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2486,8 +2483,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2547,8 +2544,8 @@ mod tests {
let (_temp_dir, temp_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(temp_dir_path).unwrap(),
),
@ -2580,7 +2577,8 @@ mod tests {
use reth_db::test_utils::create_test_static_files_dir;
use reth_primitives::{EthereumHardfork, U256};
use reth_provider::{
providers::StaticFileProvider, test_utils::blocks::BlockchainTestData,
providers::StaticFileProvider,
test_utils::{blocks::BlockchainTestData, MockNodeTypesWithDB},
};
use reth_testing_utils::{generators::random_block, GenesisAllocator};
#[tokio::test]
@ -2680,8 +2678,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2760,8 +2758,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2810,8 +2808,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),
@ -2881,8 +2879,8 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
insert_blocks(
ProviderFactory::new(
env.db.as_ref(),
ProviderFactory::<MockNodeTypesWithDB>::new(
env.db.clone(),
chain_spec.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
),

View File

@ -5,13 +5,13 @@ use crate::{
ConsensusEngineLiveSyncProgress, EthBeaconConsensus,
};
use futures::FutureExt;
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::{BlockNumber, SealedBlock, B256};
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventSender;
@ -31,9 +31,9 @@ use tracing::trace;
/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network
/// [`EngineSyncEvent::FetchedFullBlock`] until the pipeline is idle to prevent commits to the
/// database while the pipeline is still active.
pub(crate) struct EngineSyncController<DB, Client>
pub(crate) struct EngineSyncController<N, Client>
where
DB: Database,
N: NodeTypesWithDB,
Client: BlockClient,
{
/// A downloader that can download full blocks from the network.
@ -42,7 +42,7 @@ where
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
/// The pipeline is used for large ranges.
pipeline_state: PipelineState<DB>,
pipeline_state: PipelineState<N>,
/// Pending target block for the pipeline to sync
pending_pipeline_target: Option<PipelineTarget>,
/// In-flight full block requests in progress.
@ -61,18 +61,18 @@ where
metrics: EngineSyncMetrics,
}
impl<DB, Client> EngineSyncController<DB, Client>
impl<N, Client> EngineSyncController<N, Client>
where
DB: Database + 'static,
N: ProviderNodeTypes,
Client: BlockClient + 'static,
{
/// Create a new instance
pub(crate) fn new(
pipeline: Pipeline<DB>,
pipeline: Pipeline<N>,
client: Client,
pipeline_task_spawner: Box<dyn TaskSpawner>,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
chain_spec: Arc<N::ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self {
@ -393,14 +393,14 @@ pub(crate) enum EngineSyncEvent {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PipelineState<DB: Database> {
enum PipelineState<N: NodeTypesWithDB> {
/// Pipeline is idle.
Idle(Option<Pipeline<DB>>),
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<DB>>),
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<DB: Database> PipelineState<DB> {
impl<N: NodeTypesWithDB> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
@ -412,12 +412,12 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient};
use reth_primitives::{BlockBody, Header, SealedHeader};
use reth_provider::{
test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome,
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome,
};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
@ -467,12 +467,12 @@ mod tests {
}
/// Builds the pipeline.
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<Arc<TempDatabase<DatabaseEnv>>> {
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<MockNodeTypesWithDB> {
reth_tracing::init_test_tracing();
// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let mut pipeline = Pipeline::builder()
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
@ -514,13 +514,13 @@ mod tests {
}
/// Builds the sync controller.
fn build<DB>(
fn build<N>(
self,
pipeline: Pipeline<DB>,
pipeline: Pipeline<N>,
chain_spec: Arc<ChainSpec>,
) -> EngineSyncController<DB, Either<Client, TestFullBlockClient>>
) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
where
DB: Database + 'static,
N: ProviderNodeTypes,
Client: BlockClient + 'static,
{
let client = self

View File

@ -22,7 +22,8 @@ use reth_network_p2p::{sync::NoopSyncStateUpdater, test_utils::NoopFullBlockClie
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, B256};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
providers::BlockchainProvider,
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ExecutionOutcome, ProviderFactory,
};
use reth_prune::Pruner;
@ -39,10 +40,9 @@ use tokio::sync::{oneshot, watch};
type DatabaseEnv = TempDatabase<DE>;
type TestBeaconConsensusEngine<Client> = BeaconConsensusEngine<
Arc<DatabaseEnv>,
BlockchainProvider<Arc<DatabaseEnv>>,
MockNodeTypesWithDB,
BlockchainProvider<MockNodeTypesWithDB>,
Arc<Either<Client, NoopFullBlockClient>>,
EthEngineTypes,
>;
#[derive(Debug)]
@ -355,7 +355,7 @@ where
// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(B256::default());
let mut pipeline = match self.base_config.pipeline_config {
TestPipelineConfig::Test(outputs) => Pipeline::builder()
TestPipelineConfig::Test(outputs) => Pipeline::<MockNodeTypesWithDB>::builder()
.add_stages(TestStages::new(outputs, Default::default()))
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
@ -367,7 +367,7 @@ where
.build(client.clone(), consensus.clone(), provider_factory.clone())
.into_task();
Pipeline::builder().add_stages(DefaultStages::new(
Pipeline::<MockNodeTypesWithDB>::builder().add_stages(DefaultStages::new(
provider_factory.clone(),
tip_rx.clone(),
Arc::clone(&consensus),