fix(net): prevent bad loop if no peers available (#359)

* fix(net): prevent bad loop if no peers available

* test: add poll fetcher test
This commit is contained in:
Matthias Seitz
2022-12-09 12:39:08 +01:00
committed by GitHub
parent dffc42d6d5
commit b0149f0b9f

View File

@ -90,26 +90,33 @@ impl StateFetcher {
}
/// Returns the next action to return
fn poll_action(&mut self) -> Option<FetchAction> {
fn poll_action(&mut self) -> PollAction {
// we only check and not pop here since we don't know yet whether a peer is available.
if self.queued_requests.is_empty() {
return None
return PollAction::NoRequests
}
let peer_id = *self.next_peer()?.0;
let peer_id = if let Some(peer) = self.next_peer() {
*peer.0
} else {
return PollAction::NoPeersAvailable
};
let request = self.queued_requests.pop_front().expect("not empty; qed");
let request = self.prepare_block_request(peer_id, request);
Some(FetchAction::BlockRequest { peer_id, request })
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
}
/// Advance the state the syncer
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
// drain buffered actions first
loop {
if let Some(action) = self.poll_action() {
return Poll::Ready(action)
}
let no_peers_available = match self.poll_action() {
PollAction::Ready(action) => return Poll::Ready(action),
PollAction::NoRequests => false,
PollAction::NoPeersAvailable => true,
};
if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) {
return Poll::Ready(FetchAction::StatusUpdate(status))
@ -128,7 +135,7 @@ impl StateFetcher {
}
}
if self.queued_requests.is_empty() {
if self.queued_requests.is_empty() || no_peers_available {
return Poll::Pending
}
}
@ -244,6 +251,13 @@ impl Default for StateFetcher {
}
}
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
Ready(FetchAction),
NoRequests,
NoPeersAvailable,
}
/// Represents a connected peer
struct Peer {
/// The state this peer currently resides in.
@ -352,3 +366,26 @@ pub(crate) enum BlockResponseOutcome {
/// How to handle a bad response and the reputation change to apply.
BadResponse(PeerId, ReputationChangeKind),
}
#[cfg(test)]
mod tests {
use super::*;
use std::future::poll_fn;
#[tokio::test(flavor = "multi_thread")]
async fn test_poll_fetcher() {
let mut fetcher = StateFetcher::default();
poll_fn(move |cx| {
assert!(fetcher.poll(cx).is_pending());
let (tx, _rx) = oneshot::channel();
fetcher
.queued_requests
.push_back(DownloadRequest::GetBlockBodies { request: vec![], response: tx });
assert!(fetcher.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
}