docs: move ExEx book examples (#11616)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
caglarkaya
2024-10-14 12:50:07 +03:00
committed by GitHub
parent 7c2c3a6a5c
commit b637101894
16 changed files with 587 additions and 475 deletions

View File

@ -25,41 +25,7 @@ We will also need a bunch of dependencies. Some of them you know from the [Hello
but some of specific to what we need now. but some of specific to what we need now.
```toml ```toml
[package] {{#include ../../sources/exex/remote/Cargo.toml}}
name = "remote-exex"
version = "0.1.0"
edition = "2021"
[dependencies]
# reth
reth = { git = "https://github.com/paradigmxyz/reth.git" }
reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"}
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" }
# async
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures-util = "0.3"
# grpc
tonic = "0.11"
prost = "0.12"
bincode = "1"
# misc
eyre = "0.6"
[build-dependencies]
tonic-build = "0.11"
[[bin]]
name = "exex"
path = "src/exex.rs"
[[bin]]
name = "consumer"
path = "src/consumer.rs"
``` ```
We also added a build dependency for Tonic. We will use it to generate the Rust code for our We also added a build dependency for Tonic. We will use it to generate the Rust code for our
@ -87,26 +53,12 @@ For an example of a full schema, see the [Remote ExEx](https://github.com/paradi
</div> </div>
```protobuf ```protobuf
syntax = "proto3"; {{#include ../../sources/exex/remote/proto/exex.proto}}
package exex;
service RemoteExEx {
rpc Subscribe(SubscribeRequest) returns (stream ExExNotification) {}
}
message SubscribeRequest {}
message ExExNotification {
bytes data = 1;
}
``` ```
To instruct Tonic to generate the Rust code using this `.proto`, add the following lines to your `lib.rs` file: To instruct Tonic to generate the Rust code using this `.proto`, add the following lines to your `lib.rs` file:
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
pub mod proto { {{#include ../../sources/exex/remote/src/lib.rs}}
tonic::include_proto!("exex");
}
``` ```
## ExEx and gRPC server ## ExEx and gRPC server
@ -119,52 +71,7 @@ Let's create a minimal gRPC server that listens on the port `:10000`, and spawn
the [NodeBuilder](https://reth.rs/docs/reth/builder/struct.NodeBuilder.html)'s [task executor](https://reth.rs/docs/reth/tasks/struct.TaskExecutor.html). the [NodeBuilder](https://reth.rs/docs/reth/builder/struct.NodeBuilder.html)'s [task executor](https://reth.rs/docs/reth/tasks/struct.TaskExecutor.html).
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use remote_exex::proto::{ {{#include ../../sources/exex/remote/src/exex_1.rs}}
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::ExExNotification;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (_tx, rx) = mpsc::channel(1);
Ok(Response::new(ReceiverStream::new(rx)))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder.node(EthereumNode::default()).launch().await?;
handle
.node
.task_executor
.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}
``` ```
Currently, it does not send anything on the stream. Currently, it does not send anything on the stream.
@ -175,40 +82,7 @@ Let's create this channel in the `main` function where we will have both gRPC se
and save the sender part (that way we will be able to create new receivers) of this channel in our gRPC server. and save the sender part (that way we will be able to create new receivers) of this channel in our gRPC server.
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
// ... {{#include ../../sources/exex/remote/src/exex_2.rs}}
use reth_exex::{ExExNotification};
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
...
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder
.node(EthereumNode::default())
.launch()
.await?;
handle
.node
.task_executor
.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}
``` ```
And with that, we're ready to handle incoming notifications, serialize them with [bincode](https://docs.rs/bincode/) And with that, we're ready to handle incoming notifications, serialize them with [bincode](https://docs.rs/bincode/)
@ -218,37 +92,7 @@ For each incoming request, we spawn a separate tokio task that will run in the b
and then return the stream receiver to the client. and then return the stream receiver to the client.
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
// ... {{#rustdoc_include ../../sources/exex/remote/src/exex_3.rs:snippet}}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);
let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");
info!("Notification sent to the gRPC client");
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
// ...
``` ```
That's it for the gRPC server part! It doesn't receive anything on the `notifications` channel yet, That's it for the gRPC server part! It doesn't receive anything on the `notifications` channel yet,
@ -267,65 +111,14 @@ Don't forget to emit `ExExEvent::FinishedHeight`
</div> </div>
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
// ... {{#rustdoc_include ../../sources/exex/remote/src/exex_4.rs:snippet}}
use futures_util::StreamExt;
use reth_exex::{ExExContext, ExExEvent};
async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
info!("Notification sent to the gRPC server");
let _ = notifications.send(notification);
}
Ok(())
}
// ...
``` ```
All that's left is to connect all pieces together: install our ExEx in the node and pass the sender part All that's left is to connect all pieces together: install our ExEx in the node and pass the sender part
of communication channel to it. of communication channel to it.
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
// ... {{#rustdoc_include ../../sources/exex/remote/src/exex.rs:snippet}}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder
.node(EthereumNode::default())
.install_exex("remote-exex", |ctx| async move {
Ok(remote_exex(ctx, notifications))
})
.launch()
.await?;
handle
.node
.task_executor
.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}
``` ```
### Full `exex.rs` code ### Full `exex.rs` code
@ -334,98 +127,7 @@ fn main() -> eyre::Result<()> {
<summary>Click to expand</summary> <summary>Click to expand</summary>
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use std::sync::Arc; {{#include ../../sources/exex/remote/src/exex.rs}}
use futures_util::StreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);
let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");
info!(?notification, "Notification sent to the gRPC client");
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.next().await {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
info!(?notification, "Notification sent to the gRPC server");
let _ = notifications.send(notification);
}
Ok(())
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder
.node(EthereumNode::default())
.install_exex("remote-exex", |ctx| async move {
Ok(remote_exex(ctx, notifications))
})
.launch()
.await?;
handle
.node
.task_executor
.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}
``` ```
</details> </details>
@ -442,38 +144,7 @@ because notifications can get very heavy
</div> </div>
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use remote_exex::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest}; {{#include ../../sources/exex/remote/src/consumer.rs}}
use reth_exex::ExExNotification;
use reth_tracing::{tracing::info, RethTracer, Tracer};
#[tokio::main]
async fn main() -> eyre::Result<()> {
let _ = RethTracer::new().init()?;
let mut client = RemoteExExClient::connect("http://[::1]:10000")
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);
let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner();
while let Some(notification) = stream.message().await? {
let notification: ExExNotification = bincode::deserialize(&notification.data)?;
match notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
}
Ok(())
}
``` ```
## Running ## Running

View File

@ -19,63 +19,7 @@ because you can't access variables inside the function to assert the state of yo
</div> </div>
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use std::{ {{#include ../../sources/exex/tracking-state/src/bin/1.rs}}
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_util::StreamExt;
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
}
impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx { ctx }) })
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
``` ```
For those who are not familiar with how async Rust works on a lower level, that may seem scary, For those who are not familiar with how async Rust works on a lower level, that may seem scary,
@ -96,85 +40,7 @@ With all that done, we're now free to add more fields to our `MyExEx` struct, an
Our ExEx will count the number of transactions in each block and log it to the console. Our ExEx will count the number of transactions in each block and log it to the console.
```rust,norun,noplayground,ignore ```rust,norun,noplayground,ignore
use std::{ {{#include ../../sources/exex/tracking-state/src/bin/2.rs}}
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_util::StreamExt;
use reth::{api::FullNodeComponents, primitives::BlockNumber};
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
/// First block that was committed since the start of the ExEx.
first_block: Option<BlockNumber>,
/// Total number of transactions committed.
transactions: u64,
}
impl<Node: FullNodeComponents> MyExEx<Node> {
fn new(ctx: ExExContext<Node>) -> Self {
Self {
ctx,
first_block: None,
transactions: 0,
}
}
}
impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain
.blocks_iter()
.map(|b| b.body.len() as u64)
.sum(),
);
}
if let Some(committed_chain) = notification.committed_chain() {
this.first_block.get_or_insert(committed_chain.first().number);
this.transactions += committed_chain
.blocks_iter()
.map(|b| b.body.len() as u64)
.sum::<u64>();
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
if let Some(first_block) = this.first_block {
info!(%first_block, transactions = %this.transactions, "Total number of transactions");
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx::new(ctx)) })
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
``` ```
As you can see, we added two fields to our ExEx struct: As you can see, we added two fields to our ExEx struct:

View File

@ -1,6 +1,8 @@
[workspace] [workspace]
members = [ members = [
"exex/hello-world", "exex/hello-world",
"exex/remote",
"exex/tracking-state",
] ]
# Explicitly set the resolver to version 2, which is the default for packages with edition >= 2021 # Explicitly set the resolver to version 2, which is the default for packages with edition >= 2021

View File

@ -0,0 +1,52 @@
[package]
name = "remote-exex"
version = "0.1.0"
edition = "2021"
[dependencies]
# reth
reth = { git = "https://github.com/paradigmxyz/reth.git" }
reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"}
reth-node-api = { git = "https://github.com/paradigmxyz/reth.git"}
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" }
# async
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures-util = "0.3"
# grpc
tonic = "0.11"
prost = "0.12"
bincode = "1"
# misc
eyre = "0.6"
[build-dependencies]
tonic-build = "0.11"
[[bin]]
name = "exex_1"
path = "src/exex_1.rs"
[[bin]]
name = "exex_2"
path = "src/exex_2.rs"
[[bin]]
name = "exex_3"
path = "src/exex_3.rs"
[[bin]]
name = "exex_4"
path = "src/exex_4.rs"
[[bin]]
name = "exex"
path = "src/exex.rs"
[[bin]]
name = "consumer"
path = "src/consumer.rs"

View File

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/exex.proto")?;
Ok(())
}

View File

@ -0,0 +1,13 @@
syntax = "proto3";
package exex;
service RemoteExEx {
rpc Subscribe(SubscribeRequest) returns (stream ExExNotification) {}
}
message SubscribeRequest {}
message ExExNotification {
bytes data = 1;
}

View File

@ -0,0 +1,32 @@
use remote_exex::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest};
use reth_exex::ExExNotification;
use reth_tracing::{tracing::info, RethTracer, Tracer};
#[tokio::main]
async fn main() -> eyre::Result<()> {
let _ = RethTracer::new().init()?;
let mut client = RemoteExExClient::connect("http://[::1]:10000")
.await?
.max_encoding_message_size(usize::MAX)
.max_decoding_message_size(usize::MAX);
let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner();
while let Some(notification) = stream.message().await? {
let notification: ExExNotification = bincode::deserialize(&notification.data)?;
match notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
}
Ok(())
}

View File

@ -0,0 +1,87 @@
use futures_util::TryStreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);
let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");
info!("Notification sent to the gRPC client");
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.try_next().await? {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
info!("Notification sent to the gRPC server");
let _ = notifications.send(notification);
}
Ok(())
}
// ANCHOR: snippet
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder
.node(EthereumNode::default())
.install_exex("remote-exex", |ctx| async move { Ok(remote_exex(ctx, notifications)) })
.launch()
.await?;
handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}
// ANCHOR_END: snippet

View File

@ -0,0 +1,40 @@
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_node_ethereum::EthereumNode;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (_tx, rx) = mpsc::channel(1);
Ok(Response::new(ReceiverStream::new(rx)))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder.node(EthereumNode::default()).launch().await?;
handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}

View File

@ -0,0 +1,49 @@
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::ExExNotification;
use reth_node_ethereum::EthereumNode;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
#[allow(dead_code)]
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (_tx, rx) = mpsc::channel(1);
Ok(Response::new(ReceiverStream::new(rx)))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder.node(EthereumNode::default()).launch().await?;
handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}

View File

@ -0,0 +1,65 @@
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::ExExNotification;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
// ANCHOR: snippet
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);
let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");
info!("Notification sent to the gRPC client");
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
// ANCHOR_END: snippet
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder.node(EthereumNode::default()).launch().await?;
handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}

