chore: move stage methods to StageCheckpointProvider and add StageCheckpointWriter (#3195)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
joshieDo
2023-06-17 12:06:25 +01:00
committed by GitHub
parent e252cd6a2f
commit 017c9cea9c
14 changed files with 67 additions and 53 deletions

View File

@ -21,7 +21,7 @@ use reth_primitives::{
SealedHeader, H256, U256,
};
use reth_provider::{
BlockProvider, BlockSource, CanonChainTracker, ProviderError, StageCheckpointProvider,
BlockProvider, BlockSource, CanonChainTracker, ProviderError, StageCheckpointReader,
};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
@ -210,7 +210,7 @@ pub struct BeaconConsensusEngine<DB, BT, Client>
where
DB: Database,
Client: HeadersClient + BodiesClient,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointProvider,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointReader,
{
/// Controls syncing triggered by engine updates.
sync: EngineSyncController<DB, Client>,
@ -238,11 +238,7 @@ where
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
where
DB: Database + Unpin + 'static,
BT: BlockchainTreeEngine
+ BlockProvider
+ CanonChainTracker
+ StageCheckpointProvider
+ 'static,
BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + StageCheckpointReader + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Create a new instance of the [BeaconConsensusEngine].
@ -1235,7 +1231,7 @@ where
BT: BlockchainTreeEngine
+ BlockProvider
+ CanonChainTracker
+ StageCheckpointProvider
+ StageCheckpointReader
+ Unpin
+ 'static,
{

View File

@ -6,7 +6,7 @@ use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId,
BlockNumber, ChainSpec, H256,
};
use reth_provider::{ProviderFactory, StageCheckpointProvider};
use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter};
use std::{pin::Pin, sync::Arc};
use tokio::sync::watch;
use tokio_stream::wrappers::UnboundedReceiverStream;

View File

@ -12,7 +12,9 @@ use reth_primitives::{
trie::StoredSubNode,
BlockNumber, SealedHeader, H256,
};
use reth_provider::{DatabaseProviderRW, HeaderProvider, ProviderError};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
};
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress};
use std::fmt::Debug;
use tracing::*;

View File

@ -16,8 +16,8 @@ pub use traits::{
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
ExecutorFactory, HeaderProvider, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointProvider, StateProvider, StateProviderBox, StateProviderFactory,
StateRootProvider, TransactionsProvider, WithdrawalsProvider,
StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.

View File

@ -2,7 +2,7 @@ use crate::{
providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider},
traits::{BlockSource, ReceiptProvider},
BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider, HeaderProvider,
ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider,
ProviderError, StageCheckpointReader, StateProviderBox, TransactionsProvider,
WithdrawalsProvider,
};
use reth_db::{database::Database, models::StoredBlockBodyIndices};
@ -280,10 +280,14 @@ impl<DB: Database> WithdrawalsProvider for ProviderFactory<DB> {
}
}
impl<DB: Database> StageCheckpointProvider for ProviderFactory<DB> {
impl<DB: Database> StageCheckpointReader for ProviderFactory<DB> {
fn get_stage_checkpoint(&self, id: StageId) -> Result<Option<StageCheckpoint>> {
self.provider()?.get_stage_checkpoint(id)
}
fn get_stage_checkpoint_progress(&self, id: StageId) -> Result<Option<Vec<u8>>> {
self.provider()?.get_stage_checkpoint_progress(id)
}
}
impl<DB: Database> EvmEnvProvider for ProviderFactory<DB> {

View File

@ -1,9 +1,9 @@
use crate::{
insert_canonical_block,
post_state::StorageChangeset,
traits::{AccountExtProvider, BlockSource, ReceiptProvider},
traits::{AccountExtProvider, BlockSource, ReceiptProvider, StageCheckpointWriter},
AccountProvider, BlockHashProvider, BlockNumProvider, BlockProvider, EvmEnvProvider,
HeaderProvider, PostState, ProviderError, StageCheckpointProvider, TransactionError,
HeaderProvider, PostState, ProviderError, StageCheckpointReader, TransactionError,
TransactionsProvider, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
@ -1072,32 +1072,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(())
}
/// Save stage checkpoint.
pub fn save_stage_checkpoint(
&self,
id: StageId,
checkpoint: StageCheckpoint,
) -> std::result::Result<(), DatabaseError> {
self.tx.put::<tables::SyncStage>(id.to_string(), checkpoint)
}
/// Get stage checkpoint progress.
pub fn get_stage_checkpoint_progress(
&self,
id: StageId,
) -> std::result::Result<Option<Vec<u8>>, DatabaseError> {
self.tx.get::<tables::SyncStageProgress>(id.to_string())
}
/// Save stage checkpoint progress.
pub fn save_stage_checkpoint_progress(
&self,
id: StageId,
checkpoint: Vec<u8>,
) -> std::result::Result<(), DatabaseError> {
self.tx.put::<tables::SyncStageProgress>(id.to_string(), checkpoint)
}
/// Query the block body by number.
pub fn block_body_indices(
&self,
@ -1865,8 +1839,25 @@ impl<'this, TX: DbTx<'this>> EvmEnvProvider for DatabaseProvider<'this, TX> {
}
}
impl<'this, TX: DbTx<'this>> StageCheckpointProvider for DatabaseProvider<'this, TX> {
impl<'this, TX: DbTx<'this>> StageCheckpointReader for DatabaseProvider<'this, TX> {
fn get_stage_checkpoint(&self, id: StageId) -> Result<Option<StageCheckpoint>> {
Ok(self.tx.get::<tables::SyncStage>(id.to_string())?)
}
/// Get stage checkpoint progress.
fn get_stage_checkpoint_progress(&self, id: StageId) -> Result<Option<Vec<u8>>> {
Ok(self.tx.get::<tables::SyncStageProgress>(id.to_string())?)
}
}
impl<'this, TX: DbTxMut<'this>> StageCheckpointWriter for DatabaseProvider<'this, TX> {
/// Save stage checkpoint progress.
fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec<u8>) -> Result<()> {
Ok(self.tx.put::<tables::SyncStageProgress>(id.to_string(), checkpoint)?)
}
/// Save stage checkpoint.
fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint) -> Result<()> {
Ok(self.tx.put::<tables::SyncStage>(id.to_string(), checkpoint)?)
}
}

