fix(net): improve error propagation (#620)

* fix(net): improve error propagation

* refactor: unify on closed session

* add noop helpers

* add noop helpers

* test setup

* test: add dropped connection test

* rename mock --> test-utils
Matthias Seitz
2022-12-29 16:05:56 +01:00
committed by GitHub
parent 96735ecfdc
commit 3ab6c278e8
12 changed files with 182 additions and 34 deletions

View File

@ -10,7 +10,7 @@ pub type RequestResult<T> = Result<T, RequestError>;
pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, Error, Clone)]
#[derive(Debug, Error, Clone, Eq, PartialEq)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel to the peer.")]
@ -34,6 +34,11 @@ impl RequestError {
pub fn is_retryable(&self) -> bool {
matches!(self, RequestError::Timeout | RequestError::ConnectionDropped)
}
/// Whether the error happened because the channel was closed.
pub fn is_channel_closed(&self) -> bool {
matches!(self, RequestError::ChannelClosed)
}
}
impl<T> From<mpsc::error::SendError<T>> for RequestError {
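Aside (not part of the diff): a minimal sketch of how a caller might use the new `Eq`/`PartialEq` derive together with the `is_channel_closed` and `is_retryable` helpers, assuming `RequestError` is in scope as shown above:

fn classify(err: &RequestError) -> &'static str {
    if err.is_channel_closed() {
        // the channel to the session is gone, so the peer is effectively disconnected
        "closed"
    } else if err.is_retryable() {
        "retry"
    } else {
        "fatal"
    }
}

#[test]
fn channel_closed_is_detected() {
    // the new PartialEq/Eq derive allows direct assertions in tests
    assert_eq!(RequestError::ChannelClosed, RequestError::ChannelClosed);
    assert!(RequestError::ChannelClosed.is_channel_closed());
    // ChannelClosed is not a retryable error (only Timeout and ConnectionDropped are)
    assert!(!RequestError::ChannelClosed.is_retryable());
}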

View File

@ -44,4 +44,4 @@ tokio = { version = "1", features = ["full"] }
reth-tracing = { path = "../../tracing" }
[features]
mock = ["rand"]
test-utils = ["rand"]

View File

@ -64,8 +64,8 @@ use node::{kad_key, NodeKey};
// reexport NodeRecord primitive
pub use reth_primitives::NodeRecord;
#[cfg(any(test, feature = "mock"))]
pub mod mock;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
use reth_net_nat::ResolveNatInterval;
/// reexport to get public ip.
@ -135,6 +135,17 @@ impl Discv4 {
Ok(discv4)
}
/// Returns a new noop instance whose channel is not connected to any service.
///
/// NOTE: this is only intended for test setups.
#[cfg(feature = "test-utils")]
pub fn noop() -> Self {
let (to_service, _rx) = mpsc::channel(1);
let local_addr =
(IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
Self { local_addr, to_service }
}
/// Binds a new UdpSocket and creates the service
///
/// ```
@ -1764,7 +1775,7 @@ mod tests {
use super::*;
use crate::{
bootnodes::mainnet_nodes,
mock::{create_discv4, create_discv4_with_config, rng_record},
test_utils::{create_discv4, create_discv4_with_config, rng_record},
};
use reth_primitives::{hex_literal::hex, ForkHash};
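Aside (not part of the diff): `Discv4::noop` works by dropping the service-side receiver (`_rx`) immediately, so any command sent through the returned handle has nowhere to go. A minimal, self-contained sketch of that channel behavior:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (to_service, rx) = mpsc::channel::<&str>(1);
    // mimic the noop setup: the receiving half is dropped right away
    drop(rx);
    // with the receiver gone, sends error out instead of reaching a service
    assert!(to_service.send("lookup").await.is_err());
}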

View File

@ -463,7 +463,7 @@ impl Decodable for Pong {
mod tests {
use super::*;
use crate::{
mock::{rng_endpoint, rng_ipv4_record, rng_ipv6_record, rng_message},
test_utils::{rng_endpoint, rng_ipv4_record, rng_ipv6_record, rng_message},
SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS,
};
use bytes::BytesMut;

View File

@ -54,6 +54,7 @@ secp256k1 = { version = "0.24", features = [
[dev-dependencies]
# reth
reth-discv4 = { path = "../discv4", features = ["test-utils"] }
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-provider = { path = "../../storage/provider", features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }

View File

@ -133,6 +133,30 @@ impl Discovery {
}
}
#[cfg(test)]
impl Discovery {
/// Returns a Discovery instance that does nothing and is intended for testing purposes.
///
/// NOTE: This instance does nothing
pub(crate) fn noop() -> Self {
let (_tx, rx) = tokio::sync::mpsc::channel(1);
Self {
discovered_nodes: Default::default(),
local_enr: NodeRecord {
address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
tcp_port: 0,
udp_port: 0,
id: PeerId::random(),
},
discv4: Discv4::noop(),
discv4_updates: ReceiverStream::new(rx),
_discv4_config: Default::default(),
queued_events: Default::default(),
_discv4_service: tokio::task::spawn(async move {}),
}
}
}
/// Events produced by the [`Discovery`] manager.
pub enum DiscoveryEvent {
/// A new node was discovered
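Aside (not part of the diff): the two noop helpers are gated differently. `Discv4::noop` sits behind the public `test-utils` feature so other crates can use it (as the dev-dependency hunk above does), while `Discovery::noop` is `#[cfg(test)]` and only compiled for this crate's own tests. A minimal sketch of the two gating styles on a hypothetical `Widget` type:

pub struct Widget;

impl Widget {
    // available to any crate that enables the `test-utils` feature
    #[cfg(feature = "test-utils")]
    pub fn noop() -> Self {
        Widget
    }

    // compiled only for this crate's own unit tests
    #[cfg(test)]
    pub(crate) fn noop_for_tests() -> Self {
        Widget
    }
}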

View File

@ -160,15 +160,12 @@ pub enum PeerResponse {
impl PeerResponse {
/// Polls the type to completion.
pub(crate) fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<PeerResponseResult, oneshot::error::RecvError>> {
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult> {
macro_rules! poll_request {
($response:ident, $item:ident, $cx:ident) => {
match ready!($response.poll_unpin($cx)) {
Ok(res) => Ok(PeerResponseResult::$item(res.map(|item| item.0))),
Err(err) => Err(err),
Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
Err(err) => PeerResponseResult::$item(Err(err.into())),
}
};
}
@ -240,6 +237,17 @@ impl PeerResponseResult {
}
}
/// Returns the `Err` value if the result is an error.
pub fn err(&self) -> Option<&RequestError> {
match self {
PeerResponseResult::BlockHeaders(res) => res.as_ref().err(),
PeerResponseResult::BlockBodies(res) => res.as_ref().err(),
PeerResponseResult::PooledTransactions(res) => res.as_ref().err(),
PeerResponseResult::NodeData(res) => res.as_ref().err(),
PeerResponseResult::Receipts(res) => res.as_ref().err(),
}
}
/// Returns whether this result is an error.
#[allow(unused)]
pub fn is_err(&self) -> bool {
@ -265,6 +273,11 @@ pub struct PeerRequestSender {
// === impl PeerRequestSender ===
impl PeerRequestSender {
/// Constructs a new sender instance that's wired to a session
pub(crate) fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<PeerRequest>) -> Self {
Self { peer_id, to_session_tx }
}
/// Attempts to immediately send a message on this Sender
pub fn try_send(&self, req: PeerRequest) -> Result<(), TrySendError<PeerRequest>> {
self.to_session_tx.try_send(req)
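Aside (not part of the diff): the reworked `poll` above no longer surfaces `oneshot::error::RecvError` separately; a dropped sender is folded into the result via `err.into()`, which assumes a `From<oneshot::error::RecvError>` impl for `RequestError` (not shown in this hunk). A minimal sketch of the underlying channel behavior:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    // dropping the sender mimics a session that goes away before answering
    drop(tx);
    // the receiver resolves with a RecvError rather than hanging, which the new
    // poll body maps into a RequestError carried inside PeerResponseResult
    assert!(rx.await.is_err());
}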

View File

@ -392,12 +392,9 @@ impl Future for ActiveSession {
// not ready yet
this.received_requests.push(req);
}
Poll::Ready(Ok(resp)) => {
Poll::Ready(resp) => {
this.handle_outgoing_response(req.request_id, resp);
}
Poll::Ready(Err(_)) => {
// ignore on error
}
}
}

View File

@ -356,7 +356,7 @@ impl SessionManager {
let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
let messages = PeerRequestSender { peer_id, to_session_tx };
let messages = PeerRequestSender::new(peer_id, to_session_tx);
let session = ActiveSession {
next_id: 0,

View File

@ -138,6 +138,8 @@ where
}
/// Event hook for a disconnected session for the given peer.
///
/// This will remove the peer from the available set of peers and close all inflight requests.
pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
self.active_peers.remove(&peer);
self.state_fetcher.on_session_closed(&peer);
@ -298,11 +300,6 @@ where
}
}
/// Disconnect the session
fn on_session_disconnected(&mut self, peer: PeerId) {
self.active_peers.remove(&peer);
}
/// Sends the message to the peer's session and queues a pending response.
///
/// Caution: this will replace an already pending response. It's the responsibility of the
@ -377,23 +374,30 @@ where
}
// need to buffer results here to make borrow checker happy
let mut disconnect_sessions = Vec::new();
let mut closed_sessions = Vec::new();
let mut received_responses = Vec::new();
// poll all connected peers for responses
for (id, peer) in self.active_peers.iter_mut() {
if let Some(mut response) = peer.pending_response.take() {
match response.poll(cx) {
Poll::Ready(Err(err)) => {
error!(
target : "net",
?id,
?err,
"Request canceled, response channel closed."
);
disconnect_sessions.push(*id);
Poll::Ready(res) => {
// check if the error is due to a closed channel to the session
if res.err().map(|err| err.is_channel_closed()).unwrap_or_default() {
error!(
target : "net",
?id,
"Request canceled, response channel from session closed."
);
// if the channel is closed, the peer session is also closed, in which
// case we can invoke [Self::on_session_closed] immediately, preventing
// follow-up requests and propagating the connection dropped error
closed_sessions.push(*id);
} else {
received_responses.push((*id, res));
}
}
Poll::Ready(Ok(resp)) => received_responses.push((*id, resp)),
Poll::Pending => {
// not ready yet, store again.
peer.pending_response = Some(response);
@ -402,8 +406,8 @@ where
}
}
for peer in disconnect_sessions {
self.on_session_disconnected(peer)
for peer in closed_sessions {
self.on_session_closed(peer)
}
for (peer_id, resp) in received_responses {
@ -475,3 +479,91 @@ pub(crate) enum StateAction {
/// A peer was dropped
PeerRemoved(PeerId),
}
#[cfg(test)]
mod tests {
use crate::{
discovery::Discovery, fetch::StateFetcher, message::PeerRequestSender, peers::PeersManager,
state::NetworkState, PeerRequest,
};
use reth_eth_wire::{
capability::{Capabilities, Capability},
BlockBodies, BlockBody, EthVersion, Status,
};
use reth_interfaces::p2p::{bodies::client::BodiesClient, error::RequestError};
use reth_primitives::{Header, PeerId, H256};
use reth_provider::test_utils::NoopProvider;
use std::{future::poll_fn, sync::Arc};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
/// Returns a testing instance of the [NetworkState].
fn state() -> NetworkState<NoopProvider> {
let peers = PeersManager::default();
let handle = peers.handle();
NetworkState {
active_peers: Default::default(),
peers_manager: Default::default(),
queued_messages: Default::default(),
client: Arc::new(NoopProvider::default()),
discovery: Discovery::noop(),
genesis_hash: Default::default(),
state_fetcher: StateFetcher::new(handle),
}
}
fn capabilities() -> Arc<Capabilities> {
Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
}
// Tests that ongoing requests are answered with `ConnectionDropped` if the session that
// received the request drops the request object.
#[tokio::test(flavor = "multi_thread")]
async fn test_dropped_active_session() {
let mut state = state();
let client = state.fetch_client();
let peer_id = PeerId::random();
let (tx, session_rx) = mpsc::channel(1);
let peer_tx = PeerRequestSender::new(peer_id, tx);
state.on_session_activated(peer_id, capabilities(), Status::default(), peer_tx);
assert!(state.active_peers.contains_key(&peer_id));
let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
let body_response = body.clone();
// this mimics an active session that receives the requests from the state
tokio::task::spawn(async move {
let mut stream = ReceiverStream::new(session_rx);
let resp = stream.next().await.unwrap();
match resp {
PeerRequest::GetBlockBodies { response, .. } => {
response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
}
_ => unreachable!(),
}
// wait for the next request, then drop
let _resp = stream.next().await.unwrap();
});
// spawn the state as future
tokio::task::spawn(async move {
loop {
poll_fn(|cx| state.poll(cx)).await;
}
});
// send requests to the state via the client
let (peer, bodies) = client.get_block_bodies(vec![H256::random()]).await.unwrap().split();
assert_eq!(peer, peer_id);
assert_eq!(bodies, vec![body]);
let resp = client.get_block_bodies(vec![H256::random()]).await;
assert!(resp.is_err());
assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
}
}

View File

@ -27,6 +27,11 @@ impl<T> WithPeerId<T> {
&self.1
}
/// Returns ownership of the underlying data.
pub fn into_data(self) -> T {
self.1
}
/// Transform the data
pub fn transform<F: From<T>>(self) -> WithPeerId<F> {
WithPeerId(self.0, self.1.into())
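Aside (not part of the diff): `into_data` complements the existing `data`/`split` accessors when only the payload is needed. A minimal usage sketch with a hypothetical helper name, assuming `WithPeerId<T>` wraps a peer id and a payload as shown:

fn payload<T>(resp: WithPeerId<T>) -> T {
    // take ownership of the payload and discard the peer id
    resp.into_data()
}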