feat(stages): checkpoint downloaded headers (#2798)

This commit is contained in:
Alexey Shekhirin
2023-05-24 20:51:41 +04:00
committed by GitHub
parent b9f1819e69
commit 0b81096f8b
10 changed files with 232 additions and 66 deletions

View File

@ -4,7 +4,7 @@ use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::{BlockNumber, StageCheckpoint};
use reth_primitives::StageCheckpoint;
use reth_stages::{ExecOutput, PipelineEvent, StageId};
use std::{
future::Future,
@ -22,12 +22,12 @@ struct NodeState {
/// The stage currently being executed.
current_stage: Option<StageId>,
/// The current checkpoint of the executing stage.
current_checkpoint: BlockNumber,
current_checkpoint: StageCheckpoint,
}
impl NodeState {
fn new(network: Option<NetworkHandle>) -> Self {
Self { network, current_stage: None, current_checkpoint: 0 }
Self { network, current_stage: None, current_checkpoint: StageCheckpoint::new(0) }
}
fn num_connected_peers(&self) -> usize {
@ -37,26 +37,23 @@ impl NodeState {
/// Processes an event emitted by the pipeline
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { stage_id, stage_progress } => {
PipelineEvent::Running { stage_id, checkpoint } => {
let notable = self.current_stage.is_none();
self.current_stage = Some(stage_id);
self.current_checkpoint = stage_progress.unwrap_or_default();
self.current_checkpoint = checkpoint.unwrap_or_default();
if notable {
info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage");
info!(target: "reth::cli", stage = %stage_id, from = ?checkpoint, "Executing stage");
}
}
PipelineEvent::Ran {
stage_id,
result: ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done },
} => {
let notable = block_number > self.current_checkpoint;
self.current_checkpoint = block_number;
PipelineEvent::Ran { stage_id, result: ExecOutput { checkpoint, done } } => {
let notable = checkpoint.block_number > self.current_checkpoint.block_number;
self.current_checkpoint = checkpoint;
if done {
self.current_stage = None;
info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage finished executing");
info!(target: "reth::cli", stage = %stage_id, checkpoint = %checkpoint, "Stage finished executing");
} else if notable {
info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage committed progress");
info!(target: "reth::cli", stage = %stage_id, checkpoint = %checkpoint, "Stage committed progress");
}
}
_ => (),
@ -160,7 +157,7 @@ where
.current_stage
.map(|id| id.to_string())
.unwrap_or_else(|| "None".to_string());
info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = this.state.current_checkpoint, "Status");
info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = %this.state.current_checkpoint, "Status");
}
while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {

View File

@ -97,7 +97,7 @@ impl Compact for MerkleCheckpoint {
}
}
/// Saves the progress of AccountHashing
/// Saves the progress of AccountHashing stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct AccountHashingCheckpoint {
@ -109,7 +109,7 @@ pub struct AccountHashingCheckpoint {
pub to: u64,
}
/// Saves the progress of StorageHashing
/// Saves the progress of StorageHashing stage.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct StorageHashingCheckpoint {
@ -123,6 +123,26 @@ pub struct StorageHashingCheckpoint {
pub to: u64,
}
/// Saves the progress of abstract stage iterating over or downloading entities.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct EntitiesCheckpoint {
/// Number of entities already processed.
pub processed: u64,
/// Total entities to be processed.
pub total: Option<u64>,
}
impl Display for EntitiesCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(total) = self.total {
write!(f, "{:.1}%", 100.0 * self.processed as f64 / total as f64)
} else {
write!(f, "{}", self.processed)
}
}
}
/// Saves the progress of a stage.
#[main_codec]
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
@ -155,6 +175,14 @@ impl StageCheckpoint {
}
}
/// Returns the entities stage checkpoint, if any.
pub fn entities_stage_checkpoint(&self) -> Option<EntitiesCheckpoint> {
match self.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(checkpoint)) => Some(checkpoint),
_ => None,
}
}
/// Sets the stage checkpoint to account hashing.
pub fn with_account_hashing_stage_checkpoint(
mut self,
@ -172,13 +200,20 @@ impl StageCheckpoint {
self.stage_checkpoint = Some(StageUnitCheckpoint::Storage(checkpoint));
self
}
/// Sets the stage checkpoint to entities.
pub fn with_entities_stage_checkpoint(mut self, checkpoint: EntitiesCheckpoint) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::Entities(checkpoint));
self
}
}
// TODO(alexey): ideally, we'd want to display block number + stage-specific metric (if available)
// in places like logs or traces
impl Display for StageCheckpoint {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.block_number, f)
match self.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(stage_checkpoint)) => stage_checkpoint.fmt(f),
_ => write!(f, "{}", self.block_number),
}
}
}
@ -194,6 +229,8 @@ pub enum StageUnitCheckpoint {
Account(AccountHashingCheckpoint),
/// Saves the progress of StorageHashing stage.
Storage(StorageHashingCheckpoint),
/// Saves the progress of abstract stage iterating over or downloading entities.
Entities(EntitiesCheckpoint),
}
impl Compact for StageUnitCheckpoint {
@ -214,6 +251,10 @@ impl Compact for StageUnitCheckpoint {
buf.put_u8(2);
1 + data.to_compact(buf)
}
StageUnitCheckpoint::Entities(data) => {
buf.put_u8(3);
1 + data.to_compact(buf)
}
}
}
@ -234,6 +275,10 @@ impl Compact for StageUnitCheckpoint {
let (data, buf) = StorageHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Storage(data), buf)
}
3 => {
let (data, buf) = EntitiesCheckpoint::from_compact(&buf[1..], buf.len() - 1);
(Self::Entities(data), buf)
}
_ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"),
}
}

