feat: Make import service resumeable

This commit is contained in:
sprites0
2025-06-22 14:52:13 -04:00
parent 64b8ca79ec
commit 9113c5253c
2 changed files with 11 additions and 7 deletions

View File

@ -77,6 +77,7 @@ where
engine: BeaconConsensusEngineHandle<HlPayloadTypes>, engine: BeaconConsensusEngineHandle<HlPayloadTypes>,
from_network: UnboundedReceiver<IncomingBlock>, from_network: UnboundedReceiver<IncomingBlock>,
to_network: UnboundedSender<ImportEvent>, to_network: UnboundedSender<ImportEvent>,
height: u64,
) -> Self { ) -> Self {
Self { Self {
engine, engine,
@ -84,7 +85,7 @@ where
from_network, from_network,
to_network, to_network,
pending_imports: FuturesUnordered::new(), pending_imports: FuturesUnordered::new(),
height: 2000000, height,
} }
} }
@ -175,9 +176,14 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
let prev_height = this.height;
// Receive new blocks from network // Receive new blocks from network
while let Some(block) = collect_block(this.height) { while let Some(block) = collect_block(this.height) {
if this.height > prev_height + 1000 {
break;
}
let peer_id = PeerId::random(); let peer_id = PeerId::random();
let reth_block = block.to_reth_block(); let reth_block = block.to_reth_block();
let td = U128::from(reth_block.header().difficulty()); let td = U128::from(reth_block.header().difficulty());
@ -190,10 +196,6 @@ where
}; };
this.on_new_block(msg, peer_id); this.on_new_block(msg, peer_id);
this.height += 1; this.height += 1;
if this.height > 2000000 {
break;
}
} }
// Process completed imports and send events to network // Process completed imports and send events to network
@ -380,7 +382,7 @@ mod tests {
let handle = ImportHandle::new(to_import, import_outcome); let handle = ImportHandle::new(to_import, import_outcome);
let service = ImportService::new(consensus, engine_handle, from_network, to_network); let service = ImportService::new(consensus, engine_handle, from_network, to_network, 1);
tokio::spawn(Box::pin(async move { tokio::spawn(Box::pin(async move {
service.await.unwrap(); service.await.unwrap();
})); }));

View File

@ -11,6 +11,7 @@ use crate::{
HlBlock, HlBlock,
}; };
use alloy_rlp::{Decodable, Encodable}; use alloy_rlp::{Decodable, Encodable};
use reth_provider::BlockNumReader;
// use handshake::HlHandshake; // use handshake::HlHandshake;
use reth::{ use reth::{
api::{FullNodeTypes, TxTy}, api::{FullNodeTypes, TxTy},
@ -182,6 +183,7 @@ impl HlNetworkBuilder {
let consensus = Arc::new(HlConsensus { let consensus = Arc::new(HlConsensus {
provider: ctx.provider().clone(), provider: ctx.provider().clone(),
}); });
let number = ctx.provider().last_block_number().unwrap_or(1);
ctx.task_executor() ctx.task_executor()
.spawn_critical("block import", async move { .spawn_critical("block import", async move {
@ -193,7 +195,7 @@ impl HlNetworkBuilder {
.await .await
.unwrap(); .unwrap();
ImportService::new(consensus, handle, from_network, to_network) ImportService::new(consensus, handle, from_network, to_network, number)
.await .await
.unwrap(); .unwrap();
}); });