From f6e8443b56c1cefc92cddaa512ef3f46ef15fce0 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Mon, 17 Feb 2025 17:00:48 +0100 Subject: [PATCH] feat(debug-client): improve RPC BlockProvider implementation (#14536) --- crates/consensus/debug-client/src/client.rs | 3 +- .../debug-client/src/providers/etherscan.rs | 8 +- .../debug-client/src/providers/rpc.rs | 81 ++++++++++++------- 3 files changed, 58 insertions(+), 34 deletions(-) diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 0e2a50370..67fb45fc3 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -16,7 +16,8 @@ use tokio::sync::mpsc; pub trait BlockProvider: Send + Sync + 'static { /// Runs a block provider to send new blocks to the given sender. /// - /// Note: This is expected to be spawned in a separate task. + /// Note: This is expected to be spawned in a separate task, and as such it should ignore + /// errors. fn subscribe_blocks(&self, tx: mpsc::Sender) -> impl Future + Send; /// Get a past block by number. diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs index 06dfdabbc..0aa3b5c9c 100644 --- a/crates/consensus/debug-client/src/providers/etherscan.rs +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -63,7 +63,11 @@ impl BlockProvider for EtherscanBlockProvider { let block = match self.load_block(BlockNumberOrTag::Latest).await { Ok(block) => block, Err(err) => { - warn!(target: "consensus::debug-client", %err, "failed to fetch a block from Etherscan"); + warn!( + target: "consensus::debug-client", + %err, + "Failed to fetch a block from Etherscan", + ); continue } }; @@ -73,7 +77,7 @@ impl BlockProvider for EtherscanBlockProvider { } if tx.send(block).await.is_err() { - // channel closed + // Channel closed. break; } diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs index 758371636..009e26d18 100644 --- a/crates/consensus/debug-client/src/providers/rpc.rs +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -1,54 +1,73 @@ use crate::BlockProvider; -use alloy_eips::BlockNumberOrTag; use alloy_provider::{Provider, ProviderBuilder}; -use alloy_rpc_types_eth::{Block, BlockTransactionsKind}; +use alloy_rpc_types_eth::Block; use futures::StreamExt; +use reth_tracing::tracing::warn; +use std::{fmt, sync::Arc}; use tokio::sync::mpsc::Sender; -/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. -#[derive(Debug, Clone)] +/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports +/// RPC subscriptions. +#[derive(Clone)] pub struct RpcBlockProvider { - ws_rpc_url: String, + provider: Arc, + url: String, +} + +impl fmt::Debug for RpcBlockProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RpcBlockProvider").field("url", &self.url).finish() + } } impl RpcBlockProvider { - /// Create a new RPC block provider with the given WS RPC URL. - pub const fn new(ws_rpc_url: String) -> Self { - Self { ws_rpc_url } + /// Create a new RPC block provider with the given RPC URL. + pub async fn new(rpc_url: &str) -> eyre::Result { + Ok(Self { + provider: Arc::new(ProviderBuilder::new().on_builtin(rpc_url).await?), + url: rpc_url.to_string(), + }) } } impl BlockProvider for RpcBlockProvider { async fn subscribe_blocks(&self, tx: Sender) { - let ws_provider = ProviderBuilder::new() - .on_builtin(&self.ws_rpc_url) - .await - .expect("failed to create WS provider"); - let mut stream = ws_provider - .subscribe_blocks() - .await - .expect("failed to subscribe on new blocks") - .into_stream(); - + let mut stream = match self.provider.subscribe_blocks().await { + Ok(sub) => sub.into_stream(), + Err(err) => { + warn!( + target: "consensus::debug-client", + %err, + url=%self.url, + "Failed to subscribe to blocks", + ); + return; + } + }; while let Some(header) = stream.next().await { - let full_block = ws_provider - .get_block_by_hash(header.hash, BlockTransactionsKind::Full) - .await - .expect("failed to get block") - .expect("block not found"); - if tx.send(full_block).await.is_err() { - // channel closed - break; + match self.get_block(header.number).await { + Ok(block) => { + if tx.send(block).await.is_err() { + // Channel closed. + break; + } + } + Err(err) => { + warn!( + target: "consensus::debug-client", + %err, + url=%self.url, + "Failed to fetch a block", + ); + } } } } async fn get_block(&self, block_number: u64) -> eyre::Result { - let ws_provider = ProviderBuilder::new().on_builtin(&self.ws_rpc_url).await?; - let block: Block = ws_provider - .get_block_by_number(BlockNumberOrTag::Number(block_number), true.into()) + self.provider + .get_block_by_number(block_number.into(), true.into()) .await? - .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?; - Ok(block) + .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number)) } }