View File

@ -2,7 +2,7 @@ use crate::{
BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider, BlockProviderIdExt,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError,
ReceiptProvider, StageCheckpointProvider, StateProviderBox, StateProviderFactory,
ReceiptProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory,
TransactionsProvider, WithdrawalsProvider,
};
use reth_db::{database::Database, models::StoredBlockBodyIndices};
@ -344,7 +344,7 @@ where
}
}
impl<DB, Tree> StageCheckpointProvider for BlockchainProvider<DB, Tree>
impl<DB, Tree> StageCheckpointReader for BlockchainProvider<DB, Tree>
where
DB: Database,
Tree: Send + Sync,
@ -352,6 +352,10 @@ where
fn get_stage_checkpoint(&self, id: StageId) -> Result<Option<StageCheckpoint>> {
self.database.provider()?.get_stage_checkpoint(id)
}
fn get_stage_checkpoint_progress(&self, id: StageId) -> Result<Option<Vec<u8>>> {
self.database.provider()?.get_stage_checkpoint_progress(id)
}
}
impl<DB, Tree> EvmEnvProvider for BlockchainProvider<DB, Tree>

View File

@ -1,7 +1,7 @@
use crate::{
traits::{BlockSource, ReceiptProvider},
AccountProvider, BlockHashProvider, BlockIdProvider, BlockNumProvider, BlockProvider,
BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointProvider,
BlockProviderIdExt, EvmEnvProvider, HeaderProvider, PostState, StageCheckpointReader,
StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider,
WithdrawalsProvider,
};
@ -311,10 +311,14 @@ impl StateProviderFactory for NoopProvider {
}
}
impl StageCheckpointProvider for NoopProvider {
impl StageCheckpointReader for NoopProvider {
fn get_stage_checkpoint(&self, _id: StageId) -> Result<Option<StageCheckpoint>> {
Ok(None)
}
fn get_stage_checkpoint_progress(&self, _id: StageId) -> Result<Option<Vec<u8>>> {
Ok(None)
}
}
impl WithdrawalsProvider for NoopProvider {

View File

@ -46,4 +46,4 @@ pub use chain::{
};
mod stage_checkpoint;
pub use stage_checkpoint::StageCheckpointProvider;
pub use stage_checkpoint::{StageCheckpointReader, StageCheckpointWriter};

View File

@ -3,7 +3,20 @@ use reth_primitives::stage::{StageCheckpoint, StageId};
/// The trait for fetching stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait StageCheckpointProvider: Send + Sync {
pub trait StageCheckpointReader: Send + Sync {
/// Fetch the checkpoint for the given stage.
fn get_stage_checkpoint(&self, id: StageId) -> Result<Option<StageCheckpoint>>;
/// Get stage checkpoint progress.
fn get_stage_checkpoint_progress(&self, id: StageId) -> Result<Option<Vec<u8>>>;
}
/// The trait for updating stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait StageCheckpointWriter: Send + Sync {
/// Save stage checkpoint.
fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint) -> Result<()>;
/// Save stage checkpoint progress.
fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec<u8>) -> Result<()>;
}