This commit is contained in:
Kamal Nayan
2025-08-24 21:11:30 +00:00
committed by GitHub
2 changed files with 65 additions and 24 deletions

View File

@ -51,17 +51,20 @@ struct ScanResult {
new_blocks: Vec<BlockAndReceipts>,
}
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result<ScanResult, Box<dyn std::error::Error>> {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std::fs::File::open(path).expect("Failed to open hour file path");
let file = std::fs::File::open(path)
.map_err(|e| format!("Failed to open hour file path {}: {}", path.display(), e))?;
let reader = BufReader::new(file);
let mut new_blocks = Vec::<BlockAndReceipts>::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let lines: Vec<String> = reader.lines()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
@ -109,7 +112,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
}
}
ScanResult { next_expected_height: last_height + 1, new_blocks }
Ok(ScanResult { next_expected_height: last_height + 1, new_blocks })
}
async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
@ -128,7 +131,8 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
engine_api_client,
envelope.execution_payload,
versioned_hashes,
payload_builder_attributes.parent_beacon_block_root.unwrap(),
payload_builder_attributes.parent_beacon_block_root
.ok_or("Missing required parent_beacon_block_root")?,
)
.await?
};
@ -144,7 +148,9 @@ fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
}
fn date_from_datetime(dt: OffsetDateTime) -> String {
dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap()
// Format string is constant and guaranteed to be valid
dt.format(&format_description::parse("[year][month][day]").expect("Valid format string"))
.expect("DateTime formatting should always succeed with valid format")
}
impl BlockIngest {
@ -166,7 +172,18 @@ impl BlockIngest {
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy());
let file = std::fs::read(path).ok()?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)
.map_err(|e| {
tracing::error!("Failed to deserialize block data for height {}: {}", height, e);
e
})
.ok()?;
if blocks.is_empty() {
tracing::error!("Deserialized empty blocks vector for height {}", height);
return None;
}
info!("Returning s3 synced block for @ Height [{height}]");
Some(blocks[0].clone())
}
@ -186,11 +203,11 @@ impl BlockIngest {
let mut next_height = current_head;
let mut dt = datetime_from_timestamp(current_ts)
.replace_minute(0)
.unwrap()
.expect("Valid minute replacement")
.replace_second(0)
.unwrap()
.expect("Valid second replacement")
.replace_nanosecond(0)
.unwrap();
.expect("Valid nanosecond replacement");
let mut hour = dt.hour();
let mut day_str = date_from_datetime(dt);
@ -200,9 +217,10 @@ impl BlockIngest {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let ScanResult { next_expected_height, new_blocks } =
scan_hour_file(&hour_file, &mut last_line, next_height);
if !new_blocks.is_empty() {
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
match scan_result {
Ok(ScanResult { next_expected_height, new_blocks }) => {
if !new_blocks.is_empty() {
let mut u_cache = cache.lock().await;
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
@ -221,6 +239,12 @@ impl BlockIngest {
}
next_height = next_expected_height;
}
}
Err(e) => {
tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e);
// Continue processing but skip this file
}
}
}
// Decide whether the *current* hour file is closed (past) or
@ -269,8 +293,10 @@ impl BlockIngest {
let mut height = head + 1;
let mut previous_hash = provider.block_hash(head)?.unwrap_or(genesis_hash);
let mut previous_timestamp =
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis();
let mut previous_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("System time should be after UNIX epoch")
.as_millis();
let engine_api = node.auth_server_handle().http_client();
let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
@ -278,8 +304,8 @@ impl BlockIngest {
const MINIMUM_TIMESTAMP: u64 = 1739849780;
let current_block_timestamp: u64 = provider
.block_by_number(head)
.expect("Failed to fetch current block in db")
.expect("Block does not exist")
.map_err(|e| format!("Database error fetching block {}: {}", head, e))?
.ok_or_else(|| format!("Block {} does not exist in database", head))?
.into_header()
.timestamp();
@ -376,11 +402,11 @@ impl BlockIngest {
let current_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.expect("System time should be after UNIX epoch")
.as_millis();
if height % 100 == 0 || current_timestamp - previous_timestamp > 100 {
EngineApiClient::<Engine>::fork_choice_updated_v2(
if let Err(e) = EngineApiClient::<Engine>::fork_choice_updated_v2(
&engine_api,
ForkchoiceState {
head_block_hash: block_hash,
@ -389,8 +415,10 @@ impl BlockIngest {
},
None,
)
.await
.unwrap();
.await {
tracing::error!("Failed to update fork choice for block {}: {}", height, e);
// Continue processing but log the failure - don't panic the entire blockchain
}
previous_timestamp = current_timestamp;
}
previous_hash = block_hash;

View File

@ -2278,7 +2278,16 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for num in block_body.tx_num_range() {
if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
block_receipts.push(receipts_iter.next().unwrap().1);
// Safe to unwrap here since we just peeked and confirmed it exists
// However, for maximum safety in database operations, we still handle it
if let Some((_, receipt)) = receipts_iter.next() {
block_receipts.push(receipt);
} else {
// This should never happen based on peek(), but handle gracefully
return Err(ProviderError::Database(reth_db::DatabaseError::Other(
"Receipt iterator state mismatch during state reconstruction".into()
)));
}
}
}
receipts.push(block_receipts);
@ -3070,9 +3079,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
return Ok(());
}
let first_number = blocks.first().unwrap().number();
// Safe to unwrap after empty check, but use defensive programming for critical DB ops
let first_number = blocks.first()
.expect("Blocks vector guaranteed non-empty after length check")
.number();
let last = blocks.last().unwrap();
let last = blocks.last()
.expect("Blocks vector guaranteed non-empty after length check");
let last_block_number = last.number();
let mut durations_recorder = metrics::DurationsRecorder::default();