feat: introduce StaticFileSegment::BlockMeta (#13226)

This commit is contained in:
joshieDo
2025-01-14 23:47:28 +00:00
committed by GitHub
parent 46f4d73c4d
commit 1267718c7e
10 changed files with 346 additions and 111 deletions

View File

@ -14,6 +14,7 @@ Arguments:
- headers: Static File segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTerminalDifficulties` tables
- transactions: Static File segment responsible for the `Transactions` table
- receipts: Static File segment responsible for the `Receipts` table
- block-meta: Static File segment responsible for the `BlockBodyIndices`, `BlockOmmers`, `BlockWithdrawals` tables
Options:
--instance <INSTANCE>

View File

@ -14,6 +14,7 @@ Arguments:
- headers: Static File segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTerminalDifficulties` tables
- transactions: Static File segment responsible for the `Transactions` table
- receipts: Static File segment responsible for the `Receipts` table
- block-meta: Static File segment responsible for the `BlockBodyIndices`, `BlockOmmers`, `BlockWithdrawals` tables
<KEY>
The key to get content for

View File

@ -72,6 +72,7 @@ impl Command {
StaticFileSegment::Receipts => {
(table_key::<tables::Receipts>(&key)?, <ReceiptMask<ReceiptTy<N>>>::MASK)
}
StaticFileSegment::BlockMeta => todo!(),
};
let content = tool.provider_factory.static_file_provider().find_static_file(
@ -113,6 +114,9 @@ impl Command {
)?;
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::BlockMeta => {
todo!()
}
}
}
}

View File

@ -187,6 +187,7 @@ where
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
block_meta: stages_checkpoints[2],
};
let targets = self.get_static_file_targets(highest_static_files)?;
self.run(targets)?;
@ -226,6 +227,9 @@ where
finalized_block_number,
)
}),
block_meta: finalized_block_numbers.block_meta.and_then(|finalized_block_number| {
self.get_static_file_target(highest_static_files.block_meta, finalized_block_number)
}),
};
trace!(
@ -322,6 +326,7 @@ mod tests {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
block_meta: None,
})
.expect("get static file targets");
assert_eq!(
@ -329,13 +334,19 @@ mod tests {
StaticFileTargets {
headers: Some(0..=1),
receipts: Some(0..=1),
transactions: Some(0..=1)
transactions: Some(0..=1),
block_meta: None
}
);
assert_matches!(static_file_producer.run(targets), Ok(_));
assert_eq!(
provider_factory.static_file_provider().get_highest_static_files(),
HighestStaticFiles { headers: Some(1), receipts: Some(1), transactions: Some(1) }
HighestStaticFiles {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
block_meta: None
}
);
let targets = static_file_producer
@ -343,6 +354,7 @@ mod tests {
headers: Some(3),
receipts: Some(3),
transactions: Some(3),
block_meta: None,
})
.expect("get static file targets");
assert_eq!(
@ -350,13 +362,19 @@ mod tests {
StaticFileTargets {
headers: Some(2..=3),
receipts: Some(2..=3),
transactions: Some(2..=3)
transactions: Some(2..=3),
block_meta: None
}
);
assert_matches!(static_file_producer.run(targets), Ok(_));
assert_eq!(
provider_factory.static_file_provider().get_highest_static_files(),
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
HighestStaticFiles {
headers: Some(3),
receipts: Some(3),
transactions: Some(3),
block_meta: None
}
);
let targets = static_file_producer
@ -364,6 +382,7 @@ mod tests {
headers: Some(4),
receipts: Some(4),
transactions: Some(4),
block_meta: None,
})
.expect("get static file targets");
assert_eq!(
@ -371,7 +390,8 @@ mod tests {
StaticFileTargets {
headers: Some(4..=4),
receipts: Some(4..=4),
transactions: Some(4..=4)
transactions: Some(4..=4),
block_meta: None
}
);
assert_matches!(
@ -380,7 +400,12 @@ mod tests {
);
assert_eq!(
provider_factory.static_file_provider().get_highest_static_files(),
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
HighestStaticFiles {
headers: Some(3),
receipts: Some(3),
transactions: Some(3),
block_meta: None
}
);
}
@ -408,6 +433,7 @@ mod tests {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
block_meta: None,
})
.expect("get static file targets");
assert_matches!(locked_producer.run(targets.clone()), Ok(_));

View File

