From 0b81096f8bb38a518f167971fa02e607a0bc1be4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 24 May 2023 20:51:41 +0400 Subject: [PATCH] feat(stages): checkpoint downloaded headers (#2798) --- bin/reth/src/node/events.rs | 27 ++-- crates/primitives/src/checkpoints.rs | 55 +++++++- crates/primitives/src/lib.rs | 4 +- crates/stages/src/pipeline/event.rs | 4 +- crates/stages/src/pipeline/mod.rs | 30 ++--- crates/stages/src/pipeline/sync_metrics.rs | 28 +++- crates/stages/src/stages/headers.rs | 123 ++++++++++++++++-- .../codecs/derive/src/compact/generator.rs | 6 +- .../storage/codecs/derive/src/compact/mod.rs | 6 +- crates/storage/provider/src/transaction.rs | 15 ++- 10 files changed, 232 insertions(+), 66 deletions(-) diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 5008eb8dd..f9cf3528f 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -4,7 +4,7 @@ use futures::Stream; use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_network::{NetworkEvent, NetworkHandle}; use reth_network_api::PeersInfo; -use reth_primitives::{BlockNumber, StageCheckpoint}; +use reth_primitives::StageCheckpoint; use reth_stages::{ExecOutput, PipelineEvent, StageId}; use std::{ future::Future, @@ -22,12 +22,12 @@ struct NodeState { /// The stage currently being executed. current_stage: Option, /// The current checkpoint of the executing stage. - current_checkpoint: BlockNumber, + current_checkpoint: StageCheckpoint, } impl NodeState { fn new(network: Option) -> Self { - Self { network, current_stage: None, current_checkpoint: 0 } + Self { network, current_stage: None, current_checkpoint: StageCheckpoint::new(0) } } fn num_connected_peers(&self) -> usize { @@ -37,26 +37,23 @@ impl NodeState { /// Processes an event emitted by the pipeline fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { - PipelineEvent::Running { stage_id, stage_progress } => { + PipelineEvent::Running { stage_id, checkpoint } => { let notable = self.current_stage.is_none(); self.current_stage = Some(stage_id); - self.current_checkpoint = stage_progress.unwrap_or_default(); + self.current_checkpoint = checkpoint.unwrap_or_default(); if notable { - info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage"); + info!(target: "reth::cli", stage = %stage_id, from = ?checkpoint, "Executing stage"); } } - PipelineEvent::Ran { - stage_id, - result: ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done }, - } => { - let notable = block_number > self.current_checkpoint; - self.current_checkpoint = block_number; + PipelineEvent::Ran { stage_id, result: ExecOutput { checkpoint, done } } => { + let notable = checkpoint.block_number > self.current_checkpoint.block_number; + self.current_checkpoint = checkpoint; if done { self.current_stage = None; - info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage finished executing"); + info!(target: "reth::cli", stage = %stage_id, checkpoint = %checkpoint, "Stage finished executing"); } else if notable { - info!(target: "reth::cli", stage = %stage_id, checkpoint = block_number, "Stage committed progress"); + info!(target: "reth::cli", stage = %stage_id, checkpoint = %checkpoint, "Stage committed progress"); } } _ => (), @@ -160,7 +157,7 @@ where .current_stage .map(|id| id.to_string()) .unwrap_or_else(|| "None".to_string()); - info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = this.state.current_checkpoint, "Status"); + info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = %this.state.current_checkpoint, "Status"); } while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) { diff --git a/crates/primitives/src/checkpoints.rs b/crates/primitives/src/checkpoints.rs index ee826d6b0..8c8c7894b 100644 --- a/crates/primitives/src/checkpoints.rs +++ b/crates/primitives/src/checkpoints.rs @@ -97,7 +97,7 @@ impl Compact for MerkleCheckpoint { } } -/// Saves the progress of AccountHashing +/// Saves the progress of AccountHashing stage. #[main_codec] #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] pub struct AccountHashingCheckpoint { @@ -109,7 +109,7 @@ pub struct AccountHashingCheckpoint { pub to: u64, } -/// Saves the progress of StorageHashing +/// Saves the progress of StorageHashing stage. #[main_codec] #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] pub struct StorageHashingCheckpoint { @@ -123,6 +123,26 @@ pub struct StorageHashingCheckpoint { pub to: u64, } +/// Saves the progress of abstract stage iterating over or downloading entities. +#[main_codec] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub struct EntitiesCheckpoint { + /// Number of entities already processed. + pub processed: u64, + /// Total entities to be processed. + pub total: Option, +} + +impl Display for EntitiesCheckpoint { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(total) = self.total { + write!(f, "{:.1}%", 100.0 * self.processed as f64 / total as f64) + } else { + write!(f, "{}", self.processed) + } + } +} + /// Saves the progress of a stage. #[main_codec] #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] @@ -155,6 +175,14 @@ impl StageCheckpoint { } } + /// Returns the entities stage checkpoint, if any. + pub fn entities_stage_checkpoint(&self) -> Option { + match self.stage_checkpoint { + Some(StageUnitCheckpoint::Entities(checkpoint)) => Some(checkpoint), + _ => None, + } + } + /// Sets the stage checkpoint to account hashing. pub fn with_account_hashing_stage_checkpoint( mut self, @@ -172,13 +200,20 @@ impl StageCheckpoint { self.stage_checkpoint = Some(StageUnitCheckpoint::Storage(checkpoint)); self } + + /// Sets the stage checkpoint to entities. + pub fn with_entities_stage_checkpoint(mut self, checkpoint: EntitiesCheckpoint) -> Self { + self.stage_checkpoint = Some(StageUnitCheckpoint::Entities(checkpoint)); + self + } } -// TODO(alexey): ideally, we'd want to display block number + stage-specific metric (if available) -// in places like logs or traces impl Display for StageCheckpoint { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.block_number, f) + match self.stage_checkpoint { + Some(StageUnitCheckpoint::Entities(stage_checkpoint)) => stage_checkpoint.fmt(f), + _ => write!(f, "{}", self.block_number), + } } } @@ -194,6 +229,8 @@ pub enum StageUnitCheckpoint { Account(AccountHashingCheckpoint), /// Saves the progress of StorageHashing stage. Storage(StorageHashingCheckpoint), + /// Saves the progress of abstract stage iterating over or downloading entities. + Entities(EntitiesCheckpoint), } impl Compact for StageUnitCheckpoint { @@ -214,6 +251,10 @@ impl Compact for StageUnitCheckpoint { buf.put_u8(2); 1 + data.to_compact(buf) } + StageUnitCheckpoint::Entities(data) => { + buf.put_u8(3); + 1 + data.to_compact(buf) + } } } @@ -234,6 +275,10 @@ impl Compact for StageUnitCheckpoint { let (data, buf) = StorageHashingCheckpoint::from_compact(&buf[1..], buf.len() - 1); (Self::Storage(data), buf) } + 3 => { + let (data, buf) = EntitiesCheckpoint::from_compact(&buf[1..], buf.len() - 1); + (Self::Entities(data), buf) + } _ => unreachable!("Junk data in database: unknown StageUnitCheckpoint variant"), } } diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 39ed13ee2..5e59fdc55 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -51,8 +51,8 @@ pub use chain::{ MAINNET, SEPOLIA, }; pub use checkpoints::{ - AccountHashingCheckpoint, MerkleCheckpoint, StageCheckpoint, StageUnitCheckpoint, - StorageHashingCheckpoint, + AccountHashingCheckpoint, EntitiesCheckpoint, MerkleCheckpoint, StageCheckpoint, + StageUnitCheckpoint, StorageHashingCheckpoint, }; pub use compression::*; pub use constants::{ diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 89b24490e..695bc46ac 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -2,7 +2,7 @@ use crate::{ id::StageId, stage::{ExecOutput, UnwindInput, UnwindOutput}, }; -use reth_primitives::BlockNumber; +use reth_primitives::StageCheckpoint; /// An event emitted by a [Pipeline][crate::Pipeline]. /// @@ -18,7 +18,7 @@ pub enum PipelineEvent { /// The stage that is about to be run. stage_id: StageId, /// The previous checkpoint of the stage. - stage_progress: Option, + checkpoint: Option, }, /// Emitted when a stage has run a single time. Ran { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 1754428dd..ec81f9804 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -312,10 +312,7 @@ where }) } - self.listeners.notify(PipelineEvent::Running { - stage_id, - stage_progress: prev_checkpoint.map(|progress| progress.block_number), - }); + self.listeners.notify(PipelineEvent::Running { stage_id, checkpoint: prev_checkpoint }); match stage .execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint }) @@ -462,12 +459,12 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, @@ -515,17 +512,17 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("C"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("C"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("C"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, @@ -604,12 +601,12 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, @@ -678,12 +675,12 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, PipelineEvent::Error { stage_id: StageId("B") }, PipelineEvent::Unwinding { stage_id: StageId("A"), @@ -697,12 +694,15 @@ mod tests { stage_id: StageId("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, + PipelineEvent::Running { + stage_id: StageId("A"), + checkpoint: Some(StageCheckpoint::new(0)) + }, PipelineEvent::Ran { stage_id: StageId("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, PipelineEvent::Ran { stage_id: StageId("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs index be974aac0..04fdfafa5 100644 --- a/crates/stages/src/pipeline/sync_metrics.rs +++ b/crates/stages/src/pipeline/sync_metrics.rs @@ -1,7 +1,7 @@ use crate::StageId; use metrics::Gauge; use reth_metrics_derive::Metrics; -use reth_primitives::StageCheckpoint; +use reth_primitives::{EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint}; use std::collections::HashMap; #[derive(Metrics)] @@ -9,6 +9,10 @@ use std::collections::HashMap; pub(crate) struct StageMetrics { /// The block number of the last commit for a stage. checkpoint: Gauge, + /// The number of processed entities of the last commit for a stage, if applicable. + entities_processed: Gauge, + /// The number of total entities of the last commit for a stage, if applicable. + entities_total: Gauge, } #[derive(Default)] @@ -18,11 +22,23 @@ pub(crate) struct Metrics { impl Metrics { pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, checkpoint: StageCheckpoint) { - // TODO(alexey): track other metrics from `checkpoint` - self.checkpoints + let stage_metrics = self + .checkpoints .entry(stage_id) - .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])) - .checkpoint - .set(checkpoint.block_number as f64); + .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])); + + stage_metrics.checkpoint.set(checkpoint.block_number as f64); + + #[allow(clippy::single_match)] + match checkpoint.stage_checkpoint { + Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => { + stage_metrics.entities_processed.set(processed as f64); + + if let Some(total) = total { + stage_metrics.entities_total.set(total as f64); + } + } + _ => (), + } } } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index c83ea4356..4783a4e98 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -10,7 +10,9 @@ use reth_interfaces::{ p2p::headers::downloader::{HeaderDownloader, SyncTarget}, provider::ProviderError, }; -use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, StageCheckpoint, H256}; +use reth_primitives::{ + BlockHashOrNumber, BlockNumber, EntitiesCheckpoint, SealedHeader, StageCheckpoint, H256, +}; use reth_provider::Transaction; use tokio::sync::watch; use tracing::*; @@ -197,6 +199,7 @@ where // Lookup the head and tip of the sync range let gap = self.get_sync_gap(tx, current_progress.block_number).await?; + let local_head = gap.local_head.number; let tip = gap.target.tip(); // Nothing to sync @@ -218,6 +221,61 @@ where info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers"); + let tip_block_number = match tip { + // If tip is hash and it equals to the first downloaded header's hash, we can use + // the block number of this header as tip. + BlockHashOrNumber::Hash(hash) => downloaded_headers.first().and_then(|header| { + if header.hash == hash { + Some(header.number) + } else { + None + } + }), + // If tip is number, we can just grab it and not resolve using downloaded headers. + BlockHashOrNumber::Number(number) => Some(number), + }; + + // Since we're syncing headers in batches, gap tip will move in reverse direction towards + // our local head with every iteration. To get the actual target block number we're + // syncing towards, we need to take into account already synced headers from the database. + // It is `None`, if tip didn't change and we're still downloading headers for previously + // calculated gap. + let target_block_number = if let Some(tip_block_number) = tip_block_number { + let local_max_block_number = tx + .cursor_read::()? + .last()? + .map(|(canonical_block, _)| canonical_block); + + Some(tip_block_number.max(local_max_block_number.unwrap_or(tip_block_number))) + } else { + None + }; + + let mut stage_checkpoint = current_progress + .entities_stage_checkpoint() + .unwrap_or(EntitiesCheckpoint { + // If for some reason (e.g. due to DB migration) we don't have `processed` + // in the middle of headers sync, set it to the local head block number + + // number of block already filled in the gap. + processed: local_head + + (target_block_number.unwrap_or_default() - tip_block_number.unwrap_or_default()), + // Shouldn't fail because on the first iteration, we download the header for missing + // tip, and use its block number. + total: target_block_number.or_else(|| { + warn!(target: "sync::stages::headers", ?tip, "No downloaded header for tip found"); + // Safe, because `Display` impl for `EntitiesCheckpoint` will fallback to displaying + // just `processed` + None + }), + }); + + // Total headers can be updated if we received new tip from the network, and need to fill + // the local gap. + if let Some(target_block_number) = target_block_number { + stage_checkpoint.total = Some(target_block_number); + } + stage_checkpoint.processed += downloaded_headers.len() as u64; + // Write the headers to db self.write_headers::(tx, downloaded_headers)?.unwrap_or_default(); @@ -228,9 +286,16 @@ where .map(|(num, _)| num) .unwrap_or_default(), ); - Ok(ExecOutput { checkpoint: StageCheckpoint::new(stage_progress), done: true }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(stage_progress) + .with_entities_stage_checkpoint(stage_checkpoint), + done: true, + }) } else { - Ok(ExecOutput { checkpoint: current_progress, done: false }) + Ok(ExecOutput { + checkpoint: current_progress.with_entities_stage_checkpoint(stage_checkpoint), + done: false, + }) } } @@ -245,10 +310,21 @@ where input.unwind_to + 1, )?; tx.unwind_table_by_num::(input.unwind_to)?; - tx.unwind_table_by_num::(input.unwind_to)?; + let unwound_headers = tx.unwind_table_by_num::(input.unwind_to)?; + + let stage_checkpoint = + input.checkpoint.entities_stage_checkpoint().map(|checkpoint| EntitiesCheckpoint { + processed: checkpoint.processed.saturating_sub(unwound_headers as u64), + total: None, + }); + + let mut checkpoint = StageCheckpoint::new(input.unwind_to); + if let Some(stage_checkpoint) = stage_checkpoint { + checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint); + } info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) + Ok(UnwindOutput { checkpoint }) } } @@ -284,7 +360,7 @@ mod tests { }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::random_header; - use reth_primitives::H256; + use reth_primitives::{StageUnitCheckpoint, H256}; use test_runner::HeadersTestRunner; mod test_runner { @@ -474,7 +550,16 @@ mod tests { runner.send_tip(tip.hash()); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number); + assert_matches!( result, Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total: Some(total), + })) + }, done: true }) if block_number == tip.number + // -1 because we don't need to download the local head + && processed == stage_progress + headers.len() as u64 - 1 + && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } @@ -539,7 +624,7 @@ mod tests { let mut runner = HeadersTestRunner::with_linear_downloader(); // pick range that's larger than the configured headers batch size let (stage_progress, previous_stage) = (600, 1200); - let input = ExecInput { + let mut input = ExecInput { previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -553,15 +638,33 @@ mod tests { runner.send_tip(tip.hash()); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: false }) if block_number == stage_progress); + assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total: Some(total), + })) + }, done: false }) if block_number == stage_progress && + processed == stage_progress + 500 && + total == tip.number); runner.client.clear().await; runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await; + input.checkpoint = Some(result.unwrap().checkpoint); let rx = runner.execute(input); let result = rx.await.unwrap(); - assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { block_number, .. }, done: true }) if block_number == tip.number); + assert_matches!(result, Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total: Some(total), + })) + }, done: true }) if block_number == tip.number + // -1 because we don't need to download the local head + && processed == stage_progress + headers.len() as u64 - 1 + && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } } diff --git a/crates/storage/codecs/derive/src/compact/generator.rs b/crates/storage/codecs/derive/src/compact/generator.rs index 792140c49..2b408ec03 100644 --- a/crates/storage/codecs/derive/src/compact/generator.rs +++ b/crates/storage/codecs/derive/src/compact/generator.rs @@ -36,9 +36,9 @@ pub fn generate_from_to(ident: &Ident, fields: &FieldList, is_zstd: bool) -> Tok impl Compact for #ident { fn to_compact(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> { let mut flags = #flags::default(); - let mut total_len = 0; + let mut total_length = 0; #(#to_compact)* - total_len + total_length } fn from_compact(mut buf: &[u8], len: usize) -> (Self, &[u8]) { @@ -176,7 +176,7 @@ fn generate_to_compact(fields: &FieldList, ident: &Ident, is_zstd: bool) -> Vec< // Places the flag bits. lines.push(quote! { let flags = flags.into_bytes(); - total_len += flags.len() + buffer.len(); + total_length += flags.len() + buffer.len(); buf.put_slice(&flags); }); diff --git a/crates/storage/codecs/derive/src/compact/mod.rs b/crates/storage/codecs/derive/src/compact/mod.rs index 0d03d31be..3f67fb721 100644 --- a/crates/storage/codecs/derive/src/compact/mod.rs +++ b/crates/storage/codecs/derive/src/compact/mod.rs @@ -251,7 +251,7 @@ mod tests { impl Compact for TestStruct { fn to_compact(self, buf: &mut B) -> usize where B: bytes::BufMut + AsMut<[u8]> { let mut flags = TestStructFlags::default(); - let mut total_len = 0; + let mut total_length = 0; let mut buffer = bytes::BytesMut::new(); let f_u64_len = self.f_u64.to_compact(&mut buffer); flags.set_f_u64_len(f_u64_len as u8); @@ -270,10 +270,10 @@ mod tests { let f_vec_empty_len = self.f_vec_empty.to_compact(&mut buffer); let f_vec_some_len = self.f_vec_some.specialized_to_compact(&mut buffer); let flags = flags.into_bytes(); - total_len += flags.len() + buffer.len(); + total_length += flags.len() + buffer.len(); buf.put_slice(&flags); buf.put(buffer); - total_len + total_length } fn from_compact(mut buf: &[u8], len: usize) -> (Self, &[u8]) { let (flags, mut buf) = TestStructFlags::from(buf); diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 9481882c5..d3be66a22 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -180,9 +180,10 @@ where Ok(td.into()) } - /// Unwind table by some number key + /// Unwind table by some number key. + /// Returns number of rows unwound. #[inline] - pub fn unwind_table_by_num(&self, num: u64) -> Result<(), DbError> + pub fn unwind_table_by_num(&self, num: u64) -> Result where DB: Database, T: Table, @@ -190,10 +191,11 @@ where self.unwind_table::(num, |key| key) } - /// Unwind the table to a provided block + /// Unwind the table to a provided block. + /// Returns number of rows unwound. /// /// Note: Key is not inclusive and specified key would stay in db. - pub(crate) fn unwind_table(&self, key: u64, mut selector: F) -> Result<(), DbError> + pub(crate) fn unwind_table(&self, key: u64, mut selector: F) -> Result where DB: Database, T: Table, @@ -201,14 +203,17 @@ where { let mut cursor = self.cursor_write::()?; let mut reverse_walker = cursor.walk_back(None)?; + let mut deleted = 0; while let Some(Ok((entry_key, _))) = reverse_walker.next() { if selector(entry_key.clone()) <= key { break } self.delete::(entry_key, None)?; + deleted += 1; } - Ok(()) + + Ok(deleted) } /// Unwind a table forward by a [Walker][reth_db::abstraction::cursor::Walker] on another table