refactor: unify logic for blocks removal (#12743)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
Arsenii Kulikov
2024-11-21 23:47:33 +04:00
committed by GitHub
parent edeacbecfb
commit 0558235b98
9 changed files with 267 additions and 536 deletions

View File

@ -10,10 +10,7 @@ use tracing::*;
use alloy_primitives::TxNumber;
use reth_db::{tables, transaction::DbTx};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
transaction::DbTxMut,
};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut};
use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
use reth_primitives::StaticFileSegment;
use reth_provider::{
@ -70,6 +67,82 @@ impl<D: BodyDownloader> BodyStage<D> {
pub const fn new(downloader: D) -> Self {
Self { downloader, buffer: None }
}
/// Ensures that static files and database are in sync.
fn ensure_consistency<Provider>(
&self,
provider: &Provider,
unwind_block: Option<u64>,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
{
// Get id for the next tx_num of zero if there are no transactions.
let next_tx_num = provider
.tx_ref()
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(id, _)| id + 1)
.unwrap_or_default();
let static_file_provider = provider.static_file_provider();
// Make sure Transactions static file is at the same height. If it's further, this
// input execution was interrupted previously and we need to unwind the static file.
let next_static_file_tx_num = static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Transactions)
.map(|id| id + 1)
.unwrap_or_default();
match next_static_file_tx_num.cmp(&next_tx_num) {
// If static files are ahead, we are currently unwinding the stage or we didn't reach
// the database commit in a previous stage run. So, our only solution is to unwind the
// static files and proceed from the database expected height.
Ordering::Greater => {
let highest_db_block =
provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64;
let mut static_file_producer =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
static_file_producer
.prune_transactions(next_static_file_tx_num - next_tx_num, highest_db_block)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer.commit()?;
}
// If static files are behind, then there was some corruption or loss of files. This
// error will trigger an unwind, that will bring the database to the same height as the
// static files.
Ordering::Less => {
// If we are already in the process of unwind, this might be fine because we will
// fix the inconsistency right away.
if let Some(unwind_to) = unwind_block {
let next_tx_num_after_unwind = provider
.tx_ref()
.get::<tables::BlockBodyIndices>(unwind_to)?
.map(|b| b.next_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?;
// This means we need a deeper unwind.
if next_tx_num_after_unwind > next_static_file_tx_num {
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
&static_file_provider,
provider,
)?)
}
} else {
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
&static_file_provider,
provider,
)?)
}
}
Ordering::Equal => {}
}
Ok(())
}
}
impl<Provider, D> Stage<Provider> for BodyStage<D>
@ -122,50 +195,9 @@ where
}
let (from_block, to_block) = input.next_block_range().into_inner();
// Get id for the next tx_num of zero if there are no transactions.
let next_tx_num = provider
.tx_ref()
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(id, _)| id + 1)
.unwrap_or_default();
self.ensure_consistency(provider, None)?;
let static_file_provider = provider.static_file_provider();
// Make sure Transactions static file is at the same height. If it's further, this
// input execution was interrupted previously and we need to unwind the static file.
let next_static_file_tx_num = static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Transactions)
.map(|id| id + 1)
.unwrap_or_default();
match next_static_file_tx_num.cmp(&next_tx_num) {
// If static files are ahead, then we didn't reach the database commit in a previous
// stage run. So, our only solution is to unwind the static files and proceed from the
// database expected height.
Ordering::Greater => {
let mut static_file_producer =
static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
static_file_producer
.prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer.commit()?;
}
// If static files are behind, then there was some corruption or loss of files. This
// error will trigger an unwind, that will bring the database to the same height as the
// static files.
Ordering::Less => {
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
&static_file_provider,
provider,
)?)
}
Ordering::Equal => {}
}
debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, start_tx_id = next_tx_num, "Commencing sync");
debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, "Commencing sync");
let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
@ -200,66 +232,8 @@ where
) -> Result<UnwindOutput, StageError> {
self.buffer.take();
let static_file_provider = provider.static_file_provider();
let tx = provider.tx_ref();
// Cursors to unwind bodies, ommers
let mut body_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
// Cursors to unwind transitions
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlocks>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
while let Some((number, block_meta)) = rev_walker.next().transpose()? {
if number <= input.unwind_to {
break
}
// Delete the ommers entry if any
if ommers_cursor.seek_exact(number)?.is_some() {
ommers_cursor.delete_current()?;
}
// Delete the withdrawals entry if any
if withdrawals_cursor.seek_exact(number)?.is_some() {
withdrawals_cursor.delete_current()?;
}
// Delete all transaction to block values.
if !block_meta.is_empty() &&
tx_block_cursor.seek_exact(block_meta.last_tx_num())?.is_some()
{
tx_block_cursor.delete_current()?;
}
// Delete the current body value
rev_walker.delete_current()?;
}
let mut static_file_producer =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
// Unwind from static files. Get the current last expected transaction from DB, and match it
// on static file
let db_tx_num =
body_cursor.last()?.map(|(_, block_meta)| block_meta.last_tx_num()).unwrap_or_default();
let static_file_tx_num: u64 = static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Transactions)
.unwrap_or_default();
// If there are more transactions on database, then we are missing static file data and we
// need to unwind further.
if db_tx_num > static_file_tx_num {
return Err(missing_static_data_error(
static_file_tx_num,
&static_file_provider,
provider,
)?)
}
// Unwinds static file
static_file_producer
.prune_transactions(static_file_tx_num.saturating_sub(db_tx_num), input.unwind_to)?;
self.ensure_consistency(provider, Some(input.unwind_to))?;
provider.remove_bodies_above(input.unwind_to, StorageLocation::Both)?;
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(input.unwind_to)
@ -268,6 +242,8 @@ where
}
}
/// Called when database is ahead of static files. Attempts to find the first block we are missing
/// transactions for.
fn missing_static_data_error<Provider>(
last_tx_num: TxNumber,
static_file_provider: &StaticFileProvider<Provider::Primitives>,