View File

@ -51,8 +51,8 @@ pub use chain::{
MAINNET, SEPOLIA,
};
pub use checkpoints::{
AccountHashingCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint,
StorageHashingCheckpoint,
AccountHashingCheckpoint, EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
};
pub use compression::*;
pub use constants::{

View File

@ -2,7 +2,7 @@ use crate::{
id::StageId,
stage::{ExecOutput, UnwindInput, UnwindOutput},
};
use reth_primitives::BlockNumber;
use reth_primitives::StageCheckpoint;
/// An event emitted by a [Pipeline][crate::Pipeline].
///
@ -18,7 +18,7 @@ pub enum PipelineEvent {
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
stage_progress: Option<BlockNumber>,
checkpoint: Option<StageCheckpoint>,
},
/// Emitted when a stage has run a single time.
Ran {

View File

@ -312,10 +312,7 @@ where
})
}
self.listeners.notify(PipelineEvent::Running {
stage_id,
stage_progress: prev_checkpoint.map(|progress| progress.block_number),
});
self.listeners.notify(PipelineEvent::Running { stage_id, checkpoint: prev_checkpoint });
match stage
.execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint })
@ -462,12 +459,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
@ -515,17 +512,17 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("C"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("C"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
@ -604,12 +601,12 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
@ -678,12 +675,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None },
PipelineEvent::Error { stage_id: StageId("B") },
PipelineEvent::Unwinding {
stage_id: StageId("A"),
@ -697,12 +694,15 @@ mod tests {
stage_id: StageId("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) },
PipelineEvent::Running {
stage_id: StageId("A"),
checkpoint: Some(StageCheckpoint::new(0))
},
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },

View File

@ -1,7 +1,7 @@
use crate::StageId;
use metrics::Gauge;
use reth_metrics_derive::Metrics;
use reth_primitives::StageCheckpoint;
use reth_primitives::{EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint};
use std::collections::HashMap;
#[derive(Metrics)]
@ -9,6 +9,10 @@ use std::collections::HashMap;
pub(crate) struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
/// The number of processed entities of the last commit for a stage, if applicable.
entities_processed: Gauge,
/// The number of total entities of the last commit for a stage, if applicable.
entities_total: Gauge,
}
#[derive(Default)]
@ -18,11 +22,23 @@ pub(crate) struct Metrics {
impl Metrics {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, checkpoint: StageCheckpoint) {
// TODO(alexey): track other metrics from `checkpoint`
self.checkpoints
let stage_metrics = self
.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(checkpoint.block_number as f64);
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]));
stage_metrics.checkpoint.set(checkpoint.block_number as f64);
#[allow(clippy::single_match)]
match checkpoint.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => {
stage_metrics.entities_processed.set(processed as f64);
if let Some(total) = total {
stage_metrics.entities_total.set(total as f64);
}
}
_ => (),
}
}
}

