chore: add safety check on StaticFileProviderRW::increment_block (#7137)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
joshieDo
2024-03-14 21:55:57 +00:00
committed by GitHub
parent 41c6f24393
commit 3d86c4ac59
10 changed files with 117 additions and 16 deletions

View File

@ -125,6 +125,9 @@ pub enum ProviderError {
/// Static File is finalized and cannot be written to.
#[error("unable to write block #{1} to finalized static file {0}")]
FinalizedStaticFile(StaticFileSegment, BlockNumber),
/// Trying to insert data from an unexpected block number.
#[error("trying to append data to {0} as block #{1} but expected block #{2}")]
UnexpectedStaticFileBlockNumber(StaticFileSegment, BlockNumber, BlockNumber),
/// Error encountered when the block number conversion from U256 to u64 causes an overflow.
#[error("failed to convert block number U256 to u64: {0}")]
BlockNumberOverflow(U256),

View File

@ -179,8 +179,8 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// Increment block on static file header.
if block_number > 0 {
let appended_block_number =
static_file_producer.increment_block(StaticFileSegment::Transactions)?;
let appended_block_number = static_file_producer
.increment_block(StaticFileSegment::Transactions, block_number)?;
if appended_block_number != block_number {
// This scenario indicates a critical error in the logic of adding new

View File

@ -803,6 +803,12 @@ mod tests {
.unwrap()
.commit()
.unwrap();
{
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(StaticFileSegment::Receipts, 0).unwrap();
receipts_writer.commit().unwrap();
}
provider.commit().unwrap();
// insert pre state
@ -948,6 +954,12 @@ mod tests {
.unwrap()
.commit()
.unwrap();
{
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(StaticFileSegment::Receipts, 0).unwrap();
receipts_writer.commit().unwrap();
}
provider.commit().unwrap();
// variables
@ -1061,6 +1073,12 @@ mod tests {
.unwrap()
.commit()
.unwrap();
{
let mut receipts_writer =
provider.static_file_provider().latest_writer(StaticFileSegment::Receipts).unwrap();
receipts_writer.increment_block(StaticFileSegment::Receipts, 0).unwrap();
receipts_writer.commit().unwrap();
}
provider.commit().unwrap();
// variables

View File

@ -13,8 +13,8 @@ use reth_db::{
};
use reth_interfaces::{provider::ProviderResult, test_utils::generators::ChangeSet};
use reth_primitives::{
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
TxHash, TxNumber, B256, MAINNET, U256,
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader,
StaticFileSegment, StorageEntry, TxHash, TxNumber, B256, MAINNET, U256,
};
use reth_provider::{
providers::{StaticFileProviderRWRefMut, StaticFileWriter},
@ -139,6 +139,17 @@ impl TestStageDB {
td: U256,
) -> ProviderResult<()> {
if let Some(writer) = writer {
// Backfill: some tests start at a forward block number, but static files require no
// gaps.
let segment_header = writer.user_header();
if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
for block_number in 0..header.number {
let mut prev = header.clone().unseal();
prev.number = block_number;
writer.append_header(prev, U256::ZERO, B256::ZERO)?;
}
}
writer.append_header(header.header().clone(), td, header.hash())?;
} else {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
@ -155,7 +166,7 @@ impl TestStageDB {
I: IntoIterator<Item = &'a SealedHeader>,
{
let provider = self.factory.static_file_provider();
let mut writer = provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?;
let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
let tx = self.factory.provider_rw()?.into_tx();
let mut td = U256::ZERO;
@ -210,8 +221,7 @@ impl TestStageDB {
let blocks = blocks.into_iter().collect::<Vec<_>>();
{
let mut headers_writer =
provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?;
let mut headers_writer = provider.latest_writer(StaticFileSegment::Headers)?;
blocks.iter().try_for_each(|block| {
Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)
@ -221,9 +231,9 @@ impl TestStageDB {
}
{
let mut txs_writer = storage_kind.is_static().then(|| {
provider.latest_writer(reth_primitives::StaticFileSegment::Transactions).unwrap()
});
let mut txs_writer = storage_kind
.is_static()
.then(|| provider.latest_writer(StaticFileSegment::Transactions).unwrap());
blocks.into_iter().try_for_each(|block| {
// Insert into body tables.
@ -251,7 +261,17 @@ impl TestStageDB {
});
if let Some(txs_writer) = &mut txs_writer {
txs_writer.increment_block(reth_primitives::StaticFileSegment::Transactions)?;
// Backfill: some tests start at a forward block number, but static files
// require no gaps.
let segment_header = txs_writer.user_header();
if segment_header.block_end().is_none() &&
segment_header.expected_block_start() == 0
{
for block in 0..block.number {
txs_writer.increment_block(StaticFileSegment::Transactions, block)?;
}
}
txs_writer.increment_block(StaticFileSegment::Transactions, block.number)?;
}
res
})?;

View File

@ -34,7 +34,7 @@ impl<DB: Database> Segment<DB> for Receipts {
for block in block_range {
let _static_file_block =
static_file_writer.increment_block(StaticFileSegment::Receipts)?;
static_file_writer.increment_block(StaticFileSegment::Receipts, block)?;
debug_assert_eq!(_static_file_block, block);
let block_body_indices = provider

View File

@ -36,7 +36,7 @@ impl<DB: Database> Segment<DB> for Transactions {
for block in block_range {
let _static_file_block =
static_file_writer.increment_block(StaticFileSegment::Transactions)?;
static_file_writer.increment_block(StaticFileSegment::Transactions, block)?;
debug_assert_eq!(_static_file_block, block);
let block_body_indices = provider

View File

@ -324,7 +324,7 @@ impl BundleStateWithReceipts {
if let Some(static_file_producer) = &mut static_file_producer {
// Increment block on static file header.
static_file_producer.increment_block(StaticFileSegment::Receipts)?;
static_file_producer.increment_block(StaticFileSegment::Receipts, block_number)?;
for (tx_idx, receipt) in receipts.into_iter().enumerate() {
let receipt = receipt

View File

@ -181,6 +181,17 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
};
let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
// Backfill: some tests start at a forward block number, but static files require no gaps.
let segment_header = writer.user_header();
if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
for block_number in 0..block.number {
let mut prev = block.header.clone().unseal();
prev.number = block_number;
writer.append_header(prev, U256::ZERO, B256::ZERO)?;
}
}
writer.append_header(block.header.as_ref().clone(), ttd, block.hash())?;
self.insert_block(block, prune_modes)

View File

@ -185,7 +185,13 @@ impl StaticFileProviderRW {
/// and create the next one if we are past the end range.
///
/// Returns the current [`BlockNumber`] as seen in the static file.
pub fn increment_block(&mut self, segment: StaticFileSegment) -> ProviderResult<BlockNumber> {
pub fn increment_block(
&mut self,
segment: StaticFileSegment,
expected_block_number: BlockNumber,
) -> ProviderResult<BlockNumber> {
self.check_next_block_number(expected_block_number, segment)?;
let start = Instant::now();
if let Some(last_block) = self.writer.user_header().block_end() {
// We have finished the previous static file and must freeze it
@ -216,6 +222,33 @@ impl StaticFileProviderRW {
Ok(block)
}
/// Verifies if the incoming block number matches the next expected block number
/// for a static file. This ensures data continuity when adding new blocks.
fn check_next_block_number(
&mut self,
expected_block_number: u64,
segment: StaticFileSegment,
) -> ProviderResult<()> {
// The next static file block number can be found by checking the one after block_end.
// However if it's a new file that hasn't been added any data, its block range will actually
// be None. In that case, the next block will be found on `expected_block_start`.
let next_static_file_block = self
.writer
.user_header()
.block_end()
.map(|b| b + 1)
.unwrap_or(self.writer.user_header().expected_block_start());
if expected_block_number != next_static_file_block {
return Err(ProviderError::UnexpectedStaticFileBlockNumber(
segment,
expected_block_number,
next_static_file_block,
))
}
Ok(())
}
/// Truncates a number of rows from disk. It deletes and loads an older static file if block
/// goes beyond the start of the current block range.
///
@ -342,7 +375,7 @@ impl StaticFileProviderRW {
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
let block_number = self.increment_block(StaticFileSegment::Headers)?;
let block_number = self.increment_block(StaticFileSegment::Headers, header.number)?;
self.append_column(header)?;
self.append_column(CompactU256::from(terminal_difficulty))?;
@ -511,6 +544,12 @@ impl StaticFileProviderRW {
pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
}
#[cfg(any(test, feature = "test-utils"))]
/// Helper function to access [`SegmentHeader`].
pub fn user_header(&self) -> &SegmentHeader {
self.writer.user_header()
}
}
fn create_jar(

View File

@ -106,6 +106,16 @@ impl Case for BlockchainTestCase {
.map_err(|err| Error::RethError(err.into()))?;
case.pre.write_to_db(provider.tx_ref())?;
// Initialize receipts static file with genesis
{
let mut receipts_writer = provider
.static_file_provider()
.latest_writer(StaticFileSegment::Receipts)
.unwrap();
receipts_writer.increment_block(StaticFileSegment::Receipts, 0).unwrap();
receipts_writer.commit_without_sync_all().unwrap();
}
// Decode and insert blocks, creating a chain of blocks for the test case.
let last_block = case.blocks.iter().try_fold(None, |_, block| {
let decoded = SealedBlock::decode(&mut block.rlp.as_ref())?;