feat: make bodies downloader generic over header (#13259)

This commit is contained in:
Dan Cline
2024-12-10 04:29:51 -05:00
committed by GitHub
parent da99986ea2
commit 15470b4350
10 changed files with 112 additions and 92 deletions

View File

@ -35,11 +35,11 @@ use tracing::info;
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct BodiesDownloader<B: BodiesClient, Provider> {
pub struct BodiesDownloader<B: BodiesClient, Provider: HeaderProvider> {
/// The bodies client
client: Arc<B>,
/// The consensus client
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<Provider::Header, B::Body>>,
/// The database handle
provider: Provider,
/// The maximum number of non-empty blocks per one request
@ -57,11 +57,11 @@ pub struct BodiesDownloader<B: BodiesClient, Provider> {
/// The latest block number returned.
latest_queued_block_number: Option<BlockNumber>,
/// Requests in progress
in_progress_queue: BodiesRequestQueue<B>,
in_progress_queue: BodiesRequestQueue<Provider::Header, B>,
/// Buffered responses
buffered_responses: BinaryHeap<OrderedBodiesResponse<B::Body>>,
buffered_responses: BinaryHeap<OrderedBodiesResponse<Provider::Header, B::Body>>,
/// Queued body responses that can be returned for insertion into the database.
queued_bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
queued_bodies: Vec<BlockResponse<Provider::Header, B::Body>>,
/// The bodies downloader metrics.
metrics: BodyDownloaderMetrics,
}
@ -69,7 +69,7 @@ pub struct BodiesDownloader<B: BodiesClient, Provider> {
impl<B, Provider> BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
Provider: HeaderProvider + Unpin + 'static,
Provider: HeaderProvider<Header: BlockHeader> + Unpin + 'static,
{
/// Returns the next contiguous request.
fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
@ -193,14 +193,16 @@ where
}
/// Queues bodies and sets the latest queued block number
fn queue_bodies(&mut self, bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>) {
fn queue_bodies(&mut self, bodies: Vec<BlockResponse<Provider::Header, B::Body>>) {
self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
self.queued_bodies.extend(bodies);
self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
}
/// Removes the next response from the buffer.
fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B::Body>> {
fn pop_buffered_response(
&mut self,
) -> Option<OrderedBodiesResponse<Provider::Header, B::Body>> {
let resp = self.buffered_responses.pop()?;
self.metrics.buffered_responses.decrement(1.);
self.buffered_blocks_size_bytes -= resp.size();
@ -210,13 +212,10 @@ where
}
/// Adds a new response to the internal buffer
fn buffer_bodies_response(
&mut self,
response: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
) {
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<Provider::Header, B::Body>>) {
// take into account capacity
let size = response.iter().map(BlockResponse::size).sum::<usize>() +
response.capacity() * mem::size_of::<BlockResponse<B::Body>>();
response.capacity() * mem::size_of::<BlockResponse<Provider::Header, B::Body>>();
let response = OrderedBodiesResponse { resp: response, size };
let response_len = response.len();
@ -230,9 +229,7 @@ where
}
/// Returns a response if it's first block number matches the next expected.
fn try_next_buffered(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<Provider::Header, B::Body>>> {
if let Some(next) = self.buffered_responses.peek() {
let expected = self.next_expected_block_number();
let next_block_range = next.block_range();
@ -258,9 +255,7 @@ where
/// Returns the next batch of block bodies that can be returned if we have enough buffered
/// bodies
fn try_split_next_batch(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<Provider::Header, B::Body>>> {
if self.queued_bodies.len() >= self.stream_batch_size {
let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
self.queued_bodies.shrink_to_fit();
@ -292,12 +287,17 @@ where
Self: BodyDownloader + 'static,
{
/// Spawns the downloader task via [`tokio::task::spawn`]
pub fn into_task(self) -> TaskDownloader<<Self as BodyDownloader>::Body> {
pub fn into_task(
self,
) -> TaskDownloader<<Self as BodyDownloader>::Header, <Self as BodyDownloader>::Body> {
self.into_task_with(&TokioTaskExecutor::default())
}
/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner.
pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<<Self as BodyDownloader>::Body>
pub fn into_task_with<S>(
self,
spawner: &S,
) -> TaskDownloader<<Self as BodyDownloader>::Header, <Self as BodyDownloader>::Body>
where
S: TaskSpawner,
{
@ -308,8 +308,9 @@ where
impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: Debug + InMemorySize> + 'static,
Provider: HeaderProvider<Header = alloy_consensus::Header> + Unpin + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Header = Provider::Header;
type Body = B::Body;
/// Set a new download range (exclusive).
@ -358,9 +359,9 @@ where
impl<B, Provider> Stream for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
Provider: HeaderProvider<Header = alloy_consensus::Header> + Unpin + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Item = BodyDownloaderResult<B::Body>;
type Item = BodyDownloaderResult<Provider::Header, B::Body>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
@ -442,13 +443,28 @@ where
}
#[derive(Debug)]
struct OrderedBodiesResponse<B> {
resp: Vec<BlockResponse<alloy_consensus::Header, B>>,
struct OrderedBodiesResponse<H, B> {
resp: Vec<BlockResponse<H, B>>,
/// The total size of the response in bytes
size: usize,
}
impl<B> OrderedBodiesResponse<B> {
impl<H, B> OrderedBodiesResponse<H, B> {
#[inline]
fn len(&self) -> usize {
self.resp.len()
}
/// Returns the size of the response in bytes
///
/// See [`BlockResponse::size`]
#[inline]
const fn size(&self) -> usize {
self.size
}
}
impl<H: BlockHeader, B> OrderedBodiesResponse<H, B> {
/// Returns the block number of the first element
///
/// # Panics
@ -464,36 +480,23 @@ impl<B> OrderedBodiesResponse<B> {
fn block_range(&self) -> RangeInclusive<u64> {
self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
}
#[inline]
fn len(&self) -> usize {
self.resp.len()
}
/// Returns the size of the response in bytes
///
/// See [`BlockResponse::size`]
#[inline]
const fn size(&self) -> usize {
self.size
}
}
impl<B> PartialEq for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> PartialEq for OrderedBodiesResponse<H, B> {
fn eq(&self, other: &Self) -> bool {
self.first_block_number() == other.first_block_number()
}
}
impl<B> Eq for OrderedBodiesResponse<B> {}
impl<H: BlockHeader, B> Eq for OrderedBodiesResponse<H, B> {}
impl<B> PartialOrd for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> PartialOrd for OrderedBodiesResponse<H, B> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<B> Ord for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> Ord for OrderedBodiesResponse<H, B> {
fn cmp(&self, other: &Self) -> Ordering {
self.first_block_number().cmp(&other.first_block_number()).reverse()
}
@ -573,7 +576,7 @@ impl BodiesDownloaderBuilder {
pub fn build<B, Provider>(
self,
client: B,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<Provider::Header, B::Body>>,
provider: Provider,
) -> BodiesDownloader<B, Provider>
where

View File

@ -9,18 +9,24 @@ use std::{fmt::Debug, ops::RangeInclusive};
/// A [`BodyDownloader`] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopBodiesDownloader<B>(std::marker::PhantomData<B>);
pub struct NoopBodiesDownloader<H, B> {
_header: std::marker::PhantomData<H>,
_body: std::marker::PhantomData<B>,
}
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for NoopBodiesDownloader<B> {
impl<H: Debug + Send + Sync + Unpin + 'static, B: Debug + Send + Sync + Unpin + 'static>
BodyDownloader for NoopBodiesDownloader<H, B>
{
type Body = B;
type Header = H;
fn set_download_range(&mut self, _: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Ok(())
}
}
impl<B> Stream for NoopBodiesDownloader<B> {
type Item = Result<Vec<BlockResponse<alloy_consensus::Header, B>>, DownloadError>;
impl<H, B> Stream for NoopBodiesDownloader<H, B> {
type Item = Result<Vec<BlockResponse<H, B>>, DownloadError>;
fn poll_next(
self: std::pin::Pin<&mut Self>,

View File

@ -1,5 +1,6 @@
use super::request::BodiesRequestFuture;
use crate::metrics::BodyDownloaderMetrics;
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
@ -19,18 +20,19 @@ use std::{
/// The wrapper around [`FuturesUnordered`] that keeps information
/// about the blocks currently being requested.
#[derive(Debug)]
pub(crate) struct BodiesRequestQueue<B: BodiesClient> {
pub(crate) struct BodiesRequestQueue<H, B: BodiesClient> {
/// Inner body request queue.
inner: FuturesUnordered<BodiesRequestFuture<B>>,
inner: FuturesUnordered<BodiesRequestFuture<H, B>>,
/// The downloader metrics.
metrics: BodyDownloaderMetrics,
/// Last requested block number.
pub(crate) last_requested_block_number: Option<BlockNumber>,
}
impl<B> BodiesRequestQueue<B>
impl<H, B> BodiesRequestQueue<H, B>
where
B: BodiesClient + 'static,
H: BlockHeader,
{
/// Create new instance of request queue.
pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
@ -58,15 +60,15 @@ where
pub(crate) fn push_new_request(
&mut self,
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
request: Vec<SealedHeader>,
consensus: Arc<dyn Consensus<H, B::Body>>,
request: Vec<SealedHeader<H>>,
) {
// Set last max requested block number
self.last_requested_block_number = request
.last()
.map(|last| match self.last_requested_block_number {
Some(num) => last.number.max(num),
None => last.number,
Some(num) => last.number().max(num),
None => last.number(),
})
.or(self.last_requested_block_number);
// Create request and push into the queue.
@ -76,11 +78,12 @@ where
}
}
impl<B> Stream for BodiesRequestQueue<B>
impl<H, B> Stream for BodiesRequestQueue<H, B>
where
H: BlockHeader + Send + Sync + Unpin + 'static,
B: BodiesClient<Body: InMemorySize> + 'static,
{
type Item = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
type Item = DownloadResult<Vec<BlockResponse<H, B::Body>>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().inner.poll_next_unpin(cx)

View File

@ -38,30 +38,31 @@ use std::{
/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
/// that try to give us bodies that do not match the requested order are going to be penalized
/// and eventually disconnected.
pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
pub(crate) struct BodiesRequestFuture<H, B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<H, B::Body>>,
metrics: BodyDownloaderMetrics,
/// Metrics for individual responses. This can be used to observe how the size (in bytes) of
/// responses change while bodies are being downloaded.
response_metrics: ResponseMetrics,
// Headers to download. The collection is shrunk as responses are buffered.
pending_headers: VecDeque<SealedHeader>,
pending_headers: VecDeque<SealedHeader<H>>,
/// Internal buffer for all blocks
buffer: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
buffer: Vec<BlockResponse<H, B::Body>>,
fut: Option<B::Output>,
/// Tracks how many bodies we requested in the last request.
last_request_len: Option<usize>,
}
impl<B> BodiesRequestFuture<B>
impl<H, B> BodiesRequestFuture<H, B>
where
B: BodiesClient + 'static,
H: BlockHeader,
{
/// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
pub(crate) fn new(
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<H, B::Body>>,
metrics: BodyDownloaderMetrics,
) -> Self {
Self {
@ -76,7 +77,7 @@ where
}
}
pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader>) -> Self {
pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<H>>) -> Self {
self.buffer.reserve_exact(headers.len());
self.pending_headers = VecDeque::from(headers);
// Submit the request only if there are any headers to download.
@ -192,7 +193,7 @@ where
if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
// Body is invalid, put the header back and return an error
let hash = block.hash();
let number = block.number;
let number = block.number();
self.pending_headers.push_front(block.header);
return Err(DownloadError::BodyValidation {
hash,
@ -213,11 +214,12 @@ where
}
}
impl<B> Future for BodiesRequestFuture<B>
impl<H, B> Future for BodiesRequestFuture<H, B>
where
H: BlockHeader + Unpin + Send + Sync + 'static,
B: BodiesClient<Body: InMemorySize> + 'static,
{
type Output = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
type Output = DownloadResult<Vec<BlockResponse<H, B::Body>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

View File

@ -24,15 +24,15 @@ pub const BODIES_TASK_BUFFER_SIZE: usize = 4;
/// A [BodyDownloader] that drives a spawned [BodyDownloader] on a spawned task.
#[derive(Debug)]
#[pin_project]
pub struct TaskDownloader<B> {
pub struct TaskDownloader<H, B> {
#[pin]
from_downloader: ReceiverStream<BodyDownloaderResult<B>>,
from_downloader: ReceiverStream<BodyDownloaderResult<H, B>>,
to_downloader: UnboundedSender<RangeInclusive<BlockNumber>>,
}
// === impl TaskDownloader ===
impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
impl<H: Send + Sync + Unpin + 'static, B: Send + Sync + Unpin + 'static> TaskDownloader<H, B> {
/// Spawns the given `downloader` via [`tokio::task::spawn`] returns a [`TaskDownloader`] that's
/// connected to that task.
///
@ -64,7 +64,7 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
/// ```
pub fn spawn<T>(downloader: T) -> Self
where
T: BodyDownloader<Body = B> + 'static,
T: BodyDownloader<Header = H, Body = B> + 'static,
{
Self::spawn_with(downloader, &TokioTaskExecutor::default())
}
@ -73,7 +73,7 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
/// that's connected to that task.
pub fn spawn_with<T, S>(downloader: T, spawner: &S) -> Self
where
T: BodyDownloader<Body = B> + 'static,
T: BodyDownloader<Header = H, Body = B> + 'static,
S: TaskSpawner,
{
let (bodies_tx, bodies_rx) = mpsc::channel(BODIES_TASK_BUFFER_SIZE);
@ -91,7 +91,10 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
}
}
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
impl<H: Debug + Send + Sync + Unpin + 'static, B: Debug + Send + Sync + Unpin + 'static>
BodyDownloader for TaskDownloader<H, B>
{
type Header = H;
type Body = B;
fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
@ -100,8 +103,8 @@ impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader
}
}
impl<B> Stream for TaskDownloader<B> {
type Item = BodyDownloaderResult<B>;
impl<H, B> Stream for TaskDownloader<H, B> {
type Item = BodyDownloaderResult<H, B>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().from_downloader.poll_next(cx)
@ -111,7 +114,7 @@ impl<B> Stream for TaskDownloader<B> {
/// A [`BodyDownloader`] that runs on its own task
struct SpawnedDownloader<T: BodyDownloader> {
updates: UnboundedReceiverStream<RangeInclusive<BlockNumber>>,
bodies_tx: PollSender<BodyDownloaderResult<T::Body>>,
bodies_tx: PollSender<BodyDownloaderResult<T::Header, T::Body>>,
downloader: T,
}

View File

@ -5,7 +5,7 @@ use futures::Stream;
use std::{fmt::Debug, ops::RangeInclusive};
/// Body downloader return type.
pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B>>>;
pub type BodyDownloaderResult<H, B> = DownloadResult<Vec<BlockResponse<H, B>>>;
/// A downloader capable of fetching and yielding block bodies from block headers.
///
@ -13,8 +13,11 @@ pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<alloy_consen
/// while a [`BodiesClient`][crate::bodies::client::BodiesClient] represents a client capable of
/// fulfilling these requests.
pub trait BodyDownloader:
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Body>> + Unpin
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Header, Self::Body>> + Unpin
{
/// The type of header that can be returned in a blck
type Header: Debug + Send + Sync + Unpin + 'static;
/// The type of the body that is being downloaded.
type Body: Debug + Send + Sync + Unpin + 'static;

View File

@ -1,10 +1,11 @@
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, U256};
use reth_primitives::{BlockBody, SealedBlock, SealedHeader};
use reth_primitives_traits::{BlockHeader, InMemorySize};
use reth_primitives_traits::InMemorySize;
/// The block response
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum BlockResponse<H = alloy_consensus::Header, B = BlockBody> {
pub enum BlockResponse<H, B = BlockBody> {
/// Full block response (with transactions or ommers)
Full(SealedBlock<H, B>),
/// The empty block response

View File

@ -87,7 +87,7 @@ pub fn build_pipeline<N, H, B, Executor>(
where
N: ProviderNodeTypes,
H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
B: BodyDownloader<Body = BodyTy<N>> + 'static,
B: BodyDownloader<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
Executor: BlockExecutorProvider<Primitives = N::Primitives>,
N::Primitives: NodePrimitives<BlockHeader = reth_primitives::Header>,
{

View File

@ -5,7 +5,7 @@ use reth_db::{tables, transaction::DbTx};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut};
use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
use reth_primitives::StaticFileSegment;
use reth_primitives_traits::{Block, BlockBody};
use reth_primitives_traits::{Block, BlockBody, BlockHeader};
use reth_provider::{
providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError,
StaticFileProviderFactory, StatsReader, StorageLocation,
@ -56,7 +56,7 @@ pub struct BodyStage<D: BodyDownloader> {
/// The body downloader.
downloader: D,
/// Block response buffer.
buffer: Option<Vec<BlockResponse<alloy_consensus::Header, D::Body>>>,
buffer: Option<Vec<BlockResponse<D::Header, D::Body>>>,
}
impl<D: BodyDownloader> BodyStage<D> {
@ -72,9 +72,7 @@ impl<D: BodyDownloader> BodyStage<D> {
unwind_block: Option<u64>,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ BlockReader<Header = reth_primitives::Header>
+ StaticFileProviderFactory,
Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
{
// Get id for the next tx_num of zero if there are no transactions.
let next_tx_num = provider
@ -151,9 +149,9 @@ where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StatsReader
+ BlockReader<Header = reth_primitives::Header>
+ BlockWriter<Block: Block<Body = D::Body>>,
D: BodyDownloader<Body: BlockBody<Transaction: Compact>>,
+ BlockReader
+ BlockWriter<Block: Block<Header = D::Header, Body = D::Body>>,
D: BodyDownloader<Header: BlockHeader, Body: BlockBody<Transaction: Compact>>,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@ -764,6 +762,7 @@ mod tests {
}
impl BodyDownloader for TestBodyDownloader {
type Header = Header;
type Body = BlockBody;
fn set_download_range(
@ -786,7 +785,7 @@ mod tests {
}
impl Stream for TestBodyDownloader {
type Item = BodyDownloaderResult<BlockBody>;
type Item = BodyDownloaderResult<Header, BlockBody>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

View File

@ -258,7 +258,7 @@ pub(crate) fn missing_static_data_error<Provider>(
segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
Provider: BlockReader<Header = reth_primitives::Header> + StaticFileProviderFactory,
Provider: BlockReader + StaticFileProviderFactory,
{
let mut last_block =
static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();