View File

@ -10,7 +10,9 @@ use reth_interfaces::{
p2p::headers::downloader::{HeaderDownloader, SyncTarget},
provider::ProviderError,
};
use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, StageCheckpoint, H256};
use reth_primitives::{
BlockHashOrNumber, BlockNumber, EntitiesCheckpoint, SealedHeader, StageCheckpoint, H256,
};
use reth_provider::Transaction;
use tokio::sync::watch;
use tracing::*;
@ -197,6 +199,7 @@ where
// Lookup the head and tip of the sync range
let gap = self.get_sync_gap(tx, current_progress.block_number).await?;
let local_head = gap.local_head.number;
let tip = gap.target.tip();
// Nothing to sync
@ -218,6 +221,61 @@ where
info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers");
let tip_block_number = match tip {
// If tip is hash and it equals to the first downloaded header's hash, we can use
// the block number of this header as tip.
BlockHashOrNumber::Hash(hash) => downloaded_headers.first().and_then(|header| {
if header.hash == hash {
Some(header.number)
} else {
None
}
}),
// If tip is number, we can just grab it and not resolve using downloaded headers.
BlockHashOrNumber::Number(number) => Some(number),
};
// Since we're syncing headers in batches, gap tip will move in reverse direction towards
// our local head with every iteration. To get the actual target block number we're
// syncing towards, we need to take into account already synced headers from the database.
// It is `None`, if tip didn't change and we're still downloading headers for previously
// calculated gap.
let target_block_number = if let Some(tip_block_number) = tip_block_number {
let local_max_block_number = tx
.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(canonical_block, _)| canonical_block);
Some(tip_block_number.max(local_max_block_number.unwrap_or(tip_block_number)))
} else {
None
};
let mut stage_checkpoint = current_progress
.entities_stage_checkpoint()
.unwrap_or(EntitiesCheckpoint {
// If for some reason (e.g. due to DB migration) we don't have `processed`
// in the middle of headers sync, set it to the local head block number +
// number of block already filled in the gap.
processed: local_head +
(target_block_number.unwrap_or_default() - tip_block_number.unwrap_or_default()),
// Shouldn't fail because on the first iteration, we download the header for missing
// tip, and use its block number.
total: target_block_number.or_else(|| {
warn!(target: "sync::stages::headers", ?tip, "No downloaded header for tip found");
// Safe, because `Display` impl for `EntitiesCheckpoint` will fallback to displaying
// just `processed`
None
}),
});
// Total headers can be updated if we received new tip from the network, and need to fill
// the local gap.
if let Some(target_block_number) = target_block_number {
stage_checkpoint.total = Some(target_block_number);
}
stage_checkpoint.processed += downloaded_headers.len() as u64;
// Write the headers to db
self.write_headers::<DB>(tx, downloaded_headers)?.unwrap_or_default();
@ -228,9 +286,16 @@ where
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: true })
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(stage_progress)
.with_entities_stage_checkpoint(stage_checkpoint),
done: true,
})
} else {
Ok(ExecOutput { checkpoint: current_progress, done: false })
Ok(ExecOutput {
checkpoint: current_progress.with_entities_stage_checkpoint(stage_checkpoint),
done: false,
})
}
}
@ -245,10 +310,21 @@ where
input.unwind_to + 1,
)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
let unwound_headers = tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
let stage_checkpoint =
input.checkpoint.entities_stage_checkpoint().map(|checkpoint| EntitiesCheckpoint {
processed: checkpoint.processed.saturating_sub(unwound_headers as u64),
total: None,
});
let mut checkpoint = StageCheckpoint::new(input.unwind_to);
if let Some(stage_checkpoint) = stage_checkpoint {
checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint);
}
info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
Ok(UnwindOutput { checkpoint })
}
}
@ -284,7 +360,7 @@ mod tests {
};
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::random_header;
use reth_primitives::H256;
use reth_primitives::{StageUnitCheckpoint, H256};
use test_runner::HeadersTestRunner;
mod test_runner {
@ -474,7 +550,16 @@ mod tests {
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number);
assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
}))
}, done: true }) if block_number == tip.number
// -1 because we don't need to download the local head
&& processed == stage_progress + headers.len() as u64 - 1
&& total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@ -539,7 +624,7 @@ mod tests {
let mut runner = HeadersTestRunner::with_linear_downloader();
// pick range that's larger than the configured headers batch size
let (stage_progress, previous_stage) = (600, 1200);
let input = ExecInput {
let mut input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
@ -553,15 +638,33 @@ mod tests {
runner.send_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false }) if block_number == stage_progress);
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
}))
}, done: false }) if block_number == stage_progress &&
processed == stage_progress + 500 &&
total == tip.number);
runner.client.clear().await;
runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await;
input.checkpoint = Some(result.unwrap().checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number);
assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total: Some(total),
}))
}, done: true }) if block_number == tip.number
// -1 because we don't need to download the local head
&& processed == stage_progress + headers.len() as u64 - 1
&& total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
}

