diff --git a/book/developers/exex/remote.md b/book/developers/exex/remote.md index 4344e28b3..0ec704308 100644 --- a/book/developers/exex/remote.md +++ b/book/developers/exex/remote.md @@ -25,41 +25,7 @@ We will also need a bunch of dependencies. Some of them you know from the [Hello but some of specific to what we need now. ```toml -[package] -name = "remote-exex" -version = "0.1.0" -edition = "2021" - -[dependencies] -# reth -reth = { git = "https://github.com/paradigmxyz/reth.git" } -reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] } -reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"} -reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } - -# async -tokio = { version = "1", features = ["full"] } -tokio-stream = "0.1" -futures-util = "0.3" - -# grpc -tonic = "0.11" -prost = "0.12" -bincode = "1" - -# misc -eyre = "0.6" - -[build-dependencies] -tonic-build = "0.11" - -[[bin]] -name = "exex" -path = "src/exex.rs" - -[[bin]] -name = "consumer" -path = "src/consumer.rs" +{{#include ../../sources/exex/remote/Cargo.toml}} ``` We also added a build dependency for Tonic. We will use it to generate the Rust code for our @@ -87,26 +53,12 @@ For an example of a full schema, see the [Remote ExEx](https://github.com/paradi ```protobuf -syntax = "proto3"; - -package exex; - -service RemoteExEx { - rpc Subscribe(SubscribeRequest) returns (stream ExExNotification) {} -} - -message SubscribeRequest {} - -message ExExNotification { - bytes data = 1; -} +{{#include ../../sources/exex/remote/proto/exex.proto}} ``` To instruct Tonic to generate the Rust code using this `.proto`, add the following lines to your `lib.rs` file: ```rust,norun,noplayground,ignore -pub mod proto { - tonic::include_proto!("exex"); -} +{{#include ../../sources/exex/remote/src/lib.rs}} ``` ## ExEx and gRPC server @@ -119,52 +71,7 @@ Let's create a minimal gRPC server that listens on the port `:10000`, and spawn the [NodeBuilder](https://reth.rs/docs/reth/builder/struct.NodeBuilder.html)'s [task executor](https://reth.rs/docs/reth/tasks/struct.TaskExecutor.html). ```rust,norun,noplayground,ignore -use remote_exex::proto::{ - self, - remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, -}; -use reth_exex::ExExNotification; -use reth_node_ethereum::EthereumNode; -use reth_tracing::tracing::info; -use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{transport::Server, Request, Response, Status}; - -struct ExExService {} - -#[tonic::async_trait] -impl RemoteExEx for ExExService { - type SubscribeStream = ReceiverStream>; - - async fn subscribe( - &self, - _request: Request, - ) -> Result, Status> { - let (_tx, rx) = mpsc::channel(1); - - Ok(Response::new(ReceiverStream::new(rx))) - } -} - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let server = Server::builder() - .add_service(RemoteExExServer::new(ExExService {})) - .serve("[::1]:10000".parse().unwrap()); - - let handle = builder.node(EthereumNode::default()).launch().await?; - - handle - .node - .task_executor - .spawn_critical("gRPC server", async move { - server.await.expect("failed to start gRPC server") - }); - - handle.wait_for_node_exit().await - }) -} +{{#include ../../sources/exex/remote/src/exex_1.rs}} ``` Currently, it does not send anything on the stream. @@ -175,40 +82,7 @@ Let's create this channel in the `main` function where we will have both gRPC se and save the sender part (that way we will be able to create new receivers) of this channel in our gRPC server. ```rust,norun,noplayground,ignore -// ... -use reth_exex::{ExExNotification}; - -struct ExExService { - notifications: Arc>, -} - -... - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let notifications = Arc::new(broadcast::channel(1).0); - - let server = Server::builder() - .add_service(RemoteExExServer::new(ExExService { - notifications: notifications.clone(), - })) - .serve("[::1]:10000".parse().unwrap()); - - let handle = builder - .node(EthereumNode::default()) - .launch() - .await?; - - handle - .node - .task_executor - .spawn_critical("gRPC server", async move { - server.await.expect("failed to start gRPC server") - }); - - handle.wait_for_node_exit().await - }) -} +{{#include ../../sources/exex/remote/src/exex_2.rs}} ``` And with that, we're ready to handle incoming notifications, serialize them with [bincode](https://docs.rs/bincode/) @@ -218,37 +92,7 @@ For each incoming request, we spawn a separate tokio task that will run in the b and then return the stream receiver to the client. ```rust,norun,noplayground,ignore -// ... - -#[tonic::async_trait] -impl RemoteExEx for ExExService { - type SubscribeStream = ReceiverStream>; - - async fn subscribe( - &self, - _request: Request, - ) -> Result, Status> { - let (tx, rx) = mpsc::channel(1); - - let mut notifications = self.notifications.subscribe(); - tokio::spawn(async move { - while let Ok(notification) = notifications.recv().await { - let proto_notification = proto::ExExNotification { - data: bincode::serialize(¬ification).expect("failed to serialize"), - }; - tx.send(Ok(proto_notification)) - .await - .expect("failed to send notification to client"); - - info!("Notification sent to the gRPC client"); - } - }); - - Ok(Response::new(ReceiverStream::new(rx))) - } -} - -// ... +{{#rustdoc_include ../../sources/exex/remote/src/exex_3.rs:snippet}} ``` That's it for the gRPC server part! It doesn't receive anything on the `notifications` channel yet, @@ -267,65 +111,14 @@ Don't forget to emit `ExExEvent::FinishedHeight` ```rust,norun,noplayground,ignore -// ... - -use futures_util::StreamExt; -use reth_exex::{ExExContext, ExExEvent}; - -async fn remote_exex( - mut ctx: ExExContext, - notifications: Arc>, -) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.next().await { - if let Some(committed_chain) = notification.committed_chain() { - ctx.events - .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; - } - - info!("Notification sent to the gRPC server"); - let _ = notifications.send(notification); - } - - Ok(()) -} - -// ... +{{#rustdoc_include ../../sources/exex/remote/src/exex_4.rs:snippet}} ``` All that's left is to connect all pieces together: install our ExEx in the node and pass the sender part of communication channel to it. ```rust,norun,noplayground,ignore -// ... - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let notifications = Arc::new(broadcast::channel(1).0); - - let server = Server::builder() - .add_service(RemoteExExServer::new(ExExService { - notifications: notifications.clone(), - })) - .serve("[::1]:10000".parse().unwrap()); - - let handle = builder - .node(EthereumNode::default()) - .install_exex("remote-exex", |ctx| async move { - Ok(remote_exex(ctx, notifications)) - }) - .launch() - .await?; - - handle - .node - .task_executor - .spawn_critical("gRPC server", async move { - server.await.expect("failed to start gRPC server") - }); - - handle.wait_for_node_exit().await - }) -} +{{#rustdoc_include ../../sources/exex/remote/src/exex.rs:snippet}} ``` ### Full `exex.rs` code @@ -334,98 +127,7 @@ fn main() -> eyre::Result<()> { Click to expand ```rust,norun,noplayground,ignore -use std::sync::Arc; - -use futures_util::StreamExt; -use remote_exex::proto::{ - self, - remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, -}; -use reth::api::FullNodeComponents; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; -use reth_node_ethereum::EthereumNode; -use reth_tracing::tracing::info; -use tokio::sync::{broadcast, mpsc}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{transport::Server, Request, Response, Status}; - -struct ExExService { - notifications: Arc>, -} - -#[tonic::async_trait] -impl RemoteExEx for ExExService { - type SubscribeStream = ReceiverStream>; - - async fn subscribe( - &self, - _request: Request, - ) -> Result, Status> { - let (tx, rx) = mpsc::channel(1); - - let mut notifications = self.notifications.subscribe(); - tokio::spawn(async move { - while let Ok(notification) = notifications.recv().await { - let proto_notification = proto::ExExNotification { - data: bincode::serialize(¬ification).expect("failed to serialize"), - }; - tx.send(Ok(proto_notification)) - .await - .expect("failed to send notification to client"); - - info!(?notification, "Notification sent to the gRPC client"); - } - }); - - Ok(Response::new(ReceiverStream::new(rx))) - } -} - -async fn remote_exex( - mut ctx: ExExContext, - notifications: Arc>, -) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.next().await { - if let Some(committed_chain) = notification.committed_chain() { - ctx.events - .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; - } - - info!(?notification, "Notification sent to the gRPC server"); - let _ = notifications.send(notification); - } - - Ok(()) -} - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let notifications = Arc::new(broadcast::channel(1).0); - - let server = Server::builder() - .add_service(RemoteExExServer::new(ExExService { - notifications: notifications.clone(), - })) - .serve("[::1]:10000".parse().unwrap()); - - let handle = builder - .node(EthereumNode::default()) - .install_exex("remote-exex", |ctx| async move { - Ok(remote_exex(ctx, notifications)) - }) - .launch() - .await?; - - handle - .node - .task_executor - .spawn_critical("gRPC server", async move { - server.await.expect("failed to start gRPC server") - }); - - handle.wait_for_node_exit().await - }) -} +{{#include ../../sources/exex/remote/src/exex.rs}} ``` @@ -442,38 +144,7 @@ because notifications can get very heavy ```rust,norun,noplayground,ignore -use remote_exex::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest}; -use reth_exex::ExExNotification; -use reth_tracing::{tracing::info, RethTracer, Tracer}; - -#[tokio::main] -async fn main() -> eyre::Result<()> { - let _ = RethTracer::new().init()?; - - let mut client = RemoteExExClient::connect("http://[::1]:10000") - .await? - .max_encoding_message_size(usize::MAX) - .max_decoding_message_size(usize::MAX); - - let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner(); - while let Some(notification) = stream.message().await? { - let notification: ExExNotification = bincode::deserialize(¬ification.data)?; - - match notification { - ExExNotification::ChainCommitted { new } => { - info!(committed_chain = ?new.range(), "Received commit"); - } - ExExNotification::ChainReorged { old, new } => { - info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); - } - ExExNotification::ChainReverted { old } => { - info!(reverted_chain = ?old.range(), "Received revert"); - } - }; - } - - Ok(()) -} +{{#include ../../sources/exex/remote/src/consumer.rs}} ``` ## Running diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index 52c73e618..d2a9fe6ca 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -19,63 +19,7 @@ because you can't access variables inside the function to assert the state of yo ```rust,norun,noplayground,ignore -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use futures_util::StreamExt; -use reth::api::FullNodeComponents; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; -use reth_node_ethereum::EthereumNode; -use reth_tracing::tracing::info; - -struct MyExEx { - ctx: ExExContext, -} - -impl Future for MyExEx { - type Output = eyre::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { - match ¬ification { - ExExNotification::ChainCommitted { new } => { - info!(committed_chain = ?new.range(), "Received commit"); - } - ExExNotification::ChainReorged { old, new } => { - info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); - } - ExExNotification::ChainReverted { old } => { - info!(reverted_chain = ?old.range(), "Received revert"); - } - }; - - if let Some(committed_chain) = notification.committed_chain() { - this.ctx - .events - .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; - } - } - - Poll::Ready(Ok(())) - } -} - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let handle = builder - .node(EthereumNode::default()) - .install_exex("my-exex", |ctx| async move { Ok(MyExEx { ctx }) }) - .launch() - .await?; - - handle.wait_for_node_exit().await - }) -} +{{#include ../../sources/exex/tracking-state/src/bin/1.rs}} ``` For those who are not familiar with how async Rust works on a lower level, that may seem scary, @@ -96,85 +40,7 @@ With all that done, we're now free to add more fields to our `MyExEx` struct, an Our ExEx will count the number of transactions in each block and log it to the console. ```rust,norun,noplayground,ignore -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use futures_util::StreamExt; -use reth::{api::FullNodeComponents, primitives::BlockNumber}; -use reth_exex::{ExExContext, ExExEvent}; -use reth_node_ethereum::EthereumNode; -use reth_tracing::tracing::info; - -struct MyExEx { - ctx: ExExContext, - /// First block that was committed since the start of the ExEx. - first_block: Option, - /// Total number of transactions committed. - transactions: u64, -} - -impl MyExEx { - fn new(ctx: ExExContext) -> Self { - Self { - ctx, - first_block: None, - transactions: 0, - } - } -} - -impl Future for MyExEx { - type Output = eyre::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { - if let Some(reverted_chain) = notification.reverted_chain() { - this.transactions = this.transactions.saturating_sub( - reverted_chain - .blocks_iter() - .map(|b| b.body.len() as u64) - .sum(), - ); - } - - if let Some(committed_chain) = notification.committed_chain() { - this.first_block.get_or_insert(committed_chain.first().number); - - this.transactions += committed_chain - .blocks_iter() - .map(|b| b.body.len() as u64) - .sum::(); - - this.ctx - .events - .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; - } - - if let Some(first_block) = this.first_block { - info!(%first_block, transactions = %this.transactions, "Total number of transactions"); - } - } - - Poll::Ready(Ok(())) - } -} - -fn main() -> eyre::Result<()> { - reth::cli::Cli::parse_args().run(|builder, _| async move { - let handle = builder - .node(EthereumNode::default()) - .install_exex("my-exex", |ctx| async move { Ok(MyExEx::new(ctx)) }) - .launch() - .await?; - - handle.wait_for_node_exit().await - }) -} +{{#include ../../sources/exex/tracking-state/src/bin/2.rs}} ``` As you can see, we added two fields to our ExEx struct: diff --git a/book/sources/Cargo.toml b/book/sources/Cargo.toml index c04c8567f..1529af952 100644 --- a/book/sources/Cargo.toml +++ b/book/sources/Cargo.toml @@ -1,6 +1,8 @@ [workspace] members = [ "exex/hello-world", + "exex/remote", + "exex/tracking-state", ] # Explicitly set the resolver to version 2, which is the default for packages with edition >= 2021 diff --git a/book/sources/exex/remote/Cargo.toml b/book/sources/exex/remote/Cargo.toml new file mode 100644 index 000000000..6eeb848ca --- /dev/null +++ b/book/sources/exex/remote/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "remote-exex" +version = "0.1.0" +edition = "2021" + +[dependencies] +# reth +reth = { git = "https://github.com/paradigmxyz/reth.git" } +reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] } +reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"} +reth-node-api = { git = "https://github.com/paradigmxyz/reth.git"} +reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } + +# async +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +futures-util = "0.3" + +# grpc +tonic = "0.11" +prost = "0.12" +bincode = "1" + +# misc +eyre = "0.6" + +[build-dependencies] +tonic-build = "0.11" + +[[bin]] +name = "exex_1" +path = "src/exex_1.rs" + +[[bin]] +name = "exex_2" +path = "src/exex_2.rs" + +[[bin]] +name = "exex_3" +path = "src/exex_3.rs" + +[[bin]] +name = "exex_4" +path = "src/exex_4.rs" + +[[bin]] +name = "exex" +path = "src/exex.rs" + +[[bin]] +name = "consumer" +path = "src/consumer.rs" \ No newline at end of file diff --git a/book/sources/exex/remote/build.rs b/book/sources/exex/remote/build.rs new file mode 100644 index 000000000..8e66f2a30 --- /dev/null +++ b/book/sources/exex/remote/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/exex.proto")?; + Ok(()) +} diff --git a/book/sources/exex/remote/proto/exex.proto b/book/sources/exex/remote/proto/exex.proto new file mode 100644 index 000000000..9bb180b5f --- /dev/null +++ b/book/sources/exex/remote/proto/exex.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package exex; + +service RemoteExEx { + rpc Subscribe(SubscribeRequest) returns (stream ExExNotification) {} +} + +message SubscribeRequest {} + +message ExExNotification { + bytes data = 1; +} \ No newline at end of file diff --git a/book/sources/exex/remote/src/consumer.rs b/book/sources/exex/remote/src/consumer.rs new file mode 100644 index 000000000..a0400f4bb --- /dev/null +++ b/book/sources/exex/remote/src/consumer.rs @@ -0,0 +1,32 @@ +use remote_exex::proto::{remote_ex_ex_client::RemoteExExClient, SubscribeRequest}; +use reth_exex::ExExNotification; +use reth_tracing::{tracing::info, RethTracer, Tracer}; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let _ = RethTracer::new().init()?; + + let mut client = RemoteExExClient::connect("http://[::1]:10000") + .await? + .max_encoding_message_size(usize::MAX) + .max_decoding_message_size(usize::MAX); + + let mut stream = client.subscribe(SubscribeRequest {}).await?.into_inner(); + while let Some(notification) = stream.message().await? { + let notification: ExExNotification = bincode::deserialize(¬ification.data)?; + + match notification { + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); + } + ExExNotification::ChainReorged { old, new } => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + info!(reverted_chain = ?old.range(), "Received revert"); + } + }; + } + + Ok(()) +} diff --git a/book/sources/exex/remote/src/exex.rs b/book/sources/exex/remote/src/exex.rs new file mode 100644 index 000000000..1ae4785db --- /dev/null +++ b/book/sources/exex/remote/src/exex.rs @@ -0,0 +1,87 @@ +use futures_util::TryStreamExt; +use remote_exex::proto::{ + self, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, +}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::FullNodeComponents; +use reth_node_ethereum::EthereumNode; +use reth_tracing::tracing::info; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; + +struct ExExService { + notifications: Arc>, +} + +#[tonic::async_trait] +impl RemoteExEx for ExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let (tx, rx) = mpsc::channel(1); + + let mut notifications = self.notifications.subscribe(); + tokio::spawn(async move { + while let Ok(notification) = notifications.recv().await { + let proto_notification = proto::ExExNotification { + data: bincode::serialize(¬ification).expect("failed to serialize"), + }; + tx.send(Ok(proto_notification)) + .await + .expect("failed to send notification to client"); + + info!("Notification sent to the gRPC client"); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +async fn remote_exex( + mut ctx: ExExContext, + notifications: Arc>, +) -> eyre::Result<()> { + while let Some(notification) = ctx.notifications.try_next().await? { + if let Some(committed_chain) = notification.committed_chain() { + ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + + info!("Notification sent to the gRPC server"); + let _ = notifications.send(notification); + } + + Ok(()) +} + +// ANCHOR: snippet +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let notifications = Arc::new(broadcast::channel(1).0); + + let server = Server::builder() + .add_service(RemoteExExServer::new(ExExService { + notifications: notifications.clone(), + })) + .serve("[::1]:10000".parse().unwrap()); + + let handle = builder + .node(EthereumNode::default()) + .install_exex("remote-exex", |ctx| async move { Ok(remote_exex(ctx, notifications)) }) + .launch() + .await?; + + handle.node.task_executor.spawn_critical("gRPC server", async move { + server.await.expect("failed to start gRPC server") + }); + + handle.wait_for_node_exit().await + }) +} +// ANCHOR_END: snippet diff --git a/book/sources/exex/remote/src/exex_1.rs b/book/sources/exex/remote/src/exex_1.rs new file mode 100644 index 000000000..09a4bcc06 --- /dev/null +++ b/book/sources/exex/remote/src/exex_1.rs @@ -0,0 +1,40 @@ +use remote_exex::proto::{ + self, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, +}; +use reth_node_ethereum::EthereumNode; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; + +struct ExExService {} + +#[tonic::async_trait] +impl RemoteExEx for ExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let (_tx, rx) = mpsc::channel(1); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let server = Server::builder() + .add_service(RemoteExExServer::new(ExExService {})) + .serve("[::1]:10000".parse().unwrap()); + + let handle = builder.node(EthereumNode::default()).launch().await?; + + handle.node.task_executor.spawn_critical("gRPC server", async move { + server.await.expect("failed to start gRPC server") + }); + + handle.wait_for_node_exit().await + }) +} diff --git a/book/sources/exex/remote/src/exex_2.rs b/book/sources/exex/remote/src/exex_2.rs new file mode 100644 index 000000000..c4f51ddae --- /dev/null +++ b/book/sources/exex/remote/src/exex_2.rs @@ -0,0 +1,49 @@ +use remote_exex::proto::{ + self, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, +}; +use reth_exex::ExExNotification; +use reth_node_ethereum::EthereumNode; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; + +#[allow(dead_code)] +struct ExExService { + notifications: Arc>, +} + +#[tonic::async_trait] +impl RemoteExEx for ExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let (_tx, rx) = mpsc::channel(1); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let notifications = Arc::new(broadcast::channel(1).0); + + let server = Server::builder() + .add_service(RemoteExExServer::new(ExExService { + notifications: notifications.clone(), + })) + .serve("[::1]:10000".parse().unwrap()); + + let handle = builder.node(EthereumNode::default()).launch().await?; + + handle.node.task_executor.spawn_critical("gRPC server", async move { + server.await.expect("failed to start gRPC server") + }); + + handle.wait_for_node_exit().await + }) +} diff --git a/book/sources/exex/remote/src/exex_3.rs b/book/sources/exex/remote/src/exex_3.rs new file mode 100644 index 000000000..9f264cf34 --- /dev/null +++ b/book/sources/exex/remote/src/exex_3.rs @@ -0,0 +1,65 @@ +use remote_exex::proto::{ + self, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, +}; +use reth_exex::ExExNotification; +use reth_node_ethereum::EthereumNode; +use reth_tracing::tracing::info; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; + +struct ExExService { + notifications: Arc>, +} + +// ANCHOR: snippet +#[tonic::async_trait] +impl RemoteExEx for ExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let (tx, rx) = mpsc::channel(1); + + let mut notifications = self.notifications.subscribe(); + tokio::spawn(async move { + while let Ok(notification) = notifications.recv().await { + let proto_notification = proto::ExExNotification { + data: bincode::serialize(¬ification).expect("failed to serialize"), + }; + tx.send(Ok(proto_notification)) + .await + .expect("failed to send notification to client"); + + info!("Notification sent to the gRPC client"); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} +// ANCHOR_END: snippet + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let notifications = Arc::new(broadcast::channel(1).0); + + let server = Server::builder() + .add_service(RemoteExExServer::new(ExExService { + notifications: notifications.clone(), + })) + .serve("[::1]:10000".parse().unwrap()); + + let handle = builder.node(EthereumNode::default()).launch().await?; + + handle.node.task_executor.spawn_critical("gRPC server", async move { + server.await.expect("failed to start gRPC server") + }); + + handle.wait_for_node_exit().await + }) +} diff --git a/book/sources/exex/remote/src/exex_4.rs b/book/sources/exex/remote/src/exex_4.rs new file mode 100644 index 000000000..24c7bf2c2 --- /dev/null +++ b/book/sources/exex/remote/src/exex_4.rs @@ -0,0 +1,84 @@ +use futures_util::TryStreamExt; +use remote_exex::proto::{ + self, + remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, +}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::FullNodeComponents; +use reth_node_ethereum::EthereumNode; +use reth_tracing::tracing::info; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{transport::Server, Request, Response, Status}; + +struct ExExService { + notifications: Arc>, +} + +#[tonic::async_trait] +impl RemoteExEx for ExExService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + _request: Request, + ) -> Result, Status> { + let (tx, rx) = mpsc::channel(1); + + let mut notifications = self.notifications.subscribe(); + tokio::spawn(async move { + while let Ok(notification) = notifications.recv().await { + let proto_notification = proto::ExExNotification { + data: bincode::serialize(¬ification).expect("failed to serialize"), + }; + tx.send(Ok(proto_notification)) + .await + .expect("failed to send notification to client"); + + info!("Notification sent to the gRPC client"); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +// ANCHOR: snippet +#[allow(dead_code)] +async fn remote_exex( + mut ctx: ExExContext, + notifications: Arc>, +) -> eyre::Result<()> { + while let Some(notification) = ctx.notifications.try_next().await? { + if let Some(committed_chain) = notification.committed_chain() { + ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + + info!("Notification sent to the gRPC server"); + let _ = notifications.send(notification); + } + + Ok(()) +} +// ANCHOR_END: snippet + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let notifications = Arc::new(broadcast::channel(1).0); + + let server = Server::builder() + .add_service(RemoteExExServer::new(ExExService { + notifications: notifications.clone(), + })) + .serve("[::1]:10000".parse().unwrap()); + + let handle = builder.node(EthereumNode::default()).launch().await?; + + handle.node.task_executor.spawn_critical("gRPC server", async move { + server.await.expect("failed to start gRPC server") + }); + + handle.wait_for_node_exit().await + }) +} diff --git a/book/sources/exex/remote/src/lib.rs b/book/sources/exex/remote/src/lib.rs new file mode 100644 index 000000000..9abb458bd --- /dev/null +++ b/book/sources/exex/remote/src/lib.rs @@ -0,0 +1,3 @@ +pub mod proto { + tonic::include_proto!("exex"); +} diff --git a/book/sources/exex/tracking-state/Cargo.toml b/book/sources/exex/tracking-state/Cargo.toml new file mode 100644 index 000000000..3ce21b0c3 --- /dev/null +++ b/book/sources/exex/tracking-state/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tracking-state" +version = "0.1.0" +edition = "2021" + +[dependencies] +reth = { git = "https://github.com/paradigmxyz/reth.git" } +reth-exex = { git = "https://github.com/paradigmxyz/reth.git", features = ["serde"] } +reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git"} +reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } + +eyre = "0.6" # Easy error handling +futures-util = "0.3" # Stream utilities for consuming notifications +alloy-primitives = "0.8.7" diff --git a/book/sources/exex/tracking-state/src/bin/1.rs b/book/sources/exex/tracking-state/src/bin/1.rs new file mode 100644 index 000000000..0d42e0791 --- /dev/null +++ b/book/sources/exex/tracking-state/src/bin/1.rs @@ -0,0 +1,57 @@ +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use futures_util::{FutureExt, TryStreamExt}; +use reth::api::FullNodeComponents; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_ethereum::EthereumNode; +use reth_tracing::tracing::info; + +struct MyExEx { + ctx: ExExContext, +} + +impl Future for MyExEx { + type Output = eyre::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? { + match ¬ification { + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); + } + ExExNotification::ChainReorged { old, new } => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + info!(reverted_chain = ?old.range(), "Received revert"); + } + }; + + if let Some(committed_chain) = notification.committed_chain() { + this.ctx + .events + .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + } + + Poll::Ready(Ok(())) + } +} + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let handle = builder + .node(EthereumNode::default()) + .install_exex("my-exex", |ctx| async move { Ok(MyExEx { ctx }) }) + .launch() + .await?; + + handle.wait_for_node_exit().await + }) +} diff --git a/book/sources/exex/tracking-state/src/bin/2.rs b/book/sources/exex/tracking-state/src/bin/2.rs new file mode 100644 index 000000000..941681066 --- /dev/null +++ b/book/sources/exex/tracking-state/src/bin/2.rs @@ -0,0 +1,73 @@ +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use alloy_primitives::BlockNumber; +use futures_util::{FutureExt, TryStreamExt}; +use reth::api::FullNodeComponents; +use reth_exex::{ExExContext, ExExEvent}; +use reth_node_ethereum::EthereumNode; +use reth_tracing::tracing::info; + +struct MyExEx { + ctx: ExExContext, + /// First block that was committed since the start of the ExEx. + first_block: Option, + /// Total number of transactions committed. + transactions: u64, +} + +impl MyExEx { + fn new(ctx: ExExContext) -> Self { + Self { ctx, first_block: None, transactions: 0 } + } +} + +impl Future for MyExEx { + type Output = eyre::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? { + if let Some(reverted_chain) = notification.reverted_chain() { + this.transactions = this.transactions.saturating_sub( + reverted_chain.blocks_iter().map(|b| b.body.transactions.len() as u64).sum(), + ); + } + + if let Some(committed_chain) = notification.committed_chain() { + this.first_block.get_or_insert(committed_chain.first().number); + + this.transactions += committed_chain + .blocks_iter() + .map(|b| b.body.transactions.len() as u64) + .sum::(); + + this.ctx + .events + .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + + if let Some(first_block) = this.first_block { + info!(%first_block, transactions = %this.transactions, "Total number of transactions"); + } + } + + Poll::Ready(Ok(())) + } +} + +fn main() -> eyre::Result<()> { + reth::cli::Cli::parse_args().run(|builder, _| async move { + let handle = builder + .node(EthereumNode::default()) + .install_exex("my-exex", |ctx| async move { Ok(MyExEx::new(ctx)) }) + .launch() + .await?; + + handle.wait_for_node_exit().await + }) +}