feat(pruner, storage): prune receipts & save checkpoints to database (#3733)

Author: Alexey Shekhirin
Co-authored-by: joshieDo <ranriver@protonmail.com>
Committed by: GitHub
Date: 2023-07-24 17:39:02 +01:00
parent 0ff75b5011
commit 1ca7f3ae40
27 changed files with 433 additions and 98 deletions

Cargo.lock (generated)

@ -5586,7 +5586,13 @@ dependencies = [
name = "reth-prune"
version = "0.1.0-alpha.4"
dependencies = [
"assert_matches",
"itertools",
"reth-db",
"reth-interfaces",
"reth-primitives",
"reth-provider",
"reth-stages",
"thiserror",
"tracing",
]


@ -8,7 +8,7 @@ use reth_db::{cursor::DbCursorRO, init_db, tables, transaction::DbTx};
use reth_primitives::{
fs,
stage::{StageCheckpoint, StageId},
ChainSpec, PruneTargets,
ChainSpec, PruneModes,
};
use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_stages::{
@ -96,7 +96,7 @@ impl Command {
let mut execution_stage = ExecutionStage::new(
factory,
ExecutionStageThresholds { max_blocks: Some(1), max_changes: None },
PruneTargets::all(),
PruneModes::all(),
);
let mut account_hashing_stage = AccountHashingStage::default();


@ -78,6 +78,7 @@ use reth_interfaces::p2p::headers::client::HeadersClient;
use reth_payload_builder::PayloadBuilderService;
use reth_primitives::DisplayHardforks;
use reth_provider::providers::BlockchainProvider;
use reth_prune::BatchSizes;
use reth_stages::stages::{
AccountHashingStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
StorageHashingStage, TransactionLookupStage,
@ -364,7 +365,14 @@ impl Command {
let pruner = config.prune.map(|prune_config| {
info!(target: "reth::cli", "Pruner initialized");
reth_prune::Pruner::new(prune_config.block_interval, tree_config.max_reorg_depth())
reth_prune::Pruner::new(
db.clone(),
self.chain.clone(),
prune_config.block_interval,
tree_config.max_reorg_depth(),
prune_config.parts,
BatchSizes::default(),
)
});
// Configure the consensus engine


@ -2,7 +2,7 @@ use super::setup;
use crate::utils::DbTool;
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec, PruneTargets};
use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec, PruneModes};
use reth_provider::ProviderFactory;
use reth_stages::{
stages::{
@ -70,7 +70,7 @@ async fn unwind_and_copy<DB: Database>(
let mut exec_stage = ExecutionStage::new(
reth_revm::Factory::new(db_tool.chain.clone()),
ExecutionStageThresholds { max_blocks: Some(u64::MAX), max_changes: None },
PruneTargets::all(),
PruneModes::all(),
);
exec_stage


@ -5,7 +5,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_network::{NetworkConfigBuilder, PeersConfig, SessionsConfig};
use reth_primitives::PruneTargets;
use reth_primitives::PruneModes;
use secp256k1::SecretKey;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
@ -285,12 +285,12 @@ pub struct PruneConfig {
/// Minimum pruning interval measured in blocks.
pub block_interval: u64,
/// Pruning configuration for every part of the data that can be pruned.
pub parts: PruneTargets,
pub parts: PruneModes,
}
impl Default for PruneConfig {
fn default() -> Self {
Self { block_interval: 10, parts: PruneTargets::default() }
Self { block_interval: 10, parts: PruneModes::default() }
}
}
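
For illustration, a config that keeps only roughly the most recent 64 blocks of receipts and leaves every other part untouched could be built like this (a minimal sketch; the 64-block distance is an arbitrary example, not a default):

    use reth_primitives::{PruneMode, PruneModes};

    let prune_config = PruneConfig {
        block_interval: 10,
        parts: PruneModes {
            receipts: Some(PruneMode::Distance(64)),
            ..PruneModes::default()
        },
    };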


@ -193,7 +193,7 @@ where
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
/// Controls pruning triggered by engine updates.
prune: Option<EnginePruneController>,
prune: Option<EnginePruneController<DB>>,
}
impl<DB, BT, Client> BeaconConsensusEngine<DB, BT, Client>
@ -220,7 +220,7 @@ where
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
pipeline_run_threshold: u64,
pruner: Option<Pruner>,
pruner: Option<Pruner<DB>>,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
@ -266,7 +266,7 @@ where
pipeline_run_threshold: u64,
to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>,
pruner: Option<Pruner>,
pruner: Option<Pruner<DB>>,
) -> Result<(Self, BeaconConsensusEngineHandle), Error> {
let handle = BeaconConsensusEngineHandle { to_engine };
let sync = EngineSyncController::new(
@ -1727,11 +1727,14 @@ mod tests {
test_utils::{NoopFullBlockClient, TestConsensus},
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, H256, MAINNET};
use reth_primitives::{
stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, PruneModes, H256, MAINNET,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockExecutor, BlockWriter,
ExecutorFactory, ProviderFactory, StateProvider,
};
use reth_prune::BatchSizes;
use reth_revm::Factory;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
@ -2071,7 +2074,14 @@ mod tests {
let latest = self.chain_spec.genesis_header().seal_slow();
let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest);
let pruner = Pruner::new(5, 0);
let pruner = Pruner::new(
db.clone(),
self.chain_spec.clone(),
5,
0,
PruneModes::default(),
BatchSizes::default(),
);
let (mut engine, handle) = BeaconConsensusEngine::new(
client,


@ -1,6 +1,7 @@
//! Prune management for the engine implementation.
use futures::FutureExt;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
@ -10,16 +11,16 @@ use tokio::sync::oneshot;
/// Manages pruning under the control of the engine.
///
/// This type controls the [Pruner].
pub(crate) struct EnginePruneController {
pub(crate) struct EnginePruneController<DB> {
/// The current state of the pruner.
pruner_state: PrunerState,
pruner_state: PrunerState<DB>,
/// The type that can spawn the pruner task.
pruner_task_spawner: Box<dyn TaskSpawner>,
}
impl EnginePruneController {
impl<DB: Database + 'static> EnginePruneController<DB> {
/// Create a new instance
pub(crate) fn new(pruner: Pruner, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
pub(crate) fn new(pruner: Pruner<DB>, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self { pruner_state: PrunerState::Idle(Some(pruner)), pruner_task_spawner }
}
@ -131,14 +132,14 @@ pub(crate) enum EnginePruneEvent {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PrunerState {
enum PrunerState<DB> {
/// Pruner is idle.
Idle(Option<Pruner>),
Idle(Option<Pruner<DB>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult>),
Running(oneshot::Receiver<PrunerWithResult<DB>>),
}
impl PrunerState {
impl<DB> PrunerState<DB> {
/// Returns `true` if the state matches idle.
fn is_idle(&self) -> bool {
matches!(self, PrunerState::Idle(_))
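
The Idle/Running split above is the whole state machine: when pruning is due, the Pruner is taken out of Idle, moved onto a background task, and eventually handed back over the oneshot channel as a PrunerWithResult. A rough sketch of that handoff (illustrative only; the method name and the use of TaskSpawner::spawn are assumptions, and the real controller presumably dispatches the blocking prune run on a dedicated blocking task):

    fn try_spawn_pruner(&mut self, tip: BlockNumber) -> bool {
        if let PrunerState::Idle(pruner) = &mut self.pruner_state {
            if let Some(mut pruner) = pruner.take() {
                let (tx, rx) = oneshot::channel();
                self.pruner_task_spawner.spawn(Box::pin(async move {
                    let result = pruner.run(tip);
                    // hand the pruner back to the controller together with its result
                    let _ = tx.send((pruner, result));
                }));
                self.pruner_state = PrunerState::Running(rx);
                return true
            }
        }
        false
    }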


@ -78,7 +78,7 @@ pub use net::{
SEPOLIA_BOOTNODES,
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{PruneCheckpoint, PruneMode, PrunePart, PruneTargets};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef};
pub use revm_primitives::JumpMap;
pub use serde_helper::JsonU256;


@ -7,7 +7,7 @@ use reth_codecs::{main_codec, Compact};
#[cfg_attr(test, derive(Default))]
pub struct PruneCheckpoint {
/// Highest pruned block number.
block_number: BlockNumber,
pub block_number: BlockNumber,
/// Prune mode.
prune_mode: PruneMode,
pub prune_mode: PruneMode,
}


@ -6,4 +6,4 @@ mod target;
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::PrunePart;
pub use target::PruneTargets;
pub use target::PruneModes;


@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
/// Pruning configuration for every part of the data that can be pruned.
#[derive(Debug, Clone, Default, Copy, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneTargets {
pub struct PruneModes {
/// Sender Recovery pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub sender_recovery: Option<PruneMode>,
@ -26,25 +26,42 @@ pub struct PruneTargets {
pub storage_history: Option<PruneMode>,
}
macro_rules! should_prune_method {
($($config:ident),+) => {
macro_rules! impl_prune_parts {
($(($part:ident, $human_part:expr)),+) => {
$(
paste! {
#[allow(missing_docs)]
pub fn [<should_prune_ $config>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(config) = &self.$config {
return self.should_prune(config, block, tip)
#[doc = concat!(
"Check if ",
$human_part,
" should be pruned at the target block according to the provided tip."
)]
pub fn [<should_prune_ $part>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$part {
return self.should_prune(mode, block, tip)
}
false
}
}
)+
$(
paste! {
#[doc = concat!(
"Returns block up to which ",
$human_part,
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_to_block_ $part>](&self, tip: BlockNumber) -> Option<(BlockNumber, PruneMode)> {
self.$part.as_ref().map(|mode| (self.prune_to_block(mode, tip), *mode))
}
}
)+
/// Sets pruning to all targets.
pub fn all() -> Self {
PruneTargets {
Self {
$(
$config: Some(PruneMode::Full),
$part: Some(PruneMode::Full),
)+
}
}
@ -52,15 +69,15 @@ macro_rules! should_prune_method {
};
}
impl PruneTargets {
impl PruneModes {
/// Sets pruning to no target.
pub fn none() -> Self {
PruneTargets::default()
PruneModes::default()
}
/// Check if target block should be pruned
pub fn should_prune(&self, target: &PruneMode, block: BlockNumber, tip: BlockNumber) -> bool {
match target {
/// Check if target block should be pruned according to the provided prune mode and tip.
pub fn should_prune(&self, mode: &PruneMode, block: BlockNumber, tip: BlockNumber) -> bool {
match mode {
PruneMode::Full => true,
PruneMode::Distance(distance) => {
if *distance > tip {
@ -72,11 +89,21 @@ impl PruneTargets {
}
}
should_prune_method!(
sender_recovery,
transaction_lookup,
receipts,
account_history,
storage_history
/// Returns block up to which pruning needs to be done, inclusive, according to the provided
/// prune mode and tip.
pub fn prune_to_block(&self, mode: &PruneMode, tip: BlockNumber) -> BlockNumber {
match mode {
PruneMode::Full => tip,
PruneMode::Distance(distance) => tip.saturating_sub(*distance),
PruneMode::Before(n) => *n,
}
}
impl_prune_parts!(
(sender_recovery, "Sender Recovery"),
(transaction_lookup, "Transaction Lookup"),
(receipts, "Receipts"),
(account_history, "Account History"),
(storage_history, "Storage History")
);
}
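
Concretely, the macro generates one `should_prune_<part>` and one `prune_to_block_<part>` method per field. A small worked example with a receipts target (values chosen for illustration):

    let modes = PruneModes {
        receipts: Some(PruneMode::Distance(64)),
        ..PruneModes::none()
    };

    // With tip = 100, receipts may be pruned up to block 100 - 64 = 36, inclusive.
    assert_eq!(
        modes.prune_to_block_receipts(100),
        Some((36, PruneMode::Distance(64)))
    );
    assert!(modes.should_prune_receipts(10, 100));  // far behind the retained window
    assert!(!modes.should_prune_receipts(90, 100)); // inside the retained window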


@ -13,8 +13,20 @@ Pruning implementation
[dependencies]
# reth
reth-primitives = { workspace = true }
reth-db = { workspace = true }
reth-provider = { workspace = true }
reth-interfaces = { workspace = true }
# misc
tracing = { workspace = true }
thiserror = { workspace = true }
itertools = "0.10"
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { path = "../stages", features = ["test-utils"] }
# misc
assert_matches = "1.5.0"


@ -1,4 +1,15 @@
use reth_db::DatabaseError;
use reth_provider::ProviderError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PrunerError {}
pub enum PrunerError {
#[error("An interface error occurred.")]
Interface(#[from] reth_interfaces::Error),
#[error(transparent)]
Database(#[from] DatabaseError),
#[error(transparent)]
Provider(#[from] ProviderError),
}
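
Because every variant has a #[from] conversion, pruner routines can use `?` directly on database, provider, and interface results. For instance (the helper name is hypothetical):

    fn receipts_checkpoint(
        provider: &impl reth_provider::PruneCheckpointReader,
    ) -> Result<Option<reth_primitives::PruneCheckpoint>, PrunerError> {
        // `get_prune_checkpoint` returns a `reth_interfaces::Result`, which the
        // `#[from] reth_interfaces::Error` variant converts through `?`.
        Ok(provider.get_prune_checkpoint(reth_primitives::PrunePart::Receipts)?)
    }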


@ -2,4 +2,4 @@ mod error;
mod pruner;
pub use error::PrunerError;
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};
pub use pruner::{BatchSizes, Pruner, PrunerResult, PrunerWithResult};


@ -1,17 +1,31 @@
//! Support for pruning.
use crate::PrunerError;
use reth_primitives::BlockNumber;
use tracing::debug;
use reth_db::{database::Database, tables};
use reth_primitives::{BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart};
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointWriter};
use std::sync::Arc;
use tracing::{debug, instrument, trace};
/// Result of [Pruner::run] execution
pub type PrunerResult = Result<(), PrunerError>;
/// The pipeline type itself with the result of [Pruner::run]
pub type PrunerWithResult = (Pruner, PrunerResult);
pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
pub struct BatchSizes {
receipts: usize,
}
impl Default for BatchSizes {
fn default() -> Self {
Self { receipts: 10000 }
}
}
/// Pruning routine. Main pruning logic happens in [Pruner::run].
pub struct Pruner {
pub struct Pruner<DB> {
provider_factory: ProviderFactory<DB>,
/// Minimum pruning interval measured in blocks. All prune parts are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks.
min_block_interval: u64,
@ -22,17 +36,39 @@ pub struct Pruner {
/// Last pruned block number. Used in conjunction with `min_block_interval` to determine
/// when the pruning needs to be initiated.
last_pruned_block_number: Option<BlockNumber>,
modes: PruneModes,
batch_sizes: BatchSizes,
}
impl Pruner {
impl<DB: Database> Pruner<DB> {
/// Creates a new [Pruner].
pub fn new(min_block_interval: u64, max_prune_depth: u64) -> Self {
Self { min_block_interval, max_prune_depth, last_pruned_block_number: None }
pub fn new(
db: DB,
chain_spec: Arc<ChainSpec>,
min_block_interval: u64,
max_prune_depth: u64,
modes: PruneModes,
batch_sizes: BatchSizes,
) -> Self {
Self {
provider_factory: ProviderFactory::new(db, chain_spec),
min_block_interval,
max_prune_depth,
last_pruned_block_number: None,
modes,
batch_sizes,
}
}
/// Run the pruner
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
// Pruning logic
let provider = self.provider_factory.provider_rw()?;
if let Some((to_block, prune_mode)) = self.modes.prune_to_block_receipts(tip_block_number) {
self.prune_receipts(&provider, to_block, prune_mode)?;
}
provider.commit()?;
self.last_pruned_block_number = Some(tip_block_number);
Ok(())
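
Outside of the engine the pruner can also be driven directly. A minimal sketch (assuming `db` implements reth_db's Database trait and `tip` is the current canonical tip):

    use reth_primitives::{PruneMode, PruneModes, MAINNET};
    use reth_prune::{BatchSizes, Pruner};

    let mut pruner = Pruner::new(
        db,
        MAINNET.clone(),
        5, // min_block_interval
        0, // max_prune_depth
        PruneModes { receipts: Some(PruneMode::Distance(128)), ..PruneModes::default() },
        BatchSizes::default(), // 10_000 receipts per batch
    );
    pruner.run(tip)?;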
@ -58,15 +94,62 @@ impl Pruner {
false
}
}
/// Prune receipts up to the provided block, inclusive.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_receipts(
&self,
provider: &DatabaseProviderRW<'_, DB>,
to_block: BlockNumber,
prune_mode: PruneMode,
) -> PrunerResult {
let to_block_body = match provider.block_body_indices(to_block)? {
Some(body) => body,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok(())
}
};
provider.prune_table_in_batches::<tables::Receipts, _, _>(
..=to_block_body.last_tx_num(),
self.batch_sizes.receipts,
|receipts| {
trace!(
target: "pruner",
%receipts,
"Pruned receipts"
);
},
)?;
provider.save_prune_checkpoint(
PrunePart::Receipts,
PruneCheckpoint { block_number: to_block, prune_mode },
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::Pruner;
use crate::{pruner::BatchSizes, Pruner};
use assert_matches::assert_matches;
use reth_db::{tables, test_utils::create_test_rw_db};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_receipt},
};
use reth_primitives::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
#[test]
fn pruner_is_pruning_needed() {
let pruner = Pruner::new(5, 0);
fn is_pruning_needed() {
let db = create_test_rw_db();
let pruner =
Pruner::new(db, MAINNET.clone(), 5, 0, PruneModes::default(), BatchSizes::default());
// No last pruned block number was set before
let first_block_number = 1;
@ -80,4 +163,61 @@ mod tests {
let third_block_number = second_block_number;
assert!(pruner.is_pruning_needed(third_block_number));
}
#[test]
fn prune_receipts() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut receipts = Vec::new();
for block in &blocks {
for transaction in &block.body {
receipts
.push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
}
}
tx.insert_receipts(receipts).expect("insert receipts");
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::Receipts>().unwrap().len()
);
let prune_to_block = 10;
let prune_mode = PruneMode::Before(prune_to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
0,
PruneModes { receipts: Some(prune_mode), ..Default::default() },
BatchSizes {
// Fewer receipts per batch than the total number being pruned, to exercise the batching logic
receipts: 10,
},
);
let provider = tx.inner_rw();
assert_matches!(pruner.prune_receipts(&provider, prune_to_block, prune_mode), Ok(()));
provider.commit().expect("commit");
assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
blocks[prune_to_block as usize + 1..]
.iter()
.map(|block| block.body.len())
.sum::<usize>()
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
Some(PruneCheckpoint { block_number: prune_to_block, prune_mode })
);
}
}


@ -70,7 +70,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
serde_json = { workspace = true }
[features]
test-utils = []
test-utils = ["reth-interfaces/test-utils"]
[[bench]]
name = "criterion"


@ -14,7 +14,7 @@ use reth_primitives::{
stage::{
CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId,
},
BlockNumber, Header, PruneTargets, U256,
BlockNumber, Header, PruneModes, U256,
};
use reth_provider::{
post_state::PostState, BlockExecutor, BlockReader, DatabaseProviderRW, ExecutorFactory,
@ -60,7 +60,7 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
/// The commit thresholds of the execution stage.
thresholds: ExecutionStageThresholds,
/// Pruning configuration.
prune_targets: PruneTargets,
prune_targets: PruneModes,
}
impl<EF: ExecutorFactory> ExecutionStage<EF> {
@ -68,7 +68,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
pub fn new(
executor_factory: EF,
thresholds: ExecutionStageThresholds,
prune_targets: PruneTargets,
prune_targets: PruneModes,
) -> Self {
Self { metrics_tx: None, executor_factory, thresholds, prune_targets }
}
@ -77,7 +77,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
///
/// The commit threshold will be set to 10_000.
pub fn new_with_factory(executor_factory: EF) -> Self {
Self::new(executor_factory, ExecutionStageThresholds::default(), PruneTargets::default())
Self::new(executor_factory, ExecutionStageThresholds::default(), PruneModes::default())
}
/// Set the metric events sender.
@ -425,7 +425,7 @@ mod tests {
use reth_db::{models::AccountBeforeTx, test_utils::create_test_rw_db};
use reth_primitives::{
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, PruneMode, PruneTargets, SealedBlock, StorageEntry, H160, H256, MAINNET,
ChainSpecBuilder, PruneMode, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET,
U256,
};
use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider};
@ -439,7 +439,7 @@ mod tests {
ExecutionStage::new(
factory,
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
PruneTargets::none(),
PruneModes::none(),
)
}
@ -943,7 +943,7 @@ mod tests {
provider.commit().unwrap();
let check_pruning = |factory: Arc<ProviderFactory<_>>,
prune_targets: PruneTargets,
prune_targets: PruneModes,
expect_num_receipts: usize| async move {
let provider = factory.provider_rw().unwrap();
@ -960,7 +960,7 @@ mod tests {
);
};
let mut prune = PruneTargets::none();
let mut prune = PruneModes::none();
check_pruning(factory.clone(), prune, 1).await;


@ -10,8 +10,8 @@ use reth_db::{
DatabaseEnv, DatabaseError as DbError,
};
use reth_primitives::{
keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256,
MAINNET, U256,
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
TxNumber, H256, MAINNET, U256,
};
use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory};
use std::{
@ -268,6 +268,19 @@ impl TestTransaction {
})
}
/// Insert collection of ([TxNumber], [Receipt]) into the corresponding table.
pub fn insert_receipts<I>(&self, receipts: I) -> Result<(), DbError>
where
I: IntoIterator<Item = (TxNumber, Receipt)>,
{
self.commit(|tx| {
receipts.into_iter().try_for_each(|(tx_num, receipt)| {
// Insert into receipts table.
tx.put::<tables::Receipts>(tx_num, receipt)
})
})
}
/// Insert collection of ([Address], [Account]) into corresponding tables.
pub fn insert_accounts_and_storages<I, S>(&self, accounts: I) -> Result<(), DbError>
where


@ -51,7 +51,7 @@ pub enum TableType {
}
/// Number of tables that should be present inside database.
pub const NUM_TABLES: usize = 25;
pub const NUM_TABLES: usize = 26;
/// The general purpose of this is to use with a combination of Tables enum,
/// by implementing a `TableViewer` trait you can operate on db tables in an abstract way.
@ -183,7 +183,8 @@ tables!([
(StoragesTrie, TableType::DupSort),
(TxSenders, TableType::Table),
(SyncStage, TableType::Table),
(SyncStageProgress, TableType::Table)
(SyncStageProgress, TableType::Table),
(PruneCheckpoints, TableType::Table)
]);
#[macro_export]
@ -417,7 +418,7 @@ table!(
table!(
/// Stores the highest pruned block number and prune mode of each prune part.
( PruneParts ) PrunePart | PruneCheckpoint
( PruneCheckpoints ) PrunePart | PruneCheckpoint
);
/// Alias Types
@ -459,6 +460,7 @@ mod tests {
(TableType::Table, TxSenders::const_name()),
(TableType::Table, SyncStage::const_name()),
(TableType::Table, SyncStageProgress::const_name()),
(TableType::Table, PruneCheckpoints::const_name()),
];
#[test]


@ -26,10 +26,10 @@ pub use traits::{
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions,
ChainSpecProvider, ChangeSetReader, EvmEnvProvider, ExecutorFactory, HashingWriter,
HeaderProvider, HistoryWriter, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, StorageReader, TransactionsProvider,
WithdrawalsProvider,
HeaderProvider, HistoryWriter, PostStateDataProvider, PruneCheckpointReader,
PruneCheckpointWriter, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader,
StageCheckpointWriter, StateProvider, StateProviderBox, StateProviderFactory,
StateRootProvider, StorageReader, TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.


@ -8,7 +8,7 @@ use reth_db::{
};
use reth_primitives::{
bloom::logs_bloom, keccak256, proofs::calculate_receipt_root_ref, Account, Address,
BlockNumber, Bloom, Bytecode, Log, PruneMode, PruneTargets, Receipt, StorageEntry, H256, U256,
BlockNumber, Bloom, Bytecode, Log, PruneMode, PruneModes, Receipt, StorageEntry, H256, U256,
};
use reth_trie::{
hashed_cursor::{HashedPostState, HashedPostStateCursorFactory, HashedStorage},
@ -79,7 +79,7 @@ pub struct PostState {
/// The receipt(s) of the executed transaction(s).
receipts: BTreeMap<BlockNumber, Vec<Receipt>>,
/// Pruning configuration.
prune_targets: PruneTargets,
prune_targets: PruneModes,
}
impl PostState {
@ -94,7 +94,7 @@ impl PostState {
}
/// Add a pruning configuration.
pub fn add_prune_targets(&mut self, prune_targets: PruneTargets) {
pub fn add_prune_targets(&mut self, prune_targets: PruneModes) {
self.prune_targets = prune_targets;
}


@ -2,16 +2,17 @@ use crate::{
providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider},
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider,
HeaderProvider, ProviderError, StageCheckpointReader, StateProviderBox, TransactionsProvider,
WithdrawalsProvider,
HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox,
TransactionsProvider, WithdrawalsProvider,
};
use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv};
use reth_interfaces::Result;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo,
ChainSpec, Header, Receipt, SealedBlock, SealedHeader, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, H256, U256,
ChainSpec, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock, SealedHeader,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal,
H256, U256,
};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
use std::{ops::RangeBounds, sync::Arc};
@ -360,6 +361,12 @@ where
}
}
impl<DB: Database> PruneCheckpointReader for ProviderFactory<DB> {
fn get_prune_checkpoint(&self, part: PrunePart) -> Result<Option<PruneCheckpoint>> {
self.provider()?.get_prune_checkpoint(part)
}
}
#[cfg(test)]
mod tests {
use super::ProviderFactory;


@ -5,7 +5,8 @@ use crate::{
},
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
PruneCheckpointReader, PruneCheckpointWriter, StageCheckpointReader, StorageReader,
TransactionsProvider, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_db::{
@ -16,7 +17,7 @@ use reth_db::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
},
table::Table,
table::{Key, Table},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
@ -26,9 +27,10 @@ use reth_primitives::{
keccak256,
stage::{StageCheckpoint, StageId},
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, Hardfork, Head, Header, Receipt, SealedBlock, SealedBlockWithSenders,
SealedHeader, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, H256, U256,
ChainInfo, ChainSpec, Hardfork, Head, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, StorageEntry, TransactionMeta, TransactionSigned,
TransactionSignedEcRecovered, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, H256,
U256,
};
use reth_revm_primitives::{
config::revm_spec,
@ -617,6 +619,54 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(())
}
/// Prune the table for the specified key range.
/// Returns number of rows pruned.
pub fn prune_table<T, K>(
&self,
range: impl RangeBounds<K>,
) -> std::result::Result<usize, DatabaseError>
where
T: Table<Key = K>,
K: Key,
{
self.prune_table_in_batches::<T, K, _>(range, usize::MAX, |_| {})
}
/// Prune the table for the specified key range, calling `batch_callback` after every
/// `batch_size` pruned rows.
///
/// Returns number of rows pruned.
pub fn prune_table_in_batches<T, K, F>(
&self,
range: impl RangeBounds<K>,
batch_size: usize,
batch_callback: F,
) -> std::result::Result<usize, DatabaseError>
where
T: Table<Key = K>,
K: Key,
F: Fn(usize),
{
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(range)?;
let mut deleted = 0;
while let Some(Ok(_)) = walker.next() {
walker.delete_current()?;
deleted += 1;
if deleted % batch_size == 0 {
batch_callback(batch_size);
}
}
if deleted % batch_size != 0 {
batch_callback(deleted % batch_size);
}
Ok(deleted)
}
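
The receipts pruning shown earlier in this diff boils down to a call along these lines (sketch; `last_tx_num` stands for the last transaction number of the block being pruned up to):

    let deleted = provider.prune_table_in_batches::<tables::Receipts, _, _>(
        ..=last_tx_num,
        10_000,
        |rows| trace!(target: "pruner", %rows, "Pruned receipts batch"),
    )?;
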
/// Load shard and remove it. If list is empty, last shard was full or
/// there are no shards at all.
fn take_shard<T>(&self, key: T::Key) -> Result<Vec<u64>>
@ -1816,3 +1866,15 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockWriter for DatabaseProvider<'
Ok(())
}
}
impl<'this, TX: DbTx<'this>> PruneCheckpointReader for DatabaseProvider<'this, TX> {
fn get_prune_checkpoint(&self, part: PrunePart) -> Result<Option<PruneCheckpoint>> {
Ok(self.tx.get::<tables::PruneCheckpoints>(part)?)
}
}
impl<'this, TX: DbTxMut<'this>> PruneCheckpointWriter for DatabaseProvider<'this, TX> {
fn save_prune_checkpoint(&self, part: PrunePart, checkpoint: PruneCheckpoint) -> Result<()> {
Ok(self.tx.put::<tables::PruneCheckpoints>(part, checkpoint)?)
}
}


@ -2,9 +2,9 @@ use crate::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider,
PostStateDataProvider, ProviderError, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionsProvider,
WithdrawalsProvider,
PostStateDataProvider, ProviderError, PruneCheckpointReader, ReceiptProvider,
ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory,
TransactionsProvider, WithdrawalsProvider,
};
use reth_db::{database::Database, models::StoredBlockBodyIndices};
use reth_interfaces::{
@ -15,8 +15,8 @@ use reth_interfaces::{
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumber,
BlockNumberOrTag, BlockWithSenders, ChainInfo, ChainSpec, Header, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
BlockNumberOrTag, BlockWithSenders, ChainInfo, ChainSpec, Header, PruneCheckpoint, PrunePart,
Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, H256, U256,
};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
@ -438,6 +438,16 @@ where
}
}
impl<DB, Tree> PruneCheckpointReader for BlockchainProvider<DB, Tree>
where
DB: Database,
Tree: Send + Sync,
{
fn get_prune_checkpoint(&self, part: PrunePart) -> Result<Option<PruneCheckpoint>> {
self.database.provider()?.get_prune_checkpoint(part)
}
}
impl<DB, Tree> ChainSpecProvider for BlockchainProvider<DB, Tree>
where
DB: Send + Sync,


@ -2,17 +2,18 @@ use crate::{
traits::{BlockSource, ReceiptProvider},
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, PostState,
ReceiptProviderIdExt, StageCheckpointReader, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider,
PruneCheckpointReader, ReceiptProviderIdExt, StageCheckpointReader, StateProvider,
StateProviderBox, StateProviderFactory, StateRootProvider, TransactionsProvider,
WithdrawalsProvider,
};
use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_interfaces::Result;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber, Bytecode, Bytes,
ChainInfo, ChainSpec, Header, Receipt, SealedBlock, SealedHeader, StorageKey, StorageValue,
TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, H256,
KECCAK_EMPTY, MAINNET, U256,
ChainInfo, ChainSpec, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock, SealedHeader,
StorageKey, StorageValue, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash,
TxNumber, H256, KECCAK_EMPTY, MAINNET, U256,
};
use reth_revm_primitives::primitives::{BlockEnv, CfgEnv};
use std::{ops::RangeBounds, sync::Arc};
@ -360,3 +361,9 @@ impl WithdrawalsProvider for NoopProvider {
Ok(None)
}
}
impl PruneCheckpointReader for NoopProvider {
fn get_prune_checkpoint(&self, _part: PrunePart) -> Result<Option<PruneCheckpoint>> {
Ok(None)
}
}


@ -59,3 +59,6 @@ pub use hashing::HashingWriter;
mod history;
pub use history::HistoryWriter;
mod prune_checkpoint;
pub use prune_checkpoint::{PruneCheckpointReader, PruneCheckpointWriter};


@ -0,0 +1,16 @@
use reth_interfaces::Result;
use reth_primitives::{PruneCheckpoint, PrunePart};
/// The trait for fetching prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointReader: Send + Sync {
/// Fetch the checkpoint for the given prune part.
fn get_prune_checkpoint(&self, part: PrunePart) -> Result<Option<PruneCheckpoint>>;
}
/// The trait for updating prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointWriter: Send + Sync {
/// Save prune checkpoint.
fn save_prune_checkpoint(&self, part: PrunePart, checkpoint: PruneCheckpoint) -> Result<()>;
}