feat: add sse payload attributes example and beacon api support (#5130)

This commit is contained in:
Matthias Seitz
2023-10-23 22:47:04 +02:00
committed by GitHub
parent b71a8ac85d
commit 7d46db4a21
11 changed files with 609 additions and 4 deletions

View File

@ -0,0 +1,16 @@
[package]
name = "beacon-api-sse"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth.workspace = true
eyre.workspace = true
clap.workspace = true
tracing.workspace = true
futures-util.workspace = true
tokio = { workspace = true, features = ["time"] }
mev-share-sse = "0.1.5"

View File

@ -0,0 +1,113 @@
//! Example of how to subscribe to beacon chain events via SSE.
//!
//! See also [ethereum-beacon-API eventstream](https://ethereum.github.io/beacon-APIs/#/Events/eventstream)
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p beacon-api-sse -- node
//! ```
//!
//! This launches a regular reth instance and subscribes to payload attributes event stream.
//!
//! **NOTE**: This expects that the CL client is running an http server on `localhost:5052` and is
//! configured to emit payload attributes events.
//!
//! See lighthouse beacon Node API: <https://lighthouse-book.sigmaprime.io/api-bn.html#beacon-node-api>
use clap::Parser;
use futures_util::stream::StreamExt;
use mev_share_sse::{client::EventStream, EventClient};
use reth::{
cli::{
components::RethNodeComponents,
ext::{RethCliExt, RethNodeCommandConfig},
Cli,
},
rpc::types::engine::beacon_api::events::PayloadAttributesEvent,
tasks::TaskSpawner,
};
use std::net::{IpAddr, Ipv4Addr};
use tracing::{info, warn};
fn main() {
Cli::<BeaconEventsExt>::parse().run().unwrap();
}
/// The type that tells the reth CLI what extensions to use
#[derive(Debug, Default)]
#[non_exhaustive]
struct BeaconEventsExt;
impl RethCliExt for BeaconEventsExt {
/// This tells the reth CLI to install additional CLI arguments
type Node = BeaconEventsConfig;
}
/// Our custom cli args extension that adds one flag to reth default CLI.
#[derive(Debug, Clone, clap::Parser)]
struct BeaconEventsConfig {
/// Beacon Node http server address
#[arg(long = "cl.addr", default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))]
pub cl_addr: IpAddr,
/// Beacon Node http server port to listen on
#[arg(long = "cl.port", default_value_t = 5052)]
pub cl_port: u16,
}
impl BeaconEventsConfig {
/// Returns the http url of the beacon node
pub fn http_base_url(&self) -> String {
format!("http://{}:{}", self.cl_addr, self.cl_port)
}
/// Returns the URL to the events endpoint
pub fn events_url(&self) -> String {
format!("{}/eth/v1/events", self.http_base_url())
}
/// Service that subscribes to beacon chain payload attributes events
async fn run(self) {
let client = EventClient::default();
let mut subscription = self.new_payload_attributes_subscription(&client).await;
while let Some(event) = subscription.next().await {
info!("Received payload attributes: {:?}", event);
}
}
// It can take a bit until the CL endpoint is live so we retry a few times
async fn new_payload_attributes_subscription(
&self,
client: &EventClient,
) -> EventStream<PayloadAttributesEvent> {
let payloads_url = format!("{}?topics=payload_attributes", self.events_url());
loop {
match client.subscribe(&payloads_url).await {
Ok(subscription) => return subscription,
Err(err) => {
warn!("Failed to subscribe to payload attributes events: {:?}\nRetrying in 5 seconds...", err);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
}
impl RethNodeCommandConfig for BeaconEventsConfig {
fn on_node_started<Reth: RethNodeComponents>(&mut self, components: &Reth) -> eyre::Result<()> {
components.task_executor().spawn(Box::pin(self.clone().run()));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_config() {
let args = BeaconEventsConfig::try_parse_from(["reth"]);
assert!(args.is_ok());
}
}