diff --git a/Cargo.lock b/Cargo.lock index 697036d00..77e4205ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,6 +1106,22 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "beacon-api-sidecar-fetcher" +version = "0.1.0" +dependencies = [ + "alloy-rpc-types-beacon", + "clap", + "eyre", + "futures-util", + "reqwest 0.12.4", + "reth", + "reth-node-ethereum", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "beacon-api-sse" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 0f924cfcf..1aa1f1bc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ members = [ "crates/transaction-pool/", "crates/trie-parallel/", "crates/trie/", + "examples/beacon-api-sidecar-fetcher/", "examples/beacon-api-sse/", "examples/bsc-p2p", "examples/custom-dev-node/", diff --git a/examples/beacon-api-sidecar-fetcher/Cargo.toml b/examples/beacon-api-sidecar-fetcher/Cargo.toml new file mode 100644 index 000000000..8d7fd39ef --- /dev/null +++ b/examples/beacon-api-sidecar-fetcher/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "beacon-api-sidecar-fetcher" +version = "0.1.0" +publish = false +edition.workspace = true +license.workspace = true + +[dependencies] +reth.workspace = true +reth-node-ethereum.workspace = true + +alloy-rpc-types-beacon.workspace = true + +serde.workspace = true +serde_json.workspace = true +clap.workspace = true +futures-util.workspace = true +eyre.workspace = true +thiserror.workspace = true +reqwest.workspace = true diff --git a/examples/beacon-api-sidecar-fetcher/src/main.rs b/examples/beacon-api-sidecar-fetcher/src/main.rs new file mode 100644 index 000000000..f3c7a843a --- /dev/null +++ b/examples/beacon-api-sidecar-fetcher/src/main.rs @@ -0,0 +1,98 @@ +//! Run with +//! +//! ```not_rust +//! cargo run -p beacon-api-beacon-sidecar-fetcher --node -- full +//! ``` +//! +//! This launches a regular reth instance and subscribes to payload attributes event stream. +//! +//! **NOTE**: This expects that the CL client is running an http server on `localhost:5052` and is +//! configured to emit payload attributes events. +//! +//! See beacon Node API: + +#![cfg_attr(not(test), warn(unused_crate_dependencies))] + +use std::{ + collections::VecDeque, + net::{IpAddr, Ipv4Addr}, +}; + +use clap::Parser; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use mined_sidecar::MinedSidecarStream; +use reth::{builder::NodeHandle, cli::Cli, primitives::B256, providers::CanonStateSubscriptions}; +use reth_node_ethereum::EthereumNode; + +pub mod mined_sidecar; + +fn main() { + Cli::::parse() + .run(|builder, args| async move { + // launch the node + let NodeHandle { node, node_exit_future } = + builder.node(EthereumNode::default()).launch().await?; + + let notifications: reth::providers::CanonStateNotificationStream = + node.provider.canonical_state_stream(); + + let pool = node.pool.clone(); + + let mut sidecar_stream = MinedSidecarStream { + events: notifications, + pool, + beacon_config: args, + client: reqwest::Client::new(), + pending_requests: FuturesUnordered::new(), + queued_actions: VecDeque::new(), + }; + + while let Some(result) = sidecar_stream.next().await { + match result { + Ok(blob_transaction) => { + // Handle successful transaction + println!("Processed BlobTransaction: {:?}", blob_transaction); + } + Err(e) => { + // Handle errors specifically + eprintln!("Failed to process transaction: {:?}", e); + } + } + } + node_exit_future.await + }) + .unwrap(); +} + +/// Our custom cli args extension that adds one flag to reth default CLI. +#[derive(Debug, Clone, clap::Parser)] +pub struct BeaconSidecarConfig { + /// Beacon Node http server address + #[arg(long = "cl.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] + pub cl_addr: IpAddr, + /// Beacon Node http server port to listen on + #[arg(long = "cl.port", default_value_t = 5052)] + pub cl_port: u16, +} + +impl Default for BeaconSidecarConfig { + /// Default setup for lighthouse client + fn default() -> Self { + Self { + cl_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), // Equivalent to Ipv4Addr::LOCALHOST + cl_port: 5052, + } + } +} + +impl BeaconSidecarConfig { + /// Returns the http url of the beacon node + pub fn http_base_url(&self) -> String { + format!("http://{}:{}", self.cl_addr, self.cl_port) + } + + /// Returns the URL to the beacon sidecars endpoint + pub fn sidecar_url(&self, block_root: B256) -> String { + format!("{}/eth/v1/beacon/blob_sidecars/{}", self.http_base_url(), block_root) + } +} diff --git a/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs b/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs new file mode 100644 index 000000000..5f5f4cbf1 --- /dev/null +++ b/examples/beacon-api-sidecar-fetcher/src/mined_sidecar.rs @@ -0,0 +1,278 @@ +use crate::BeaconSidecarConfig; +use alloy_rpc_types_beacon::sidecar::{BeaconBlobBundle, SidecarIterator}; +use eyre::Result; +use futures_util::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use reqwest::{Error, StatusCode}; +use reth::{ + primitives::{BlobTransaction, SealedBlockWithSenders, B256}, + providers::CanonStateNotification, + transaction_pool::{BlobStoreError, TransactionPoolExt}, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::VecDeque, + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlockMetadata { + pub block_hash: B256, + pub block_number: u64, + pub gas_used: u64, +} + +#[derive(Debug, Clone)] +pub struct MinedBlob { + pub transaction: BlobTransaction, + pub block_metadata: BlockMetadata, +} + +#[derive(Debug, Clone)] +pub struct ReorgedBlob { + pub transaction_hash: B256, + pub block_metadata: BlockMetadata, +} + +#[derive(Debug, Clone)] +pub enum BlobTransactionEvent { + Mined(MinedBlob), + Reorged(ReorgedBlob), +} + +/// SideCarError Handles Errors from both EL and CL +#[derive(Debug, Error)] +pub enum SideCarError { + #[error("Reqwest encountered an error: {0}")] + ReqwestError(Error), + + #[error("Failed to fetch transactions from the blobstore: {0}")] + TransactionPoolError(BlobStoreError), + + #[error("400: {0}")] + InvalidBlockID(String), + + #[error("404: {0}")] + BlockNotFound(String), + + #[error("500: {0}")] + InternalError(String), + + #[error("Network error: {0}")] + NetworkError(String), + + #[error("Data parsing error: {0}")] + DeserializationError(String), + + #[error("{0} Error: {1}")] + UnknownError(u16, String), +} +/// Futures associated with retrieving blob data from the beacon client +type SidecarsFuture = + Pin, SideCarError>> + Send>>; + +/// A Stream that processes CanonStateNotifications and retrieves BlobTransactions from the beacon +/// client. +/// +/// First checks if the blob sidecar for a given EIP4844 is stored locally, if not attempts to +/// retrieve it from the CL Layer +#[must_use = "streams do nothing unless polled"] +pub struct MinedSidecarStream { + pub events: St, + pub pool: P, + pub beacon_config: BeaconSidecarConfig, + pub client: reqwest::Client, + pub pending_requests: FuturesUnordered, + pub queued_actions: VecDeque, +} + +impl MinedSidecarStream +where + St: Stream + Send + Unpin + 'static, + P: TransactionPoolExt + Unpin + 'static, +{ + fn process_block(&mut self, block: &SealedBlockWithSenders) { + let txs: Vec<_> = block + .transactions() + .filter(|tx| tx.is_eip4844()) + .map(|tx| (tx.clone(), tx.blob_versioned_hashes().unwrap().len())) + .collect(); + + let mut all_blobs_available = true; + let mut actions_to_queue: Vec = Vec::new(); + + if txs.is_empty() { + return; + } + + match self.pool.get_all_blobs_exact(txs.iter().map(|(tx, _)| tx.hash()).collect()) { + Ok(blobs) => { + for ((tx, _), sidecar) in txs.iter().zip(blobs.iter()) { + let transaction = BlobTransaction::try_from_signed(tx.clone(), sidecar.clone()) + .expect("should not fail to convert blob tx if it is already eip4844"); + + let block_metadata = BlockMetadata { + block_hash: block.hash(), + block_number: block.number, + gas_used: block.gas_used, + }; + actions_to_queue.push(BlobTransactionEvent::Mined(MinedBlob { + transaction, + block_metadata, + })); + } + } + Err(_err) => { + all_blobs_available = false; + } + }; + + // if any blob is missing we must instead query the consensus layer. + if all_blobs_available { + self.queued_actions.extend(actions_to_queue); + } else { + let client_clone = self.client.clone(); + let block_root = block.hash(); + let block_clone = block.clone(); + let sidecar_url = self.beacon_config.sidecar_url(block_root); + let query = + Box::pin(fetch_blobs_for_block(client_clone, sidecar_url, block_clone, txs)); + self.pending_requests.push(query); + } + } +} + +impl Stream for MinedSidecarStream +where + St: Stream + Send + Unpin + 'static, + P: TransactionPoolExt + Unpin + 'static, +{ + type Item = Result; + + /// Attempt to pull the next BlobTransaction from the stream. + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // Request locally first, otherwise request from CL + loop { + if let Some(mined_sidecar) = this.queued_actions.pop_front() { + return Poll::Ready(Some(Ok(mined_sidecar))); + } + + // Check if any pending requests are ready and append to buffer + while let Poll::Ready(Some(pending_result)) = this.pending_requests.poll_next_unpin(cx) + { + match pending_result { + Ok(mined_sidecars) => { + for sidecar in mined_sidecars { + this.queued_actions.push_back(sidecar); + } + } + Err(err) => return Poll::Ready(Some(Err(err))), + } + } + + while let Poll::Ready(Some(notification)) = this.events.poll_next_unpin(cx) { + { + match notification { + CanonStateNotification::Commit { new } => { + for (_, block) in new.blocks().iter() { + this.process_block(block); + } + } + CanonStateNotification::Reorg { old, new } => { + // handle reorged blocks + for (_, block) in old.blocks().iter() { + let txs: Vec = block + .transactions() + .filter(|tx: &&reth::primitives::TransactionSigned| { + tx.is_eip4844() + }) + .map(|tx| { + let transaction_hash = tx.hash(); + let block_metadata = BlockMetadata { + block_hash: new.tip().block.hash(), + block_number: new.tip().block.number, + gas_used: new.tip().block.gas_used, + }; + BlobTransactionEvent::Reorged(ReorgedBlob { + transaction_hash, + block_metadata, + }) + }) + .collect(); + this.queued_actions.extend(txs); + } + + for (_, block) in new.blocks().iter() { + this.process_block(block); + } + } + } + } + } + } + } +} + +/// Query the Beacon Layer for missing BlobTransactions +async fn fetch_blobs_for_block( + client: reqwest::Client, + url: String, + block: SealedBlockWithSenders, + txs: Vec<(reth::primitives::TransactionSigned, usize)>, +) -> Result, SideCarError> { + let response = match client.get(url).header("Accept", "application/json").send().await { + Ok(response) => response, + Err(err) => return Err(SideCarError::ReqwestError(err)), + }; + + if !response.status().is_success() { + return match response.status() { + StatusCode::BAD_REQUEST => { + Err(SideCarError::InvalidBlockID("Invalid request to server.".to_string())) + } + StatusCode::NOT_FOUND => { + Err(SideCarError::BlockNotFound("Requested block not found.".to_string())) + } + StatusCode::INTERNAL_SERVER_ERROR => { + Err(SideCarError::InternalError("Server encountered an error.".to_string())) + } + _ => Err(SideCarError::UnknownError( + response.status().as_u16(), + "Unhandled HTTP status.".to_string(), + )), + }; + } + + let bytes = match response.bytes().await { + Ok(b) => b, + Err(e) => return Err(SideCarError::NetworkError(e.to_string())), + }; + + let blobs_bundle: BeaconBlobBundle = match serde_json::from_slice(&bytes) { + Ok(b) => b, + Err(e) => return Err(SideCarError::DeserializationError(e.to_string())), + }; + + let mut sidecar_iterator = SidecarIterator::new(blobs_bundle); + + let sidecars: Vec = txs + .iter() + .filter_map(|(tx, blob_len)| { + sidecar_iterator.next_sidecar(*blob_len).map(|sidecar| { + let transaction = BlobTransaction::try_from_signed(tx.clone(), sidecar) + .expect("should not fail to convert blob tx if it is already eip4844"); + let block_metadata = BlockMetadata { + block_hash: block.hash(), + block_number: block.number, + gas_used: block.gas_used, + }; + BlobTransactionEvent::Mined(MinedBlob { transaction, block_metadata }) + }) + }) + .collect(); + + Ok(sidecars) +}