View File

@ -0,0 +1,84 @@
use futures_util::TryStreamExt;
use remote_exex::proto::{
self,
remote_ex_ex_server::{RemoteExEx, RemoteExExServer},
};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};
struct ExExService {
notifications: Arc<broadcast::Sender<ExExNotification>>,
}
#[tonic::async_trait]
impl RemoteExEx for ExExService {
type SubscribeStream = ReceiverStream<Result<proto::ExExNotification, Status>>;
async fn subscribe(
&self,
_request: Request<proto::SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let (tx, rx) = mpsc::channel(1);
let mut notifications = self.notifications.subscribe();
tokio::spawn(async move {
while let Ok(notification) = notifications.recv().await {
let proto_notification = proto::ExExNotification {
data: bincode::serialize(&notification).expect("failed to serialize"),
};
tx.send(Ok(proto_notification))
.await
.expect("failed to send notification to client");
info!("Notification sent to the gRPC client");
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
// ANCHOR: snippet
#[allow(dead_code)]
async fn remote_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
notifications: Arc<broadcast::Sender<ExExNotification>>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.try_next().await? {
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
info!("Notification sent to the gRPC server");
let _ = notifications.send(notification);
}
Ok(())
}
// ANCHOR_END: snippet
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let notifications = Arc::new(broadcast::channel(1).0);
let server = Server::builder()
.add_service(RemoteExExServer::new(ExExService {
notifications: notifications.clone(),
}))
.serve("[::1]:10000".parse().unwrap());
let handle = builder.node(EthereumNode::default()).launch().await?;
handle.node.task_executor.spawn_critical("gRPC server", async move {
server.await.expect("failed to start gRPC server")
});
handle.wait_for_node_exit().await
})
}

View File

@ -0,0 +1,3 @@
pub mod proto {
tonic::include_proto!("exex");
}

View File

@ -0,0 +1,14 @@
[package]
name = "tracking-state"
version = "0.1.0"
edition = "2021"
[dependencies]
reth = { git = "https://github.com/paradigmxyz/reth.git" }
reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"}
reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" }
eyre = "0.6" # Easy error handling
futures-util = "0.3" # Stream utilities for consuming notifications
alloy-primitives = "0.8.7"

View File

@ -0,0 +1,57 @@
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_util::{FutureExt, TryStreamExt};
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
}
impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx { ctx }) })
.launch()
.await?;
handle.wait_for_node_exit().await
})
}

View File

@ -0,0 +1,73 @@
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use alloy_primitives::BlockNumber;
use futures_util::{FutureExt, TryStreamExt};
use reth::api::FullNodeComponents;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
/// First block that was committed since the start of the ExEx.
first_block: Option<BlockNumber>,
/// Total number of transactions committed.
transactions: u64,
}
impl<Node: FullNodeComponents> MyExEx<Node> {
fn new(ctx: ExExContext<Node>) -> Self {
Self { ctx, first_block: None, transactions: 0 }
}
}
impl<Node: FullNodeComponents> Future for MyExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain.blocks_iter().map(|b| b.body.transactions.len() as u64).sum(),
);
}
if let Some(committed_chain) = notification.committed_chain() {
this.first_block.get_or_insert(committed_chain.first().number);
this.transactions += committed_chain
.blocks_iter()
.map(|b| b.body.transactions.len() as u64)
.sum::<u64>();
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
if let Some(first_block) = this.first_block {
info!(%first_block, transactions = %this.transactions, "Total number of transactions");
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", |ctx| async move { Ok(MyExEx::new(ctx)) })
.launch()
.await?;
handle.wait_for_node_exit().await
})
}