chore(engine): remove db attribute (#2917)

This commit is contained in:
Roman Krasiuk
2023-05-30 16:44:41 +03:00
committed by GitHub
parent 3347da091f
commit ed209f6cb4
4 changed files with 51 additions and 41 deletions

View File

@ -3,7 +3,7 @@ use crate::{
sync::{EngineSyncController, EngineSyncEvent},
};
use futures::{Future, StreamExt, TryFutureExt};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_db::database::Database;
use reth_interfaces::{
blockchain_tree::{
error::{InsertBlockError, InsertBlockErrorKind},
@ -21,7 +21,7 @@ use reth_primitives::{
H256, U256,
};
use reth_provider::{
providers::get_stage_checkpoint, BlockProvider, BlockSource, CanonChainTracker, ProviderError,
BlockProvider, BlockSource, CanonChainTracker, ProviderError, StageCheckpointProvider,
};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
@ -151,10 +151,8 @@ pub struct BeaconConsensusEngine<DB, BT, Client>
where
DB: Database,
Client: HeadersClient + BodiesClient,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointProvider,
{
/// The database handle.
db: DB,
/// Controls syncing triggered by engine updates.
sync: EngineSyncController<DB, Client>,
/// The type we can use to query both the database and the blockchain tree.
@ -182,13 +180,16 @@ where
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
where
DB: Database + Unpin + 'static,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + 'static,
BT: BlockchainTreeEngine
+ BlockProvider
+ CanonChainTracker
+ StageCheckpointProvider
+ 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Create a new instance of the [BeaconConsensusEngine].
#[allow(clippy::too_many_arguments)]
pub fn new(
db: DB,
client: Client,
pipeline: Pipeline<DB>,
blockchain: BT,
@ -201,7 +202,6 @@ where
) -> (Self, BeaconConsensusEngineHandle) {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
db,
client,
pipeline,
blockchain,
@ -220,7 +220,6 @@ where
/// the [BeaconEngineMessage] communication channel.
#[allow(clippy::too_many_arguments)]
pub fn with_channel(
db: DB,
client: Client,
pipeline: Pipeline<DB>,
blockchain: BT,
@ -242,7 +241,6 @@ where
max_block,
);
let mut this = Self {
db,
sync,
blockchain,
sync_state_updater,
@ -310,11 +308,6 @@ where
}
}
/// Loads the header for the given `block_number` from the database.
fn load_header(&self, block_number: u64) -> Result<Option<Header>, Error> {
Ok(self.db.view(|tx| tx.get::<tables::Headers>(block_number))??)
}
/// Checks if the given `head` points to an invalid header, which requires a specific response
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: H256) -> Option<PayloadStatus> {
@ -327,7 +320,7 @@ where
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Ok(Some(parent)) = self.load_header(parent_number) {
if let Ok(Some(parent)) = self.blockchain.header_by_number(parent_number) {
if parent.difficulty != U256::ZERO {
latest_valid_hash = H256::zero();
}
@ -407,10 +400,11 @@ where
let header = outcome.into_header();
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head");
let pipeline_min_progress =
get_stage_checkpoint(&self.db.tx()?, StageId::Finish)?
.unwrap_or_default()
.block_number;
let pipeline_min_progress = self
.blockchain
.get_stage_checkpoint(StageId::Finish)?
.unwrap_or_default()
.block_number;
if pipeline_min_progress < header.number {
debug!(target: "consensus::engine", last_finished=pipeline_min_progress, head_number=header.number, "pipeline run to head required");
@ -554,7 +548,7 @@ where
// find the appropriate target to sync to, if we don't have the safe block hash then we
// start syncing to the safe block via pipeline first
let target = if !state.safe_block_hash.is_zero() &&
self.get_block_number(state.safe_block_hash).ok().flatten().is_none()
self.blockchain.block_number(state.safe_block_hash).ok().flatten().is_none()
{
state.safe_block_hash
} else {
@ -788,15 +782,13 @@ where
&mut self,
state: ForkchoiceState,
) -> Result<(), reth_interfaces::Error> {
let needs_pipeline_run = match self.get_block_number(state.finalized_block_hash)? {
let needs_pipeline_run = match self.blockchain.block_number(state.finalized_block_hash)? {
Some(number) => {
// Attempt to restore the tree.
self.blockchain.restore_canonical_hashes(number)?;
// After restoring the tree, check if the head block is missing.
self.db
.view(|tx| tx.get::<tables::HeaderNumbers>(state.head_block_hash))??
.is_none()
self.blockchain.header(&state.head_block_hash)?.is_none()
}
None => true,
};
@ -807,11 +799,6 @@ where
Ok(())
}
/// Retrieve the block number for the given block hash.
fn get_block_number(&self, hash: H256) -> Result<Option<BlockNumber>, reth_interfaces::Error> {
Ok(self.db.view(|tx| tx.get::<tables::HeaderNumbers>(hash))??)
}
/// Event handler for events emitted by the [EngineSyncController].
///
/// This returns a result to indicate whether the engine future should resolve (fatal error).
@ -924,7 +911,12 @@ impl<DB, BT, Client> Future for BeaconConsensusEngine<DB, BT, Client>
where
DB: Database + Unpin + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static,
BT: BlockchainTreeEngine
+ BlockProvider
+ CanonChainTracker
+ StageCheckpointProvider
+ Unpin
+ 'static,
{
type Output = Result<(), BeaconConsensusEngineError>;
@ -1116,7 +1108,6 @@ mod tests {
let latest = chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
let (engine, handle) = BeaconConsensusEngine::new(
db.clone(),
NoopFullBlockClient::default(),
pipeline,
blockchain_provider,
@ -1287,7 +1278,7 @@ mod tests {
mod fork_choice_updated {
use super::*;
use reth_db::transaction::DbTxMut;
use reth_db::{tables, transaction::DbTxMut};
use reth_interfaces::test_utils::generators::random_block;
use reth_rpc_types::engine::ForkchoiceUpdateError;