View File

@ -36,9 +36,9 @@ pub fn generate_from_to(ident: &Ident, fields: &FieldList, is_zstd: bool) -> Tok
impl Compact for #ident {
fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
let mut flags = #flags::default();
let mut total_len = 0;
let mut total_length = 0;
#(#to_compact)*
total_len
total_length
}
fn from_compact(mut buf: &[u8], len: usize) -> (Self, &[u8]) {
@ -176,7 +176,7 @@ fn generate_to_compact(fields: &FieldList, ident: &Ident, is_zstd: bool) -> Vec<
// Places the flag bits.
lines.push(quote! {
let flags = flags.into_bytes();
total_len += flags.len() + buffer.len();
total_length += flags.len() + buffer.len();
buf.put_slice(&flags);
});

View File

@ -251,7 +251,7 @@ mod tests {
impl Compact for TestStruct {
fn to_compact<B>(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> {
let mut flags = TestStructFlags::default();
let mut total_len = 0;
let mut total_length = 0;
let mut buffer = bytes::BytesMut::new();
let f_u64_len = self.f_u64.to_compact(&mut buffer);
flags.set_f_u64_len(f_u64_len as u8);
@ -270,10 +270,10 @@ mod tests {
let f_vec_empty_len = self.f_vec_empty.to_compact(&mut buffer);
let f_vec_some_len = self.f_vec_some.specialized_to_compact(&mut buffer);
let flags = flags.into_bytes();
total_len += flags.len() + buffer.len();
total_length += flags.len() + buffer.len();
buf.put_slice(&flags);
buf.put(buffer);
total_len
total_length
}
fn from_compact(mut buf: &[u8], len: usize) -> (Self, &[u8]) {
let (flags, mut buf) = TestStructFlags::from(buf);

View File

@ -180,9 +180,10 @@ where
Ok(td.into())
}
/// Unwind table by some number key
/// Unwind table by some number key.
/// Returns number of rows unwound.
#[inline]
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<(), DbError>
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<usize, DbError>
where
DB: Database,
T: Table<Key = u64>,
@ -190,10 +191,11 @@ where
self.unwind_table::<T, _>(num, |key| key)
}
/// Unwind the table to a provided block
/// Unwind the table to a provided block.
/// Returns number of rows unwound.
///
/// Note: Key is not inclusive and specified key would stay in db.
pub(crate) fn unwind_table<T, F>(&self, key: u64, mut selector: F) -> Result<(), DbError>
pub(crate) fn unwind_table<T, F>(&self, key: u64, mut selector: F) -> Result<usize, DbError>
where
DB: Database,
T: Table,
@ -201,14 +203,17 @@ where
{
let mut cursor = self.cursor_write::<T>()?;
let mut reverse_walker = cursor.walk_back(None)?;
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
break
}
self.delete::<T>(entry_key, None)?;
deleted += 1;
}
Ok(())
Ok(deleted)
}
/// Unwind a table forward by a [Walker][reth_db::abstraction::cursor::Walker] on another table