feat(debug-client): improve RPC BlockProvider implementation (#14536)

This commit is contained in:
DaniPopes
2025-02-17 17:00:48 +01:00
committed by GitHub
parent 41f0f01982
commit f6e8443b56
3 changed files with 58 additions and 34 deletions

View File

@ -16,7 +16,8 @@ use tokio::sync::mpsc;
pub trait BlockProvider: Send + Sync + 'static { pub trait BlockProvider: Send + Sync + 'static {
/// Runs a block provider to send new blocks to the given sender. /// Runs a block provider to send new blocks to the given sender.
/// ///
/// Note: This is expected to be spawned in a separate task. /// Note: This is expected to be spawned in a separate task, and as such it should ignore
/// errors.
fn subscribe_blocks(&self, tx: mpsc::Sender<Block>) -> impl Future<Output = ()> + Send; fn subscribe_blocks(&self, tx: mpsc::Sender<Block>) -> impl Future<Output = ()> + Send;
/// Get a past block by number. /// Get a past block by number.

View File

@ -63,7 +63,11 @@ impl BlockProvider for EtherscanBlockProvider {
let block = match self.load_block(BlockNumberOrTag::Latest).await { let block = match self.load_block(BlockNumberOrTag::Latest).await {
Ok(block) => block, Ok(block) => block,
Err(err) => { Err(err) => {
warn!(target: "consensus::debug-client", %err, "failed to fetch a block from Etherscan"); warn!(
target: "consensus::debug-client",
%err,
"Failed to fetch a block from Etherscan",
);
continue continue
} }
}; };
@ -73,7 +77,7 @@ impl BlockProvider for EtherscanBlockProvider {
} }
if tx.send(block).await.is_err() { if tx.send(block).await.is_err() {
// channel closed // Channel closed.
break; break;
} }

View File

@ -1,54 +1,73 @@
use crate::BlockProvider; use crate::BlockProvider;
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{Provider, ProviderBuilder}; use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::{Block, BlockTransactionsKind}; use alloy_rpc_types_eth::Block;
use futures::StreamExt; use futures::StreamExt;
use reth_tracing::tracing::warn;
use std::{fmt, sync::Arc};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. /// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
#[derive(Debug, Clone)] /// RPC subscriptions.
#[derive(Clone)]
pub struct RpcBlockProvider { pub struct RpcBlockProvider {
ws_rpc_url: String, provider: Arc<dyn Provider>,
url: String,
}
impl fmt::Debug for RpcBlockProvider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcBlockProvider").field("url", &self.url).finish()
}
} }
impl RpcBlockProvider { impl RpcBlockProvider {
/// Create a new RPC block provider with the given WS RPC URL. /// Create a new RPC block provider with the given RPC URL.
pub const fn new(ws_rpc_url: String) -> Self { pub async fn new(rpc_url: &str) -> eyre::Result<Self> {
Self { ws_rpc_url } Ok(Self {
provider: Arc::new(ProviderBuilder::new().on_builtin(rpc_url).await?),
url: rpc_url.to_string(),
})
} }
} }
impl BlockProvider for RpcBlockProvider { impl BlockProvider for RpcBlockProvider {
async fn subscribe_blocks(&self, tx: Sender<Block>) { async fn subscribe_blocks(&self, tx: Sender<Block>) {
let ws_provider = ProviderBuilder::new() let mut stream = match self.provider.subscribe_blocks().await {
.on_builtin(&self.ws_rpc_url) Ok(sub) => sub.into_stream(),
.await Err(err) => {
.expect("failed to create WS provider"); warn!(
let mut stream = ws_provider target: "consensus::debug-client",
.subscribe_blocks() %err,
.await url=%self.url,
.expect("failed to subscribe on new blocks") "Failed to subscribe to blocks",
.into_stream(); );
return;
}
};
while let Some(header) = stream.next().await { while let Some(header) = stream.next().await {
let full_block = ws_provider match self.get_block(header.number).await {
.get_block_by_hash(header.hash, BlockTransactionsKind::Full) Ok(block) => {
.await if tx.send(block).await.is_err() {
.expect("failed to get block") // Channel closed.
.expect("block not found"); break;
if tx.send(full_block).await.is_err() { }
// channel closed }
break; Err(err) => {
warn!(
target: "consensus::debug-client",
%err,
url=%self.url,
"Failed to fetch a block",
);
}
} }
} }
} }
async fn get_block(&self, block_number: u64) -> eyre::Result<Block> { async fn get_block(&self, block_number: u64) -> eyre::Result<Block> {
let ws_provider = ProviderBuilder::new().on_builtin(&self.ws_rpc_url).await?; self.provider
let block: Block = ws_provider .get_block_by_number(block_number.into(), true.into())
.get_block_by_number(BlockNumberOrTag::Number(block_number), true.into())
.await? .await?
.ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?; .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))
Ok(block)
} }
} }