feat(net): add missing report on bad response (#271)

This commit is contained in:
Matthias Seitz
2022-11-28 08:33:04 +01:00
committed by GitHub
parent 3efc6ee67e
commit 0e436ae543
3 changed files with 22 additions and 11 deletions

View File

@ -1,6 +1,6 @@
//! Fetch data from the network.
use crate::{message::BlockRequest, peers::ReputationChange};
use crate::message::BlockRequest;
use futures::StreamExt;
use reth_eth_wire::{BlockBody, GetBlockBodies};
use reth_interfaces::p2p::{
@ -17,6 +17,7 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
mod client;
use crate::peers::ReputationChangeKind;
pub use client::FetchClient;
/// Manages data fetching operations.
@ -198,10 +199,22 @@ impl StateFetcher {
peer_id: PeerId,
res: RequestResult<Vec<Header>>,
) -> Option<BlockResponseOutcome> {
let is_error = res.is_err();
if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) {
let _ = resp.response.send(res);
}
if is_error {
// if the response was erroneous we want to report the peer.
return Some(BlockResponseOutcome::BadResponse(
peer_id,
ReputationChangeKind::BadMessage,
))
}
if let Some(peer) = self.peers.get_mut(&peer_id) {
// If the peer is still ready to be accept new requests, we try to send a followup
// request immediately.
if peer.state.on_request_finished() {
return self.followup_request(peer_id)
}
@ -356,5 +369,5 @@ pub(crate) enum BlockResponseOutcome {
/// Continue with another request to the peer.
Request(PeerId, BlockRequest),
/// How to handle a bad response and the reputation change to apply.
BadResponse(PeerId, ReputationChange),
BadResponse(PeerId, ReputationChangeKind),
}

View File

@ -5,5 +5,4 @@ mod reputation;
pub(crate) use manager::{PeerAction, PeersManager};
pub use manager::{PeersConfig, PeersHandle};
pub(crate) use reputation::ReputationChange;
pub use reputation::{ReputationChangeKind, ReputationChangeWeights};

View File

@ -285,8 +285,8 @@ where
BlockResponseOutcome::Request(peer, request) => {
self.handle_block_request(peer, request);
}
BlockResponseOutcome::BadResponse(_peer, _reputation_change) => {
// TODO handle reputation change
BlockResponseOutcome::BadResponse(peer, reputation_change) => {
self.peers_manager.apply_reputation_change(&peer, reputation_change);
}
}
None
@ -297,17 +297,14 @@ where
match resp {
PeerResponseResult::BlockHeaders(res) => {
let outcome = self.state_fetcher.on_block_headers_response(peer, res)?;
return self.on_block_response_outcome(outcome)
self.on_block_response_outcome(outcome)
}
PeerResponseResult::BlockBodies(res) => {
let outcome = self.state_fetcher.on_block_bodies_response(peer, res)?;
return self.on_block_response_outcome(outcome)
self.on_block_response_outcome(outcome)
}
PeerResponseResult::PooledTransactions(_) => {}
PeerResponseResult::NodeData(_) => {}
PeerResponseResult::Receipts(_) => {}
_ => None,
}
None
}
/// Advances the state
@ -334,8 +331,10 @@ where
}
}
// need to buffer results here to make borrow checker happy
let mut disconnect_sessions = Vec::new();
let mut received_responses = Vec::new();
// poll all connected peers for responses
for (id, peer) in self.connected_peers.iter_mut() {
if let Some(response) = peer.pending_response.as_mut() {