mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore: remove unused private stream type (#9357)
This commit is contained in:
@ -4,7 +4,6 @@ use crate::{
|
||||
error::PeerRequestResult,
|
||||
headers::client::{HeadersClient, SingleHeaderRequest},
|
||||
};
|
||||
use futures::Stream;
|
||||
use reth_consensus::{Consensus, ConsensusError};
|
||||
use reth_eth_wire_types::HeadersDirection;
|
||||
use reth_network_peers::WithPeerId;
|
||||
@ -634,69 +633,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A type that buffers the result of a range request so we can return it as a `Stream`.
|
||||
struct FullBlockRangeStream<Client>
|
||||
where
|
||||
Client: BodiesClient + HeadersClient,
|
||||
{
|
||||
/// The inner [`FetchFullBlockRangeFuture`] that is polled.
|
||||
inner: FetchFullBlockRangeFuture<Client>,
|
||||
/// The blocks that have been received so far.
|
||||
///
|
||||
/// If this is `None` then the request is still in progress. If the vec is empty, then all of
|
||||
/// the response values have been consumed.
|
||||
blocks: Option<Vec<SealedBlock>>,
|
||||
}
|
||||
|
||||
impl<Client> From<FetchFullBlockRangeFuture<Client>> for FullBlockRangeStream<Client>
|
||||
where
|
||||
Client: BodiesClient + HeadersClient,
|
||||
{
|
||||
fn from(inner: FetchFullBlockRangeFuture<Client>) -> Self {
|
||||
Self { inner, blocks: None }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client> Stream for FullBlockRangeStream<Client>
|
||||
where
|
||||
Client: BodiesClient + HeadersClient + Unpin + 'static,
|
||||
{
|
||||
type Item = SealedBlock;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// If all blocks have been consumed, then return `None`.
|
||||
if let Some(blocks) = &mut this.blocks {
|
||||
if blocks.is_empty() {
|
||||
// Stream is finished
|
||||
return Poll::Ready(None)
|
||||
}
|
||||
|
||||
// return the next block if it's ready - the vec should be in ascending order since it
|
||||
// is reversed right after it is received from the future, so we can just pop() the
|
||||
// elements to return them from the stream in descending order
|
||||
return Poll::Ready(blocks.pop())
|
||||
}
|
||||
|
||||
// poll the inner future if the blocks are not yet ready
|
||||
let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx));
|
||||
|
||||
// the blocks are returned in descending order, reverse the list so we can just pop() the
|
||||
// vec to yield the next block in the stream
|
||||
blocks.reverse();
|
||||
|
||||
// pop the first block from the vec as the first stream element and store the rest
|
||||
let first_result = blocks.pop();
|
||||
|
||||
// if the inner future is ready, then we can return the blocks
|
||||
this.blocks = Some(blocks);
|
||||
|
||||
// return the first block
|
||||
Poll::Ready(first_result)
|
||||
}
|
||||
}
|
||||
|
||||
/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
|
||||
/// futures until they return responses. It will return either the header or body result, depending
|
||||
/// on which future successfully returned.
|
||||
@ -742,7 +678,6 @@ enum RangeResponseResult {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_utils::TestFullBlockClient;
|
||||
use futures::StreamExt;
|
||||
use std::ops::Range;
|
||||
|
||||
#[tokio::test]
|
||||
@ -808,43 +743,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_full_block_range_stream() {
|
||||
let client = TestFullBlockClient::default();
|
||||
let (header, body) = insert_headers_into_client(&client, 0..50);
|
||||
let client = FullBlockClient::test_client(client);
|
||||
|
||||
let future = client.get_full_block_range(header.hash(), 1);
|
||||
let mut stream = FullBlockRangeStream::from(future);
|
||||
|
||||
// ensure only block in the stream is the one we requested
|
||||
let received = stream.next().await.expect("response should not be None");
|
||||
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));
|
||||
|
||||
// stream should be done now
|
||||
assert_eq!(stream.next().await, None);
|
||||
|
||||
// there are 11 total blocks
|
||||
let future = client.get_full_block_range(header.hash(), 11);
|
||||
let mut stream = FullBlockRangeStream::from(future);
|
||||
|
||||
// check first header
|
||||
let received = stream.next().await.expect("response should not be None");
|
||||
let mut curr_number = received.number;
|
||||
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));
|
||||
|
||||
// check the rest of the headers
|
||||
for _ in 0..10 {
|
||||
let received = stream.next().await.expect("response should not be None");
|
||||
assert_eq!(received.number, curr_number - 1);
|
||||
curr_number = received.number;
|
||||
}
|
||||
|
||||
// ensure stream is done
|
||||
let received = stream.next().await;
|
||||
assert!(received.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_full_block_range_over_soft_limit() {
|
||||
// default soft limit is 20, so we will request 50 blocks
|
||||
|
||||
Reference in New Issue
Block a user