fix(net): always reset peer state on response (#1179)

This commit is contained in:
Matthias Seitz
2023-02-06 09:53:00 +01:00
committed by GitHub
parent c6a7d3637e
commit 54744b3e6b

View File

@ -219,41 +219,43 @@ impl StateFetcher {
Some(BlockResponseOutcome::Request(peer_id, req)) Some(BlockResponseOutcome::Request(peer_id, req))
} }
/// Called on a `GetBlockHeaders` response from a peer /// Called on a `GetBlockHeaders` response from a peer.
///
/// This delegates the response and returns a [BlockResponseOutcome] to either queue in a direct
/// followup request or get the peer reported if the response was a
/// [EthResponseValidator::reputation_change_err]
pub(crate) fn on_block_headers_response( pub(crate) fn on_block_headers_response(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
res: RequestResult<Vec<Header>>, res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> { ) -> Option<BlockResponseOutcome> {
let is_error = res.is_err(); let is_error = res.is_err();
let reputation_change = res.reputation_change_err(); let maybe_reputation_change = res.reputation_change_err();
let resp = self.inflight_headers_requests.remove(&peer_id); let resp = self.inflight_headers_requests.remove(&peer_id);
let is_likely_bad_response = resp let is_likely_bad_response = resp
.as_ref() .as_ref()
.map(|r| res.is_likely_bad_headers_response(&r.request)) .map(|r| res.is_likely_bad_headers_response(&r.request))
.unwrap_or_default(); .unwrap_or_default();
if let Some(resp) = resp { if let Some(resp) = resp {
// delegate the response
let _ = resp.response.send(res.map(|h| (peer_id, h).into())); let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
} }
if is_error {
// if the response was erroneous we want to report the peer.
return reputation_change.map(|reputation_change| {
BlockResponseOutcome::BadResponse(peer_id, reputation_change)
})
}
if let Some(peer) = self.peers.get_mut(&peer_id) { if let Some(peer) = self.peers.get_mut(&peer_id) {
// If the peer is still ready to be accept new requests, we try to send a followup // If the peer is still ready to accept new requests, we try to send a followup
// request immediately. // request immediately.
if peer.state.on_request_finished() && !is_likely_bad_response { if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
return self.followup_request(peer_id) return self.followup_request(peer_id)
} }
} }
None // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
// outcome
maybe_reputation_change
.map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
} }
/// Called on a `GetBlockBodies` response from a peer /// Called on a `GetBlockBodies` response from a peer
@ -413,7 +415,7 @@ pub(crate) enum BlockResponseOutcome {
mod tests { mod tests {
use super::*; use super::*;
use crate::{peers::PeersManager, PeersConfig}; use crate::{peers::PeersManager, PeersConfig};
use reth_primitives::{H256, H512}; use reth_primitives::{SealedHeader, H256, H512};
use std::future::poll_fn; use std::future::poll_fn;
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
@ -513,4 +515,56 @@ mod tests {
None None
); );
} }
#[tokio::test]
async fn test_header_response_outcome() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher = StateFetcher::new(manager.handle(), Default::default());
let peer_id = H512::random();
let request_pair = || {
let (tx, _rx) = oneshot::channel();
let req = Request {
request: HeadersRequest {
start: 0u64.into(),
limit: 1,
direction: Default::default(),
},
response: tx,
};
let mut header = SealedHeader::default().unseal();
header.number = 0u64;
(req, header)
};
fetcher.new_active_peer(
peer_id,
Default::default(),
Default::default(),
Default::default(),
);
let (req, header) = request_pair();
fetcher.inflight_headers_requests.insert(peer_id, req);
let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
assert!(outcome.is_none());
assert!(fetcher.peers[&peer_id].state.is_idle());
let outcome =
fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
assert!(EthResponseValidator::reputation_change_err(&Err(RequestError::Timeout)).is_some());
match outcome {
BlockResponseOutcome::BadResponse(peer, _) => {
assert_eq!(peer, peer_id)
}
BlockResponseOutcome::Request(_, _) => {
unreachable!()
}
};
assert!(fetcher.peers[&peer_id].state.is_idle());
}
} }