@ -33,6 +33,9 @@ pub struct HighestStaticFiles {
/// Highest static file block of transactions, inclusive.
/// If [`None`], no static file is available.
pub transactions: Option<BlockNumber>,
/// Highest static file block of transactions, inclusive.
/// If [`None`], no static file is available.
pub block_meta: Option<BlockNumber>,
}
impl HighestStaticFiles {
@ -42,6 +45,7 @@ impl HighestStaticFiles {
StaticFileSegment::Headers => self.headers,
StaticFileSegment::Transactions => self.transactions,
StaticFileSegment::Receipts => self.receipts,
StaticFileSegment::BlockMeta => self.block_meta,
}
}
@ -51,17 +55,23 @@ impl HighestStaticFiles {
StaticFileSegment::Headers => &mut self.headers,
StaticFileSegment::Transactions => &mut self.transactions,
StaticFileSegment::Receipts => &mut self.receipts,
StaticFileSegment::BlockMeta => &mut self.block_meta,
}
}
/// Returns an iterator over all static file segments
fn iter(&self) -> impl Iterator<Item = Option<BlockNumber>> {
[self.headers, self.transactions, self.receipts, self.block_meta].into_iter()
}
/// Returns the minimum block of all segments.
pub fn min_block_num(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).min()
self.iter().flatten().min()
}
/// Returns the maximum block of all segments.
pub fn max_block_num(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).max()
self.iter().flatten().max()
}
}
@ -74,12 +84,17 @@ pub struct StaticFileTargets {
pub receipts: Option<RangeInclusive<BlockNumber>>,
/// Targeted range of transactions.
pub transactions: Option<RangeInclusive<BlockNumber>>,
/// Targeted range of block meta.
pub block_meta: Option<RangeInclusive<BlockNumber>>,
}
impl StaticFileTargets {
/// Returns `true` if any of the targets are [Some].
pub const fn any(&self) -> bool {
self.headers.is_some() || self.receipts.is_some() || self.transactions.is_some()
self.headers.is_some() ||
self.receipts.is_some() ||
self.transactions.is_some() ||
self.block_meta.is_some()
}
/// Returns `true` if all targets are either [`None`] or has beginning of the range equal to the
@ -89,6 +104,7 @@ impl StaticFileTargets {
(self.headers.as_ref(), static_files.headers),
(self.receipts.as_ref(), static_files.receipts),
(self.transactions.as_ref(), static_files.transactions),
(self.block_meta.as_ref(), static_files.block_meta),
]
.iter()
.all(|(target_block_range, highest_static_fileted_block)| {
@ -118,8 +134,12 @@ mod tests {
#[test]
fn test_highest_static_files_highest() {
let files =
HighestStaticFiles { headers: Some(100), receipts: Some(200), transactions: None };
let files = HighestStaticFiles {
headers: Some(100),
receipts: Some(200),
transactions: None,
block_meta: None,
};
// Test for headers segment
assert_eq!(files.highest(StaticFileSegment::Headers), Some(100));
@ -146,12 +166,20 @@ mod tests {
// Modify transactions value
*files.as_mut(StaticFileSegment::Transactions) = Some(350);
assert_eq!(files.transactions, Some(350));
// Modify block meta value
*files.as_mut(StaticFileSegment::BlockMeta) = Some(350);
assert_eq!(files.block_meta, Some(350));
}
#[test]
fn test_highest_static_files_min() {
let files =
HighestStaticFiles { headers: Some(300), receipts: Some(100), transactions: None };
let files = HighestStaticFiles {
headers: Some(300),
receipts: Some(100),
transactions: None,
block_meta: None,
};
// Minimum value among the available segments
assert_eq!(files.min_block_num(), Some(100));
@ -163,8 +191,12 @@ mod tests {
#[test]
fn test_highest_static_files_max() {
let files =
HighestStaticFiles { headers: Some(300), receipts: Some(100), transactions: Some(500) };
let files = HighestStaticFiles {
headers: Some(300),
receipts: Some(100),
transactions: Some(500),
block_meta: Some(500),
};
// Maximum value among the available segments
assert_eq!(files.max_block_num(), Some(500));

View File

@ -3,7 +3,7 @@ use alloy_primitives::TxNumber;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::{ops::RangeInclusive, str::FromStr};
use strum::{AsRefStr, EnumIter, EnumString};
use strum::{AsRefStr, EnumString};
#[derive(
Debug,
@ -17,7 +17,6 @@ use strum::{AsRefStr, EnumIter, EnumString};
Deserialize,
Serialize,
EnumString,
EnumIter,
AsRefStr,
Display,
)]
@ -34,6 +33,10 @@ pub enum StaticFileSegment {
#[strum(serialize = "receipts")]
/// Static File segment responsible for the `Receipts` table.
Receipts,
#[strum(serialize = "blockmeta")]
/// Static File segment responsible for the `BlockBodyIndices`, `BlockOmmers`,
/// `BlockWithdrawals` tables.
BlockMeta,
}
impl StaticFileSegment {
@ -43,9 +46,17 @@ impl StaticFileSegment {
Self::Headers => "headers",
Self::Transactions => "transactions",
Self::Receipts => "receipts",
Self::BlockMeta => "blockmeta",
}
}
/// Returns an iterator over all segments.
pub fn iter() -> impl Iterator<Item = Self> {
// The order of segments is significant and must be maintained to ensure correctness. For
// example, Transactions require BlockBodyIndices from Blockmeta to be sound.
[Self::Headers, Self::BlockMeta, Self::Transactions, Self::Receipts].into_iter()
}
/// Returns the default configuration of the segment.
pub const fn config(&self) -> SegmentConfig {
SegmentConfig { compression: Compression::Lz4 }
@ -54,7 +65,7 @@ impl StaticFileSegment {
/// Returns the number of columns for the segment
pub const fn columns(&self) -> usize {
match self {
Self::Headers => 3,
Self::Headers | Self::BlockMeta => 3,
Self::Transactions | Self::Receipts => 1,
}
}
@ -118,16 +129,25 @@ impl StaticFileSegment {
matches!(self, Self::Headers)
}
/// Returns `true` if the segment is `StaticFileSegment::BlockMeta`.
pub const fn is_block_meta(&self) -> bool {
matches!(self, Self::BlockMeta)
}
/// Returns `true` if the segment is `StaticFileSegment::Receipts`.
pub const fn is_receipts(&self) -> bool {
matches!(self, Self::Receipts)
}
/// Returns `true` if the segment is `StaticFileSegment::Receipts` or
/// `StaticFileSegment::Transactions`.
/// Returns `true` if a segment row is linked to a transaction.
pub const fn is_tx_based(&self) -> bool {
matches!(self, Self::Receipts | Self::Transactions)
}
/// Returns `true` if a segment row is linked to a block.
pub const fn is_block_based(&self) -> bool {
matches!(self, Self::Headers | Self::BlockMeta)
}
}
/// A segment header that contains information common to all segments. Used for storage.
@ -228,40 +248,32 @@ impl SegmentHeader {
/// Increments tx end range depending on segment
pub fn increment_tx(&mut self) {
match self.segment {
StaticFileSegment::Headers => (),
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
if let Some(tx_range) = &mut self.tx_range {
tx_range.end += 1;
} else {
self.tx_range = Some(SegmentRangeInclusive::new(0, 0));
}
if self.segment.is_tx_based() {
if let Some(tx_range) = &mut self.tx_range {
tx_range.end += 1;
} else {
self.tx_range = Some(SegmentRangeInclusive::new(0, 0));
}
}
}
/// Removes `num` elements from end of tx or block range.
pub fn prune(&mut self, num: u64) {
match self.segment {
StaticFileSegment::Headers => {
if let Some(range) = &mut self.block_range {
if num > range.end - range.start {
self.block_range = None;
} else {
range.end = range.end.saturating_sub(num);
}
};
if self.segment.is_block_based() {
if let Some(range) = &mut self.block_range {
if num > range.end - range.start {
self.block_range = None;
} else {
range.end = range.end.saturating_sub(num);
}
};
} else if let Some(range) = &mut self.tx_range {
if num > range.end - range.start {
self.tx_range = None;
} else {
range.end = range.end.saturating_sub(num);
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
if let Some(range) = &mut self.tx_range {
if num > range.end - range.start {
self.tx_range = None;
} else {
range.end = range.end.saturating_sub(num);
}
};
}
};
}
}
/// Sets a new `block_range`.
@ -286,10 +298,10 @@ impl SegmentHeader {
/// Returns the row offset which depends on whether the segment is block or transaction based.
pub fn start(&self) -> Option<u64> {
match self.segment {
StaticFileSegment::Headers => self.block_start(),
StaticFileSegment::Transactions | StaticFileSegment::Receipts => self.tx_start(),
if self.segment.is_block_based() {
return self.block_start()
}
self.tx_start()
}
}
@ -355,7 +367,6 @@ mod tests {
use super::*;
use alloy_primitives::hex;
use reth_nippy_jar::NippyJar;
use strum::IntoEnumIterator;
#[test]
fn test_filename() {

View File

@ -1,10 +1,10 @@
use crate::{
add_static_file_mask,
static_file::mask::{ColumnSelectorOne, ColumnSelectorTwo},
HeaderTerminalDifficulties,
BlockBodyIndices, BlockWithdrawals, HeaderTerminalDifficulties,
};
use alloy_primitives::BlockHash;
use reth_db_api::table::Table;
use reth_db_api::{models::StoredBlockOmmers, table::Table};
// HEADER MASKS
add_static_file_mask! {
@ -42,3 +42,17 @@ add_static_file_mask! {
#[doc = "Mask for selecting a single transaction from Transactions static file segment"]
TransactionMask<T>, T, 0b1
}
// BLOCK_META MASKS
add_static_file_mask! {
#[doc = "Mask for a `StoredBlockBodyIndices` from BlockMeta static file segment"]
BodyIndicesMask, <BlockBodyIndices as Table>::Value, 0b001
}
add_static_file_mask! {
#[doc = "Mask for a `StoredBlockOmmers` from BlockMeta static file segment"]
OmmersMask<H>, StoredBlockOmmers<H>, 0b010
}
add_static_file_mask! {
#[doc = "Mask for a `StoredBlockWithdrawals` from BlockMeta static file segment"]
WithdrawalsMask, <BlockWithdrawals as Table>::Value, 0b100
}

View File

@ -7,18 +7,21 @@ use crate::{
TransactionsProvider,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
use reth_chainspec::ChainInfo;
use reth_db::{
models::StoredBlockBodyIndices,
static_file::{
BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask, StaticFileCursor,
TDWithHashMask, TotalDifficultyMask, TransactionMask,
BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask, OmmersMask, ReceiptMask,
StaticFileCursor, TDWithHashMask, TotalDifficultyMask, TransactionMask, WithdrawalsMask,
},
table::{Decompress, Value},
};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{SealedHeader, SignedTransaction};
use reth_node_types::{FullNodePrimitives, NodePrimitives};
use reth_primitives::SealedHeader;
use reth_primitives_traits::SignedTransaction;
use reth_storage_api::{BlockBodyIndicesProvider, OmmersProvider, WithdrawalsProvider};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
fmt::Debug,
@ -351,3 +354,36 @@ impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction, Receipt: Decomp
Ok(receipts)
}
}
impl<N: NodePrimitives> WithdrawalsProvider for StaticFileJarProvider<'_, N> {
fn withdrawals_by_block(
&self,
id: BlockHashOrNumber,
_: u64,
) -> ProviderResult<Option<Withdrawals>> {
if let Some(num) = id.as_number() {
return Ok(self.cursor()?.get_one::<WithdrawalsMask>(num.into())?.map(|s| s.withdrawals))
}
// Only accepts block number queries
Err(ProviderError::UnsupportedProvider)
}
}
impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileJarProvider<'_, N> {
fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
if let Some(num) = id.as_number() {
return Ok(self
.cursor()?
.get_one::<OmmersMask<Self::Header>>(num.into())?
.map(|s| s.ommers))
}
// Only accepts block number queries
Err(ProviderError::UnsupportedProvider)
}
}
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileJarProvider<'_, N> {
fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
self.cursor()?.get_one::<BodyIndicesMask>(num.into())
}
}

View File

@ -48,7 +48,6 @@ use std::{
path::{Path, PathBuf},
sync::{mpsc, Arc},
};
use strum::IntoEnumIterator;
use tracing::{info, trace, warn};
/// Alias type for a map that can be queried for block ranges from a transaction
@ -682,6 +681,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
};
for segment in StaticFileSegment::iter() {
// Not integrated yet
if segment.is_block_meta() {
continue
}
if has_receipt_pruning && segment.is_receipts() {
// Pruned nodes (including full node) do not store receipts as static files.
continue
@ -776,6 +780,13 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
highest_tx,
highest_block,
)?,
StaticFileSegment::BlockMeta => self
.ensure_invariants::<_, tables::BlockBodyIndices>(
provider,
segment,
highest_block,
highest_block,
)?,
} {
update_unwind_target(unwind);
}
@ -825,41 +836,46 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
where
Provider: DBProvider + BlockReader + StageCheckpointReader,
{
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
let highest_static_file_block = highest_static_file_block.unwrap_or_default();
let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
if let Some((db_first_entry, _)) = db_cursor.first()? {
// If there is a gap between the entry found in static file and
// database, then we have most likely lost static file data and need to unwind so we can
// load it again
if !(db_first_entry <= highest_static_file_entry ||
highest_static_file_entry + 1 == db_first_entry)
if let (Some(highest_entry), Some(highest_block)) =
(highest_static_file_entry, highest_static_file_block)
{
info!(
target: "reth::providers::static_file",
?db_first_entry,
?highest_static_file_entry,
unwind_target = highest_static_file_block,
?segment,
"Setting unwind target."
);
return Ok(Some(highest_static_file_block))
// If there is a gap between the entry found in static file and
// database, then we have most likely lost static file data and need to unwind so we
// can load it again
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
info!(
target: "reth::providers::static_file",
?db_first_entry,
?highest_entry,
unwind_target = highest_block,
?segment,
"Setting unwind target."
);
return Ok(Some(highest_block))
}
}
if let Some((db_last_entry, _)) = db_cursor.last()? {
if db_last_entry > highest_static_file_entry {
if highest_static_file_entry
.is_none_or(|highest_entry| db_last_entry > highest_entry)
{
return Ok(None)
}
}
}
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
let highest_static_file_block = highest_static_file_block.unwrap_or_default();
// If static file entry is ahead of the database entries, then ensure the checkpoint block
// number matches.
let checkpoint_block_number = provider
.get_stage_checkpoint(match segment {
StaticFileSegment::Headers => StageId::Headers,
StaticFileSegment::Transactions => StageId::Bodies,
StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
StaticFileSegment::Receipts => StageId::Execution,
})?
.unwrap_or_default()
@ -890,8 +906,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
);
let mut writer = self.latest_writer(segment)?;
if segment.is_headers() {
// TODO(joshie): is_block_meta
writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
} else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
// todo joshie: is querying block_body_indices a potential issue once bbi is moved
// to sf as well
let number = highest_static_file_entry - block.last_tx_num();
if segment.is_receipts() {
writer.prune_receipts(number, checkpoint_block_number)?;
@ -928,6 +947,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
}
}
@ -970,11 +990,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
P: FnMut(&T) -> bool,
{
let get_provider = |start: u64| match segment {
StaticFileSegment::Headers => {
let get_provider = |start: u64| {
if segment.is_block_based() {
self.get_segment_provider_from_block(segment, start, None)
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
} else {
self.get_segment_provider_from_transaction(segment, start, None)
}
};
@ -1046,11 +1065,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
T: std::fmt::Debug,
{
let get_provider = move |start: u64| match segment {
StaticFileSegment::Headers => {
let get_provider = move |start: u64| {
if segment.is_block_based() {
self.get_segment_provider_from_block(segment, start, None)
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
} else {
self.get_segment_provider_from_transaction(segment, start, None)
}
};
@ -1098,11 +1116,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
FD: Fn() -> ProviderResult<Option<T>>,
{
// If there is, check the maximum block or transaction number of the segment.
let static_file_upper_bound = match segment {
StaticFileSegment::Headers => self.get_highest_static_file_block(segment),
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
self.get_highest_static_file_tx(segment)
}
let static_file_upper_bound = if segment.is_block_based() {
self.get_highest_static_file_block(segment)
} else {
self.get_highest_static_file_tx(segment)
};
if static_file_upper_bound
@ -1140,11 +1157,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
let mut data = Vec::new();
// If there is, check the maximum block or transaction number of the segment.
if let Some(static_file_upper_bound) = match segment {
StaticFileSegment::Headers => self.get_highest_static_file_block(segment),
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
self.get_highest_static_file_tx(segment)
}
if let Some(static_file_upper_bound) = if segment.is_block_based() {
self.get_highest_static_file_block(segment)
} else {
self.get_highest_static_file_tx(segment)
} {
if block_or_tx_range.start <= static_file_upper_bound {
let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
@ -1665,25 +1681,56 @@ impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
fn withdrawals_by_block(
&self,
_id: BlockHashOrNumber,
_timestamp: u64,
id: BlockHashOrNumber,
timestamp: u64,
) -> ProviderResult<Option<Withdrawals>> {
// Required data not present in static_files
if let Some(num) = id.as_number() {
return self
.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
.and_then(|provider| provider.withdrawals_by_block(id, timestamp))
.or_else(|err| {
if let ProviderError::MissingStaticFileBlock(_, _) = err {
Ok(None)
} else {
Err(err)
}
})
}
// Only accepts block number queries
Err(ProviderError::UnsupportedProvider)
}
}
impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
fn ommers(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
// Required data not present in static_files
fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
if let Some(num) = id.as_number() {
return self
.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
.and_then(|provider| provider.ommers(id))
.or_else(|err| {
if let ProviderError::MissingStaticFileBlock(_, _) = err {
Ok(None)
} else {
Err(err)
}
})
}
// Only accepts block number queries
Err(ProviderError::UnsupportedProvider)
}
}
impl<N: Send + Sync> BlockBodyIndicesProvider for StaticFileProvider<N> {
fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
.and_then(|provider| provider.block_body_indices(num))
.or_else(|err| {
if let ProviderError::MissingStaticFileBlock(_, _) = err {
Ok(None)
} else {
Err(err)
}
})
}
}

View File

@ -6,6 +6,7 @@ use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db::models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals};
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
@ -32,6 +33,7 @@ pub(crate) struct StaticFileWriters<N> {
headers: RwLock<Option<StaticFileProviderRW<N>>>,
transactions: RwLock<Option<StaticFileProviderRW<N>>>,
receipts: RwLock<Option<StaticFileProviderRW<N>>>,
block_meta: RwLock<Option<StaticFileProviderRW<N>>>,
}
impl<N> Default for StaticFileWriters<N> {
@ -40,6 +42,7 @@ impl<N> Default for StaticFileWriters<N> {
headers: Default::default(),
transactions: Default::default(),
receipts: Default::default(),
block_meta: Default::default(),
}
}
}
@ -54,6 +57,7 @@ impl<N: NodePrimitives> StaticFileWriters<N> {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
StaticFileSegment::Receipts => self.receipts.write(),
StaticFileSegment::BlockMeta => self.block_meta.write(),
};
if write_guard.is_none() {
@ -230,6 +234,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
StaticFileSegment::Receipts => {
self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
}
StaticFileSegment::BlockMeta => todo!(),
}
}
@ -393,13 +398,10 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
let mut remaining_rows = num_rows;
let segment = self.writer.user_header().segment();
while remaining_rows > 0 {
let len = match segment {
StaticFileSegment::Headers => {
self.writer.user_header().block_len().unwrap_or_default()
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
self.writer.user_header().tx_len().unwrap_or_default()
}
let len = if segment.is_block_based() {
self.writer.user_header().block_len().unwrap_or_default()
} else {
self.writer.user_header().tx_len().unwrap_or_default()
};
if remaining_rows >= len {
@ -555,6 +557,61 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
Ok(())
}
/// Appends [`StoredBlockBodyIndices`], [`StoredBlockOmmers`] and [`StoredBlockWithdrawals`] to
/// static file.
///
/// It **CALLS** `increment_block()` since it's a block based segment.
pub fn append_eth_block_meta(
&mut self,
body_indices: &StoredBlockBodyIndices,
ommers: &StoredBlockOmmers<N::BlockHeader>,
withdrawals: &StoredBlockWithdrawals,
expected_block_number: BlockNumber,
) -> ProviderResult<()>
where
N::BlockHeader: Compact,
{
self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
}
/// Appends [`StoredBlockBodyIndices`] and any other two arbitrary types belonging to the block
/// body to static file.
///
/// It **CALLS** `increment_block()` since it's a block based segment.
pub fn append_block_meta<F1, F2>(
&mut self,
body_indices: &StoredBlockBodyIndices,
field1: &F1,
field2: &F2,
expected_block_number: BlockNumber,
) -> ProviderResult<()>
where
N::BlockHeader: Compact,
F1: Compact,
F2: Compact,
{
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::BlockMeta);
self.increment_block(expected_block_number)?;
self.append_column(body_indices)?;
self.append_column(field1)?;
self.append_column(field2)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::BlockMeta,
StaticFileProviderOperation::Append,
Some(start.elapsed()),
);
}
Ok(())
}
/// Appends transaction to static file.
///
/// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
@ -682,6 +739,12 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
self.queue_prune(to_delete, None)
}
/// Adds an instruction to prune `to_delete` bloc_ meta rows during commit.
pub fn prune_block_meta(&mut self, to_delete: u64) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::BlockMeta);
self.queue_prune(to_delete, None)
}
/// Adds an instruction to prune `to_delete` elements during commit.
///
/// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based