example(exex): move to a separate repo (#9500)

This commit is contained in:
Alexey Shekhirin
2024-07-15 11:15:15 +01:00
committed by GitHub
parent 9210269d3a
commit 831d5a74c8
21 changed files with 3 additions and 2459 deletions

183
Cargo.lock generated
View File

@ -570,7 +570,6 @@ version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "867a5469d61480fea08c7333ffeca52d5b621f5ca2e44f271b117ec1fc9a0525"
dependencies = [
"alloy-json-abi",
"alloy-sol-macro-input",
"const-hex",
"heck 0.5.0",
@ -589,13 +588,11 @@ version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e482dc33a32b6fadbc0f599adea520bd3aaa585c141a80b404d0a3e3fa72528"
dependencies = [
"alloy-json-abi",
"const-hex",
"dunce",
"heck 0.5.0",
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.71",
"syn-solidity",
]
@ -616,7 +613,6 @@ version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a91ca40fa20793ae9c3841b83e74569d1cc9af29a2f5237314fd3452d51e38c7"
dependencies = [
"alloy-json-abi",
"alloy-primitives",
"alloy-sol-macro",
"const-hex",
@ -2443,7 +2439,7 @@ dependencies = [
"enr",
"fnv",
"futures",
"hashlink 0.8.4",
"hashlink",
"hex",
"hkdf",
"lazy_static",
@ -2871,115 +2867,6 @@ dependencies = [
"reth-rpc-types",
]
[[package]]
name = "example-exex-discv5"
version = "0.0.0"
dependencies = [
"clap",
"discv5",
"enr",
"eyre",
"futures",
"futures-util",
"reth",
"reth-chainspec",
"reth-discv5",
"reth-exex",
"reth-exex-test-utils",
"reth-network-peers",
"reth-node-api",
"reth-node-ethereum",
"reth-testing-utils",
"reth-tracing",
"serde_json",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "example-exex-in-memory-state"
version = "0.0.0"
dependencies = [
"eyre",
"reth",
"reth-execution-types",
"reth-exex",
"reth-exex-test-utils",
"reth-node-api",
"reth-node-ethereum",
"reth-testing-utils",
"reth-tracing",
"tokio",
]
[[package]]
name = "example-exex-minimal"
version = "0.0.0"
dependencies = [
"eyre",
"futures",
"reth",
"reth-execution-types",
"reth-exex",
"reth-exex-test-utils",
"reth-node-api",
"reth-node-ethereum",
"reth-tracing",
"tokio",
]
[[package]]
name = "example-exex-op-bridge"
version = "0.0.0"
dependencies = [
"alloy-sol-types",
"eyre",
"futures",
"rand 0.8.5",
"reth",
"reth-execution-types",
"reth-exex",
"reth-exex-test-utils",
"reth-node-api",
"reth-node-ethereum",
"reth-primitives",
"reth-testing-utils",
"reth-tracing",
"rusqlite",
"tempfile",
"tokio",
]
[[package]]
name = "example-exex-rollup"
version = "0.0.0"
dependencies = [
"alloy-consensus",
"alloy-genesis",
"alloy-rlp",
"alloy-sol-types",
"eyre",
"foundry-blob-explorers",
"once_cell",
"reth",
"reth-chainspec",
"reth-execution-errors",
"reth-execution-types",
"reth-exex",
"reth-node-api",
"reth-node-ethereum",
"reth-primitives",
"reth-provider",
"reth-revm",
"reth-testing-utils",
"reth-tracing",
"rusqlite",
"secp256k1",
"serde_json",
"tokio",
]
[[package]]
name = "example-manual-p2p"
version = "0.0.0"
@ -3110,18 +2997,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fast-float"
version = "0.2.0"
@ -3230,22 +3105,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "foundry-blob-explorers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "195bb5b228e1215c50d828f3e7d48a809a0af2bc0120462710ea5e7fcba3cbe2"
dependencies = [
"alloy-chains",
"alloy-eips",
"alloy-primitives",
"alloy-rpc-types-eth",
"alloy-serde",
"chrono",
"reqwest",
"serde",
]
[[package]]
name = "fragile"
version = "2.0.0"
@ -3560,15 +3419,6 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
@ -4756,17 +4606,6 @@ dependencies = [
"libsecp256k1-core",
]
[[package]]
name = "libsqlite3-sys"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
@ -9089,20 +8928,6 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48fd7bd8a6377e15ad9d42a8ec25371b94ddc67abe7c8b9127bec79bebaaae18"
[[package]]
name = "rusqlite"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae"
dependencies = [
"bitflags 2.6.0",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink 0.9.1",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -10811,12 +10636,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vergen"
version = "8.3.2"

View File

@ -127,7 +127,6 @@ members = [
"examples/custom-node-components/",
"examples/custom-payload-builder/",
"examples/db-access",
"examples/exex/*",
"examples/manual-p2p/",
"examples/network-txpool/",
"examples/network/",
@ -137,8 +136,6 @@ members = [
"examples/rpc-db/",
"examples/txpool-tracing/",
"examples/custom-rlpx-subprotocol",
"examples/exex/minimal/",
"examples/exex/op-bridge/",
"testing/ef-tests/",
"testing/testing-utils",
]

View File

@ -82,7 +82,7 @@ This method streams notifications to the client.
A proper way to represent the notification would be to define all fields in the schema, but it goes beyond the scope
of this chapter.
For an example of a full schema, see the [Remote ExEx](https://github.com/paradigmxyz/reth-exex-grpc/blob/22b26f7beca1c74577d28be3b3838eb352747be0/proto/exex.proto) example.
For an example of a full schema, see the [Remote ExEx](https://github.com/paradigmxyz/reth-exex-examples/blob/1f74410740ac996276a84ee72003f4f9cf041491/remote/proto/exex.proto) example.
</div>

View File

@ -24,13 +24,7 @@ to make a PR!
## ExEx
| Example | Description |
| ----------------------------------------- | --------------------------------------------------------------------------------------------------- |
| [In Memory State](./exex/in-memory-state) | Illustrates an ExEx that tracks the plain state in memory |
| [Minimal](./exex/minimal) | Illustrates how to build a simple ExEx |
| [OP Bridge](./exex/op-bridge) | Illustrates an ExEx that decodes Optimism deposit and withdrawal receipts from L1 |
| [Rollup](./exex/rollup) | Illustrates a rollup ExEx that derives the state from L1 |
| [Discv5 as ExEx](./exex/discv5) | Illustrates an ExEx that runs discv5 discovery stack |
See examples in a [dedicated repository](https://github.com/paradigmxyz/reth-exex-examples).
## RPC

View File

@ -1,33 +0,0 @@
[package]
name = "example-exex-discv5"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
discv5.workspace = true
enr.workspace = true
reth-discv5.workspace = true
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-network-peers.workspace = true
reth-tracing.workspace = true
futures.workspace = true
clap.workspace = true
reth-chainspec.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true
tracing.workspace = true
eyre.workspace = true
[dev-dependencies]
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true

View File

@ -1,70 +0,0 @@
use eyre::Result;
use futures::{Future, FutureExt};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::info;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tracing::error;
use crate::network::DiscV5ExEx;
/// The ExEx struct, representing the initialization and execution of the ExEx.
pub struct ExEx<Node: FullNodeComponents> {
exex: ExExContext<Node>,
disc_v5: DiscV5ExEx,
}
impl<Node: FullNodeComponents> ExEx<Node> {
pub fn new(exex: ExExContext<Node>, disc_v5: DiscV5ExEx) -> Self {
Self { exex, disc_v5 }
}
}
impl<Node: FullNodeComponents> Future for ExEx<Node> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll the Discv5 future until its drained
loop {
match self.disc_v5.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
info!("Discv5 task completed successfully");
}
Poll::Ready(Err(e)) => {
error!(?e, "Discv5 task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => {
// Exit match and continue to poll notifications
break;
}
}
}
// Continuously poll the ExExContext notifications
loop {
if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
}
if let Some(committed_chain) = notification.committed_chain() {
self.exex
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
}
}
}

View File

@ -1,29 +0,0 @@
use clap::Parser;
use exex::ExEx;
use network::{cli_ext::Discv5ArgsExt, DiscV5ExEx};
use reth_node_ethereum::EthereumNode;
mod exex;
mod network;
fn main() -> eyre::Result<()> {
reth::cli::Cli::<Discv5ArgsExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
let udp_port = args.udp_port;
let handle = builder
.node(EthereumNode::default())
.install_exex("exex-discv5", move |ctx| async move {
// start Discv5 task
let disc_v5 = DiscV5ExEx::new(tcp_port, udp_port).await?;
// start exex task with discv5
Ok(ExEx::new(ctx, disc_v5))
})
.launch()
.await?;
handle.wait_for_node_exit().await
})
}

View File

@ -1,15 +0,0 @@
use clap::Args;
pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;
#[derive(Debug, Clone, Args)]
pub(crate) struct Discv5ArgsExt {
/// TCP port used by RLPx
#[clap(long = "exex-discv5.tcp-port", default_value_t = DEFAULT_RLPX_PORT)]
pub tcp_port: u16,
/// UDP port used for discovery
#[clap(long = "exex-discv5.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,
}

View File

@ -1,123 +0,0 @@
#![allow(dead_code)]
use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig};
use reth::network::config::SecretKey;
use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5};
use reth_network_peers::NodeRecord;
use reth_tracing::tracing::info;
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;
pub(crate) mod cli_ext;
/// Helper struct to manage a discovery node using discv5.
pub(crate) struct DiscV5ExEx {
/// The inner discv5 instance.
inner: Discv5,
/// The node record of the discv5 instance.
node_record: NodeRecord,
/// The events stream of the discv5 instance.
events: mpsc::Receiver<discv5::Event>,
}
impl DiscV5ExEx {
/// Starts a new discv5 node.
pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result<DiscV5ExEx> {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?;
let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?;
let discv5_listen_config = ListenConfig::from(discv5_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
.build();
let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?;
Ok(Self { inner: discv5, events, node_record })
}
/// Adds a node to the table if its not already present.
pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> {
let reth_enr: enr::Enr<SecretKey> = EnrCombinedKeyWrapper(enr.clone()).into();
self.inner.add_node(reth_enr)?;
Ok(())
}
/// Returns the local ENR of the discv5 node.
pub fn local_enr(&self) -> Enr {
self.inner.with_discv5(|discv5| discv5.local_enr())
}
}
impl Future for DiscV5ExEx {
type Output = eyre::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
loop {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => {
if let Event::SessionEstablished(enr, socket_addr) = evt {
info!(?enr, ?socket_addr, "Session established with a new peer.");
}
}
None => return Poll::Ready(Ok(())),
}
}
}
}
#[cfg(test)]
mod tests {
use crate::network::DiscV5ExEx;
use tracing::info;
#[tokio::test]
async fn can_establish_discv5_session_with_peer() {
reth_tracing::init_test_tracing();
let mut node_1 = DiscV5ExEx::new(30301, 30303).await.unwrap();
let node_1_enr = node_1.local_enr();
let mut node_2 = DiscV5ExEx::new(30302, 30303).await.unwrap();
let node_2_enr = node_2.local_enr();
info!(?node_1_enr, ?node_2_enr, "Started discovery nodes.");
// add node_2 to node_1 table
node_1.add_node(node_2_enr.clone()).unwrap();
// verify node_2 is in node_1 table
assert!(node_1
.inner
.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id())));
// send ping from node_1 to node_2
node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();
// verify they both established a session
let event_2_v5 = node_2.events.recv().await.unwrap();
let event_1_v5 = node_1.events.recv().await.unwrap();
assert!(matches!(
event_1_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
));
assert!(matches!(
event_2_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
));
// verify node_1 is in
let event_2_v5 = node_2.events.recv().await.unwrap();
assert!(matches!(
event_2_v5,
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
));
}
}

View File

@ -1,22 +0,0 @@
[package]
name = "example-exex-in-memory-state"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-tracing.workspace = true
reth-execution-types.workspace = true
eyre.workspace = true
[dev-dependencies]
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
tokio.workspace = true

View File

@ -1,148 +0,0 @@
#![warn(unused_crate_dependencies)]
use reth_execution_types::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
/// An ExEx that keeps track of the entire state in memory
struct InMemoryStateExEx<Node: FullNodeComponents> {
/// The context of the ExEx
ctx: ExExContext<Node>,
/// Execution outcome of the chain
execution_outcome: ExecutionOutcome,
}
impl<Node: FullNodeComponents> InMemoryStateExEx<Node> {
/// Create a new instance of the ExEx
fn new(ctx: ExExContext<Node>) -> Self {
Self { ctx, execution_outcome: ExecutionOutcome::default() }
}
}
impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
// revert to block before the reorg
this.execution_outcome.revert_to(new.first().number - 1);
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
this.execution_outcome.revert_to(old.first().number - 1);
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
// extend the state with the new chain
this.execution_outcome.extend(committed_chain.execution_outcome().clone());
this.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("in-memory-state", |ctx| async move { Ok(InMemoryStateExEx::new(ctx)) })
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
#[cfg(test)]
mod tests {
use reth::revm::db::BundleState;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex_test_utils::{test_exex_context, PollOnce};
use reth_testing_utils::generators::{self, random_block, random_receipt};
use std::pin::pin;
#[tokio::test]
async fn test_exex() -> eyre::Result<()> {
let mut rng = &mut generators::rng();
let (ctx, handle) = test_exex_context().await?;
let mut exex = pin!(super::InMemoryStateExEx::new(ctx));
let mut expected_state = ExecutionOutcome::default();
// Generate first block and its state
let block_1 = random_block(&mut rng, 0, None, Some(1), None)
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
let block_number_1 = block_1.number;
let execution_outcome1 = ExecutionOutcome::new(
BundleState::default(),
vec![random_receipt(&mut rng, &block_1.body[0], None)].into(),
block_1.number,
vec![],
);
// Extend the expected state with the first block
expected_state.extend(execution_outcome1.clone());
// Send a notification to the Execution Extension that the chain with the first block has
// been committed
handle
.send_notification_chain_committed(Chain::new(vec![block_1], execution_outcome1, None))
.await?;
exex.poll_once().await?;
// Assert that the state of the first block has been added to the total state
assert_eq!(exex.as_mut().execution_outcome, expected_state);
// Generate second block and its state
let block_2 = random_block(&mut rng, 1, None, Some(2), None)
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
let execution_outcome2 = ExecutionOutcome::new(
BundleState::default(),
vec![random_receipt(&mut rng, &block_2.body[0], None)].into(),
block_2.number,
vec![],
);
// Extend the expected execution outcome with the second block
expected_state.extend(execution_outcome2.clone());
// Send a notification to the Execution Extension that the chain with the second block has
// been committed
let chain_2 = Chain::new(vec![block_2], execution_outcome2, None);
handle.send_notification_chain_committed(chain_2.clone()).await?;
exex.poll_once().await?;
// Assert that the execution outcome of the second block has been added to the total state
assert_eq!(exex.as_mut().execution_outcome, expected_state);
// Send a notification to the Execution Extension that the chain with the second block has
// been reverted
handle.send_notification_chain_reverted(chain_2).await?;
exex.poll_once().await?;
// Assert that the execution outcome of the second block has been reverted
expected_state.revert_to(block_number_1);
assert_eq!(exex.as_mut().execution_outcome, expected_state);
Ok(())
}
}

View File

@ -1,22 +0,0 @@
[package]
name = "example-exex-minimal"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-tracing.workspace = true
reth-execution-types.workspace = true
eyre.workspace = true
futures.workspace = true
[dev-dependencies]
reth-exex-test-utils.workspace = true
tokio.workspace = true

View File

@ -1,93 +0,0 @@
use futures::Future;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
/// The initialization logic of the ExEx is just an async function.
///
/// During initialization you can wait for resources you need to be up for the ExEx to function,
/// like a database connection.
async fn exex_init<Node: FullNodeComponents>(
ctx: ExExContext<Node>,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
Ok(exex(ctx))
}
/// An ExEx is just a future, which means you can implement all of it in an async function!
///
/// This ExEx just prints out whenever either a new chain of blocks being added, or a chain of
/// blocks being re-orged. After processing the chain, emits an [ExExEvent::FinishedHeight] event.
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("Minimal", exex_init)
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
#[cfg(test)]
mod tests {
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex_test_utils::{test_exex_context, PollOnce};
use std::pin::pin;
#[tokio::test]
async fn test_exex() -> eyre::Result<()> {
// Initialize a test Execution Extension context with all dependencies
let (ctx, mut handle) = test_exex_context().await?;
// Save the current head of the chain to check the finished height against it later
let head = ctx.head;
// Send a notification to the Execution Extension that the chain has been committed
handle
.send_notification_chain_committed(Chain::from_block(
handle.genesis.clone(),
ExecutionOutcome::default(),
None,
))
.await?;
// Initialize the Execution Extension
let mut exex = pin!(super::exex_init(ctx).await?);
// Check that the Execution Extension did not emit any events until we polled it
handle.assert_events_empty();
// Poll the Execution Extension once to process incoming notifications
exex.poll_once().await?;
// Check that the Execution Extension emitted a `FinishedHeight` event with the correct
// height
handle.assert_event_finished_height(head.number)?;
Ok(())
}
}

View File

@ -1,28 +0,0 @@
[package]
name = "example-exex-op-bridge"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-execution-types.workspace = true
reth-tracing.workspace = true
eyre.workspace = true
futures.workspace = true
alloy-sol-types = { workspace = true, features = ["json"] }
rusqlite = { version = "0.31.0", features = ["bundled"] }
[dev-dependencies]
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
tokio.workspace = true
rand.workspace = true
tempfile.workspace = true

File diff suppressed because one or more lines are too long

View File

@ -1,425 +0,0 @@
use alloy_sol_types::{sol, SolEventInterface};
use futures::Future;
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{address, Address, Log, SealedBlockWithSenders, TransactionSigned};
use reth_tracing::tracing::info;
use rusqlite::Connection;
sol!(L1StandardBridge, "l1_standard_bridge_abi.json");
use crate::L1StandardBridge::{ETHBridgeFinalized, ETHBridgeInitiated, L1StandardBridgeEvents};
const OP_BRIDGES: [Address; 6] = [
address!("3154Cf16ccdb4C6d922629664174b904d80F2C35"),
address!("3a05E5d33d7Ab3864D53aaEc93c8301C1Fa49115"),
address!("697402166Fbf2F22E970df8a6486Ef171dbfc524"),
address!("99C9fc46f92E8a1c0deC1b1747d010903E884bE1"),
address!("735aDBbE72226BD52e818E7181953f42E3b0FF21"),
address!("3B95bC951EE0f553ba487327278cAc44f29715E5"),
];
/// Initializes the ExEx.
///
/// Opens up a SQLite database and creates the tables (if they don't exist).
async fn init<Node: FullNodeComponents>(
ctx: ExExContext<Node>,
mut connection: Connection,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
create_tables(&mut connection)?;
Ok(op_bridge_exex(ctx, connection))
}
/// Create SQLite tables if they do not exist.
fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {
// Create deposits and withdrawals tables
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS deposits (
id INTEGER PRIMARY KEY,
block_number INTEGER NOT NULL,
tx_hash TEXT NOT NULL UNIQUE,
contract_address TEXT NOT NULL,
"from" TEXT NOT NULL,
"to" TEXT NOT NULL,
amount TEXT NOT NULL
);
"#,
(),
)?;
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS withdrawals (
id INTEGER PRIMARY KEY,
block_number INTEGER NOT NULL,
tx_hash TEXT NOT NULL UNIQUE,
contract_address TEXT NOT NULL,
"from" TEXT NOT NULL,
"to" TEXT NOT NULL,
amount TEXT NOT NULL
);
"#,
(),
)?;
// Create a bridge contract addresses table and insert known ones with their respective
// names
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS contracts (
id INTEGER PRIMARY KEY,
address TEXT NOT NULL UNIQUE,
name TEXT NOT NULL
);
"#,
(),
)?;
connection.execute(
r#"
INSERT OR IGNORE INTO contracts (address, name)
VALUES
('0x3154Cf16ccdb4C6d922629664174b904d80F2C35', 'Base'),
('0x3a05E5d33d7Ab3864D53aaEc93c8301C1Fa49115', 'Blast'),
('0x697402166Fbf2F22E970df8a6486Ef171dbfc524', 'Blast'),
('0x99C9fc46f92E8a1c0deC1b1747d010903E884bE1', 'Optimism'),
('0x735aDBbE72226BD52e818E7181953f42E3b0FF21', 'Mode'),
('0x3B95bC951EE0f553ba487327278cAc44f29715E5', 'Manta');
"#,
(),
)?;
info!("Initialized database tables");
Ok(())
}
/// An example of ExEx that listens to ETH bridging events from OP Stack chains
/// and stores deposits and withdrawals in a SQLite database.
async fn op_bridge_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
connection: Connection,
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.recv().await {
// Revert all deposits and withdrawals
if let Some(reverted_chain) = notification.reverted_chain() {
let events = decode_chain_into_events(&reverted_chain);
let mut deposits = 0;
let mut withdrawals = 0;
for (_, tx, _, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated { .. }) => {
let deleted = connection.execute(
"DELETE FROM deposits WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
deposits += deleted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized { .. }) => {
let deleted = connection.execute(
"DELETE FROM withdrawals WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
withdrawals += deleted;
}
_ => continue,
}
}
info!(block_range = ?reverted_chain.range(), %deposits, %withdrawals, "Reverted chain events");
}
// Insert all new deposits and withdrawals
if let Some(committed_chain) = notification.committed_chain() {
let events = decode_chain_into_events(&committed_chain);
let mut deposits = 0;
let mut withdrawals = 0;
for (block, tx, log, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO deposits (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
"#,
(
block.number,
tx.hash().to_string(),
log.address.to_string(),
from.to_string(),
to.to_string(),
amount.to_string(),
),
)?;
deposits += inserted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO withdrawals (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
"#,
(
block.number,
tx.hash().to_string(),
log.address.to_string(),
from.to_string(),
to.to_string(),
amount.to_string(),
),
)?;
withdrawals += inserted;
}
_ => continue,
};
}
info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events");
// Send a finished height event, signaling the node that we don't need any blocks below
// this height anymore
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}
/// Decode chain of blocks into a flattened list of receipt logs, and filter only
/// [L1StandardBridgeEvents].
fn decode_chain_into_events(
chain: &Chain,
) -> impl Iterator<Item = (&SealedBlockWithSenders, &TransactionSigned, &Log, L1StandardBridgeEvents)>
{
chain
// Get all blocks and receipts
.blocks_and_receipts()
// Get all receipts
.flat_map(|(block, receipts)| {
block
.body
.iter()
.zip(receipts.iter().flatten())
.map(move |(tx, receipt)| (block, tx, receipt))
})
// Get all logs from expected bridge contracts
.flat_map(|(block, tx, receipt)| {
receipt
.logs
.iter()
.filter(|log| OP_BRIDGES.contains(&log.address))
.map(move |log| (block, tx, log))
})
// Decode and filter bridge events
.filter_map(|(block, tx, log)| {
L1StandardBridgeEvents::decode_raw_log(log.topics(), &log.data.data, true)
.ok()
.map(|event| (block, tx, log, event))
})
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("OPBridge", |ctx| async move {
let connection = Connection::open("op_bridge.db")?;
init(ctx, connection).await
})
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use alloy_sol_types::SolEvent;
use reth::revm::db::BundleState;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex_test_utils::{test_exex_context, PollOnce};
use reth_primitives::{
Address, Block, Header, Log, Receipt, Transaction, TransactionSigned, TxKind, TxLegacy,
TxType, U256,
};
use reth_testing_utils::generators::sign_tx_with_random_key_pair;
use rusqlite::Connection;
use crate::{L1StandardBridge, OP_BRIDGES};
/// Given the address of a bridge contract and an event, construct a transaction signed with a
/// random private key and a receipt for that transaction.
fn construct_tx_and_receipt<E: SolEvent>(
to: Address,
event: E,
) -> eyre::Result<(TransactionSigned, Receipt)> {
let tx = Transaction::Legacy(TxLegacy { to: TxKind::Call(to), ..Default::default() });
let log = Log::new(
to,
event.encode_topics().into_iter().map(|topic| topic.0).collect(),
event.encode_data().into(),
)
.ok_or_else(|| eyre::eyre!("failed to encode event"))?;
#[allow(clippy::needless_update)] // side-effect of optimism fields
let receipt = Receipt {
tx_type: TxType::Legacy,
success: true,
cumulative_gas_used: 0,
logs: vec![log],
..Default::default()
};
Ok((sign_tx_with_random_key_pair(&mut rand::thread_rng(), tx), receipt))
}
#[tokio::test]
async fn test_exex() -> eyre::Result<()> {
// Initialize the test Execution Extension context with all dependencies
let (ctx, handle) = test_exex_context().await?;
// Create a temporary database file, so we can access it later for assertions
let db_file = tempfile::NamedTempFile::new()?;
// Initialize the ExEx
let mut exex = pin!(super::init(ctx, Connection::open(&db_file)?).await?);
// Generate random "from" and "to" addresses for deposit and withdrawal events
let from_address = Address::random();
let to_address = Address::random();
// Construct deposit event, transaction and receipt
let deposit_event = L1StandardBridge::ETHBridgeInitiated {
from: from_address,
to: to_address,
amount: U256::from(100),
extraData: Default::default(),
};
let (deposit_tx, deposit_tx_receipt) =
construct_tx_and_receipt(OP_BRIDGES[0], deposit_event.clone())?;
// Construct withdrawal event, transaction and receipt
let withdrawal_event = L1StandardBridge::ETHBridgeFinalized {
from: from_address,
to: to_address,
amount: U256::from(200),
extraData: Default::default(),
};
let (withdrawal_tx, withdrawal_tx_receipt) =
construct_tx_and_receipt(OP_BRIDGES[1], withdrawal_event.clone())?;
// Construct a block
let block = Block {
header: Header::default(),
body: vec![deposit_tx, withdrawal_tx],
..Default::default()
}
.seal_slow()
.seal_with_senders()
.ok_or_else(|| eyre::eyre!("failed to recover senders"))?;
// Construct a chain
let chain = Chain::new(
vec![block.clone()],
ExecutionOutcome::new(
BundleState::default(),
vec![deposit_tx_receipt, withdrawal_tx_receipt].into(),
block.number,
vec![block.requests.clone().unwrap_or_default()],
),
None,
);
// Send a notification that the chain has been committed
handle.send_notification_chain_committed(chain.clone()).await?;
// Poll the ExEx once, it will process the notification that we just sent
exex.poll_once().await?;
let connection = Connection::open(&db_file)?;
// Assert that the deposit event was parsed correctly and inserted into the database
let deposits: Vec<(u64, String, String, String, String, String)> = connection
.prepare(r#"SELECT block_number, contract_address, "from", "to", amount, tx_hash FROM deposits"#)?
.query_map([], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?))
})?
.collect::<Result<Vec<_>, _>>()?;
assert_eq!(deposits.len(), 1);
assert_eq!(
deposits[0],
(
block.number,
OP_BRIDGES[0].to_string(),
from_address.to_string(),
to_address.to_string(),
deposit_event.amount.to_string(),
block.body[0].hash().to_string()
)
);
// Assert that the withdrawal event was parsed correctly and inserted into the database
let withdrawals: Vec<(u64, String, String, String, String, String)> = connection
.prepare(r#"SELECT block_number, contract_address, "from", "to", amount, tx_hash FROM withdrawals"#)?
.query_map([], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?))
})?
.collect::<Result<Vec<_>, _>>()?;
assert_eq!(withdrawals.len(), 1);
assert_eq!(
withdrawals[0],
(
block.number,
OP_BRIDGES[1].to_string(),
from_address.to_string(),
to_address.to_string(),
withdrawal_event.amount.to_string(),
block.body[1].hash().to_string()
)
);
// Send a notification that the same chain has been reverted
handle.send_notification_chain_reverted(chain).await?;
// Poll the ExEx once, it will process the notification that we just sent
exex.poll_once().await?;
// Assert that the deposit was removed from the database
let deposits = connection
.prepare(r#"SELECT block_number, contract_address, "from", "to", amount, tx_hash FROM deposits"#)?
.query_map([], |_| {
Ok(())
})?
.count();
assert_eq!(deposits, 0);
// Assert that the withdrawal was removed from the database
let withdrawals = connection
.prepare(r#"SELECT block_number, contract_address, "from", "to", amount, tx_hash FROM withdrawals"#)?
.query_map([], |_| {
Ok(())
})?
.count();
assert_eq!(withdrawals, 0);
Ok(())
}
}

View File

@ -1,39 +0,0 @@
[package]
name = "example-exex-rollup"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
# reth
reth.workspace = true
reth-chainspec.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-execution-errors.workspace = true
reth-execution-types.workspace = true
reth-provider.workspace = true
reth-revm.workspace = true
reth-tracing.workspace = true
# async
tokio.workspace = true
# misc
alloy-consensus = { workspace = true, features = ["kzg"] }
alloy-genesis.workspace = true
alloy-rlp.workspace = true
alloy-sol-types = { workspace = true, features = ["json"] }
eyre.workspace = true
foundry-blob-explorers = "0.1"
once_cell.workspace = true
rusqlite = { version = "0.31.0", features = ["bundled"] }
serde_json.workspace = true
[dev-dependencies]
reth-testing-utils.workspace = true
secp256k1.workspace = true

File diff suppressed because one or more lines are too long

View File

@ -1,460 +0,0 @@
use std::{
collections::{hash_map::Entry, HashMap},
str::FromStr,
sync::{Arc, Mutex, MutexGuard},
};
use reth_primitives::{
revm_primitives::{AccountInfo, Bytecode},
Address, Bytes, SealedBlockWithSenders, StorageEntry, B256, U256,
};
use reth_provider::{bundle_state::StorageRevertsIter, OriginalValuesKnown};
use reth_revm::db::{
states::{PlainStorageChangeset, PlainStorageRevert},
BundleState,
};
use rusqlite::Connection;
/// Type used to initialize revms bundle state.
type BundleStateInit =
HashMap<Address, (Option<AccountInfo>, Option<AccountInfo>, HashMap<B256, (U256, U256)>)>;
/// Types used inside RevertsInit to initialize revms reverts.
pub type AccountRevertInit = (Option<Option<AccountInfo>>, Vec<StorageEntry>);
/// Type used to initialize revms reverts.
pub type RevertsInit = HashMap<Address, AccountRevertInit>;
pub struct Database {
connection: Arc<Mutex<Connection>>,
}
impl Database {
/// Create new database with the provided connection.
pub fn new(connection: Connection) -> eyre::Result<Self> {
let database = Self { connection: Arc::new(Mutex::new(connection)) };
database.create_tables()?;
Ok(database)
}
fn connection(&self) -> MutexGuard<'_, Connection> {
self.connection.lock().expect("failed to acquire database lock")
}
fn create_tables(&self) -> eyre::Result<()> {
self.connection().execute_batch(
"CREATE TABLE IF NOT EXISTS block (
id INTEGER PRIMARY KEY,
number TEXT UNIQUE,
data TEXT
);
CREATE TABLE IF NOT EXISTS account (
id INTEGER PRIMARY KEY,
address TEXT UNIQUE,
data TEXT
);
CREATE TABLE IF NOT EXISTS account_revert (
id INTEGER PRIMARY KEY,
block_number TEXT,
address TEXT,
data TEXT,
UNIQUE (block_number, address)
);
CREATE TABLE IF NOT EXISTS storage (
id INTEGER PRIMARY KEY,
address TEXT,
key TEXT,
data TEXT,
UNIQUE (address, key)
);
CREATE TABLE IF NOT EXISTS storage_revert (
id INTEGER PRIMARY KEY,
block_number TEXT,
address TEXT,
key TEXT,
data TEXT,
UNIQUE (block_number, address, key)
);
CREATE TABLE IF NOT EXISTS bytecode (
id INTEGER PRIMARY KEY,
hash TEXT UNIQUE,
data TEXT
);",
)?;
Ok(())
}
/// Insert block with bundle into the database.
pub fn insert_block_with_bundle(
&self,
block: &SealedBlockWithSenders,
bundle: BundleState,
) -> eyre::Result<()> {
let mut connection = self.connection();
let tx = connection.transaction()?;
tx.execute(
"INSERT INTO block (number, data) VALUES (?, ?)",
(block.header.number.to_string(), serde_json::to_string(block)?),
)?;
let (changeset, reverts) = bundle.into_plain_state_and_reverts(OriginalValuesKnown::Yes);
for (address, account) in changeset.accounts {
if let Some(account) = account {
tx.execute(
"INSERT INTO account (address, data) VALUES (?, ?) ON CONFLICT(address) DO UPDATE SET data = excluded.data",
(address.to_string(), serde_json::to_string(&account)?),
)?;
} else {
tx.execute("DELETE FROM account WHERE address = ?", (address.to_string(),))?;
}
}
if reverts.accounts.len() > 1 {
eyre::bail!("too many blocks in account reverts");
}
if let Some(account_reverts) = reverts.accounts.into_iter().next() {
for (address, account) in account_reverts {
tx.execute(
"INSERT INTO account_revert (block_number, address, data) VALUES (?, ?, ?) ON CONFLICT(block_number, address) DO UPDATE SET data = excluded.data",
(block.header.number.to_string(), address.to_string(), serde_json::to_string(&account)?),
)?;
}
}
for PlainStorageChangeset { address, wipe_storage, storage } in changeset.storage {
if wipe_storage {
tx.execute("DELETE FROM storage WHERE address = ?", (address.to_string(),))?;
}
for (key, data) in storage {
tx.execute(
"INSERT INTO storage (address, key, data) VALUES (?, ?, ?) ON CONFLICT(address, key) DO UPDATE SET data = excluded.data",
(address.to_string(), B256::from(key).to_string(), data.to_string()),
)?;
}
}
if reverts.storage.len() > 1 {
eyre::bail!("too many blocks in storage reverts");
}
if let Some(storage_reverts) = reverts.storage.into_iter().next() {
for PlainStorageRevert { address, wiped, storage_revert } in storage_reverts {
let storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
let wiped_storage = if wiped { get_storages(&tx, address)? } else { Vec::new() };
for (key, data) in StorageRevertsIter::new(storage, wiped_storage) {
tx.execute(
"INSERT INTO storage_revert (block_number, address, key, data) VALUES (?, ?, ?, ?) ON CONFLICT(block_number, address, key) DO UPDATE SET data = excluded.data",
(block.header.number.to_string(), address.to_string(), key.to_string(), data.to_string()),
)?;
}
}
}
for (hash, bytecode) in changeset.contracts {
tx.execute(
"INSERT INTO bytecode (hash, data) VALUES (?, ?) ON CONFLICT(hash) DO NOTHING",
(hash.to_string(), bytecode.bytecode().to_string()),
)?;
}
tx.commit()?;
Ok(())
}
/// Reverts the tip block from the database, checking it against the provided block number.
///
/// The code is adapted from <https://github.com/paradigmxyz/reth/blob/a8cd1f71a03c773c24659fc28bfed2ba5f2bd97b/crates/storage/provider/src/providers/database/provider.rs#L365-L540>
pub fn revert_tip_block(&self, block_number: U256) -> eyre::Result<()> {
let mut connection = self.connection();
let tx = connection.transaction()?;
let tip_block_number = tx
.query_row::<String, _, _>(
"SELECT number FROM block ORDER BY number DESC LIMIT 1",
[],
|row| row.get(0),
)
.map(|data| U256::from_str(&data))??;
if block_number != tip_block_number {
eyre::bail!("Reverts can only be done from the tip. Attempted to revert block {} with tip block {}", block_number, tip_block_number);
}
tx.execute("DELETE FROM block WHERE number = ?", (block_number.to_string(),))?;
let mut state = BundleStateInit::new();
let mut reverts = RevertsInit::new();
let account_reverts = tx
.prepare("SELECT address, data FROM account_revert WHERE block_number = ?")?
.query((block_number.to_string(),))?
.mapped(|row| {
Ok((
Address::from_str(row.get_ref(0)?.as_str()?),
serde_json::from_str::<Option<AccountInfo>>(row.get_ref(1)?.as_str()?),
))
})
.map(|result| {
let (address, data) = result?;
Ok((address?, data?))
})
.collect::<eyre::Result<Vec<_>>>()?;
for (address, old_info) in account_reverts {
// insert old info into reverts
reverts.entry(address).or_default().0 = Some(old_info.clone());
match state.entry(address) {
Entry::Vacant(entry) => {
let new_info = get_account(&tx, address)?;
entry.insert((old_info, new_info, HashMap::new()));
}
Entry::Occupied(mut entry) => {
// overwrite old account state
entry.get_mut().0 = old_info;
}
}
}
let storage_reverts = tx
.prepare("SELECT address, key, data FROM storage_revert WHERE block_number = ?")?
.query((block_number.to_string(),))?
.mapped(|row| {
Ok((
Address::from_str(row.get_ref(0)?.as_str()?),
B256::from_str(row.get_ref(1)?.as_str()?),
U256::from_str(row.get_ref(2)?.as_str()?),
))
})
.map(|result| {
let (address, key, data) = result?;
Ok((address?, key?, data?))
})
.collect::<eyre::Result<Vec<_>>>()?;
for (address, key, old_data) in storage_reverts.into_iter().rev() {
let old_storage = StorageEntry { key, value: old_data };
// insert old info into reverts
reverts.entry(address).or_default().1.push(old_storage);
// get account state or insert from plain state
let account_state = match state.entry(address) {
Entry::Vacant(entry) => {
let present_info = get_account(&tx, address)?;
entry.insert((present_info.clone(), present_info, HashMap::new()))
}
Entry::Occupied(entry) => entry.into_mut(),
};
// match storage
match account_state.2.entry(old_storage.key) {
Entry::Vacant(entry) => {
let new_value = get_storage(&tx, address, old_storage.key)?.unwrap_or_default();
entry.insert((old_storage.value, new_value));
}
Entry::Occupied(mut entry) => {
entry.get_mut().0 = old_storage.value;
}
};
}
// iterate over local plain state remove all account and all storages
for (address, (old_account, new_account, storage)) in state {
// revert account if needed
if old_account != new_account {
if let Some(account) = old_account {
upsert_account(&tx, address, |_| Ok(account))?;
} else {
delete_account(&tx, address)?;
}
}
// revert storages
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
// delete previous value
delete_storage(&tx, address, storage_key)?;
// insert value if needed
if !old_storage_value.is_zero() {
upsert_storage(&tx, address, storage_key, old_storage_value)?;
}
}
}
tx.commit()?;
Ok(())
}
/// Get block by number.
pub fn get_block(&self, number: U256) -> eyre::Result<Option<SealedBlockWithSenders>> {
let block = self.connection().query_row::<String, _, _>(
"SELECT data FROM block WHERE number = ?",
(number.to_string(),),
|row| row.get(0),
);
match block {
Ok(data) => Ok(Some(serde_json::from_str(&data)?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Insert new account if it does not exist, update otherwise. The provided closure is called
/// with the current account, if it exists.
pub fn upsert_account(
&self,
address: Address,
f: impl FnOnce(Option<AccountInfo>) -> eyre::Result<AccountInfo>,
) -> eyre::Result<()> {
upsert_account(&self.connection(), address, f)
}
/// Get account by address.
pub fn get_account(&self, address: Address) -> eyre::Result<Option<AccountInfo>> {
get_account(&self.connection(), address)
}
}
/// Insert new account if it does not exist, update otherwise. The provided closure is called
/// with the current account, if it exists. Connection can be either
/// [rusqlite::Transaction] or [rusqlite::Connection].
fn upsert_account(
connection: &Connection,
address: Address,
f: impl FnOnce(Option<AccountInfo>) -> eyre::Result<AccountInfo>,
) -> eyre::Result<()> {
let account = get_account(connection, address)?;
let account = f(account)?;
connection.execute(
"INSERT INTO account (address, data) VALUES (?, ?) ON CONFLICT(address) DO UPDATE SET data = excluded.data",
(address.to_string(), serde_json::to_string(&account)?),
)?;
Ok(())
}
/// Delete account by address. Connection can be either [rusqlite::Transaction] or
/// [rusqlite::Connection].
fn delete_account(connection: &Connection, address: Address) -> eyre::Result<()> {
connection.execute("DELETE FROM account WHERE address = ?", (address.to_string(),))?;
Ok(())
}
/// Get account by address using the database connection. Connection can be either
/// [rusqlite::Transaction] or [rusqlite::Connection].
fn get_account(connection: &Connection, address: Address) -> eyre::Result<Option<AccountInfo>> {
match connection.query_row::<String, _, _>(
"SELECT data FROM account WHERE address = ?",
(address.to_string(),),
|row| row.get(0),
) {
Ok(account_info) => Ok(Some(serde_json::from_str(&account_info)?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Insert new storage if it does not exist, update otherwise. Connection can be either
/// [rusqlite::Transaction] or [rusqlite::Connection].
fn upsert_storage(
connection: &Connection,
address: Address,
key: B256,
data: U256,
) -> eyre::Result<()> {
connection.execute(
"INSERT INTO storage (address, key, data) VALUES (?, ?, ?) ON CONFLICT(address, key) DO UPDATE SET data = excluded.data",
(address.to_string(), key.to_string(), data.to_string()),
)?;
Ok(())
}
/// Delete storage by address and key. Connection can be either [rusqlite::Transaction] or
/// [rusqlite::Connection].
fn delete_storage(connection: &Connection, address: Address, key: B256) -> eyre::Result<()> {
connection.execute(
"DELETE FROM storage WHERE address = ? AND key = ?",
(address.to_string(), key.to_string()),
)?;
Ok(())
}
/// Get all storages for the provided address using the database connection. Connection can be
/// either [rusqlite::Transaction] or [rusqlite::Connection].
fn get_storages(connection: &Connection, address: Address) -> eyre::Result<Vec<(B256, U256)>> {
connection
.prepare("SELECT key, data FROM storage WHERE address = ?")?
.query((address.to_string(),))?
.mapped(|row| {
Ok((
B256::from_str(row.get_ref(0)?.as_str()?),
U256::from_str(row.get_ref(1)?.as_str()?),
))
})
.map(|result| {
let (key, data) = result?;
Ok((key?, data?))
})
.collect()
}
/// Get storage for the provided address by key using the database connection. Connection can be
/// either [rusqlite::Transaction] or [rusqlite::Connection].
fn get_storage(connection: &Connection, address: Address, key: B256) -> eyre::Result<Option<U256>> {
match connection.query_row::<String, _, _>(
"SELECT data FROM storage WHERE address = ? AND key = ?",
(address.to_string(), key.to_string()),
|row| row.get(0),
) {
Ok(data) => Ok(Some(U256::from_str(&data)?)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
impl reth_revm::Database for Database {
type Error = eyre::Report;
fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
self.get_account(address)
}
fn code_by_hash(&mut self, code_hash: B256) -> Result<Bytecode, Self::Error> {
let bytecode = self.connection().query_row::<String, _, _>(
"SELECT data FROM bytecode WHERE hash = ?",
(code_hash.to_string(),),
|row| row.get(0),
);
match bytecode {
Ok(data) => Ok(Bytecode::new_raw(Bytes::from_str(&data).unwrap())),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(Bytecode::default()),
Err(err) => Err(err.into()),
}
}
fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
get_storage(&self.connection(), address, index.into()).map(|data| data.unwrap_or_default())
}
fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
let block_hash = self.connection().query_row::<String, _, _>(
"SELECT hash FROM block WHERE number = ?",
(number.to_string(),),
|row| row.get(0),
);
match block_hash {
Ok(data) => Ok(B256::from_str(&data).unwrap()),
// No special handling for `QueryReturnedNoRows` is needed, because revm does block
// number bound checks on its own.
// See https://github.com/bluealloy/revm/blob/1ca3d39f6a9e9778f8eb0fcb74fe529345a531b4/crates/interpreter/src/instructions/host.rs#L106-L123.
Err(err) => Err(err.into()),
}
}
}

File diff suppressed because one or more lines are too long

View File

@ -1,276 +0,0 @@
//! Example of a simple rollup that derives its state from the L1 chain by executing transactions,
//! processing deposits and storing all related data in an SQLite database.
//!
//! The rollup contract accepts blocks of transactions and deposits of ETH and is deployed on
//! Holesky at [ROLLUP_CONTRACT_ADDRESS], see <https://github.com/init4tech/zenith/blob/e0481e930947513166881a83e276b316c2f38502/src/Zenith.sol>.
use alloy_genesis::Genesis;
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use execution::execute_block;
use once_cell::sync::Lazy;
use reth_chainspec::{ChainSpec, ChainSpecBuilder};
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{address, Address, SealedBlockWithSenders, TransactionSigned, U256};
use reth_tracing::tracing::{error, info};
use rusqlite::Connection;
use std::sync::Arc;
mod db;
mod execution;
sol!(RollupContract, "rollup_abi.json");
use RollupContract::{RollupContractCalls, RollupContractEvents};
const DATABASE_PATH: &str = "rollup.db";
const ROLLUP_CONTRACT_ADDRESS: Address = address!("97C0E40c6B5bb5d4fa3e2AA1C6b8bC7EA5ECAe31");
const ROLLUP_SUBMITTER_ADDRESS: Address = address!("5b0517Dc94c413a5871536872605522E54C85a03");
const CHAIN_ID: u64 = 17001;
static CHAIN_SPEC: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
Arc::new(
ChainSpecBuilder::default()
.chain(CHAIN_ID.into())
.genesis(Genesis::clique_genesis(CHAIN_ID, ROLLUP_SUBMITTER_ADDRESS))
.shanghai_activated()
.build(),
)
});
struct Rollup<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
db: Database,
}
impl<Node: FullNodeComponents> Rollup<Node> {
fn new(ctx: ExExContext<Node>, connection: Connection) -> eyre::Result<Self> {
let db = Database::new(connection)?;
Ok(Self { ctx, db })
}
async fn start(mut self) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = self.ctx.notifications.recv().await {
if let Some(reverted_chain) = notification.reverted_chain() {
self.revert(&reverted_chain)?;
}
if let Some(committed_chain) = notification.committed_chain() {
self.commit(&committed_chain).await?;
self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}
/// Process a new chain commit.
///
/// This function decodes all transactions to the rollup contract into events, executes the
/// corresponding actions and inserts the results into the database.
async fn commit(&mut self, chain: &Chain) -> eyre::Result<()> {
let events = decode_chain_into_rollup_events(chain);
for (_, tx, event) in events {
match event {
// A new block is submitted to the rollup contract.
// The block is executed on top of existing rollup state and committed into the
// database.
RollupContractEvents::BlockSubmitted(RollupContract::BlockSubmitted {
blockDataHash,
..
}) => {
let call = RollupContractCalls::abi_decode(tx.input(), true)?;
if let RollupContractCalls::submitBlock(RollupContract::submitBlockCall {
header,
blockData,
..
}) = call
{
match execute_block(
&mut self.db,
self.ctx.pool(),
tx,
&header,
blockData,
blockDataHash,
)
.await
{
Ok((block, bundle, _, _)) => {
let block = block.seal_slow();
self.db.insert_block_with_bundle(&block, bundle)?;
info!(
tx_hash = %tx.recalculate_hash(),
chain_id = %header.rollupChainId,
sequence = %header.sequence,
transactions = block.body.len(),
"Block submitted, executed and inserted into database"
);
}
Err(err) => {
error!(
%err,
tx_hash = %tx.recalculate_hash(),
chain_id = %header.rollupChainId,
sequence = %header.sequence,
"Failed to execute block"
);
}
}
}
}
// A deposit of ETH to the rollup contract. The deposit is added to the recipient's
// balance and committed into the database.
RollupContractEvents::Enter(RollupContract::Enter {
rollupChainId,
token,
rollupRecipient,
amount,
}) => {
if rollupChainId != U256::from(CHAIN_ID) {
error!(tx_hash = %tx.recalculate_hash(), "Invalid rollup chain ID");
continue
}
if token != Address::ZERO {
error!(tx_hash = %tx.recalculate_hash(), "Only ETH deposits are supported");
continue
}
self.db.upsert_account(rollupRecipient, |account| {
let mut account = account.unwrap_or_default();
account.balance += amount;
Ok(account)
})?;
info!(
tx_hash = %tx.recalculate_hash(),
%amount,
recipient = %rollupRecipient,
"Deposit",
);
}
_ => (),
}
}
Ok(())
}
/// Process a chain revert.
///
/// This function decodes all transactions to the rollup contract into events, reverts the
/// corresponding actions and updates the database.
fn revert(&mut self, chain: &Chain) -> eyre::Result<()> {
let mut events = decode_chain_into_rollup_events(chain);
// Reverse the order of events to start reverting from the tip
events.reverse();
for (_, tx, event) in events {
match event {
// The block is reverted from the database.
RollupContractEvents::BlockSubmitted(_) => {
let call = RollupContractCalls::abi_decode(tx.input(), true)?;
if let RollupContractCalls::submitBlock(RollupContract::submitBlockCall {
header,
..
}) = call
{
self.db.revert_tip_block(header.sequence)?;
info!(
tx_hash = %tx.recalculate_hash(),
chain_id = %header.rollupChainId,
sequence = %header.sequence,
"Block reverted"
);
}
}
// The deposit is subtracted from the recipient's balance.
RollupContractEvents::Enter(RollupContract::Enter {
rollupChainId,
token,
rollupRecipient,
amount,
}) => {
if rollupChainId != U256::from(CHAIN_ID) {
error!(tx_hash = %tx.recalculate_hash(), "Invalid rollup chain ID");
continue
}
if token != Address::ZERO {
error!(tx_hash = %tx.recalculate_hash(), "Only ETH deposits are supported");
continue
}
self.db.upsert_account(rollupRecipient, |account| {
let mut account = account.ok_or(eyre::eyre!("account not found"))?;
account.balance -= amount;
Ok(account)
})?;
info!(
tx_hash = %tx.recalculate_hash(),
%amount,
recipient = %rollupRecipient,
"Deposit reverted",
);
}
_ => (),
}
}
Ok(())
}
}
/// Decode chain of blocks into a flattened list of receipt logs, filter only transactions to the
/// Rollup contract [ROLLUP_CONTRACT_ADDRESS] and extract [RollupContractEvents].
fn decode_chain_into_rollup_events(
chain: &Chain,
) -> Vec<(&SealedBlockWithSenders, &TransactionSigned, RollupContractEvents)> {
chain
// Get all blocks and receipts
.blocks_and_receipts()
// Get all receipts
.flat_map(|(block, receipts)| {
block
.body
.iter()
.zip(receipts.iter().flatten())
.map(move |(tx, receipt)| (block, tx, receipt))
})
// Get all logs from rollup contract
.flat_map(|(block, tx, receipt)| {
receipt
.logs
.iter()
.filter(|log| log.address == ROLLUP_CONTRACT_ADDRESS)
.map(move |log| (block, tx, log))
})
// Decode and filter rollup events
.filter_map(|(block, tx, log)| {
RollupContractEvents::decode_raw_log(log.topics(), &log.data.data, true)
.ok()
.map(|event| (block, tx, event))
})
.collect()
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("Rollup", move |ctx| async {
let connection = Connection::open(DATABASE_PATH)?;
Ok(Rollup::new(ctx, connection)?.start())
})
.launch()
.await?;
handle.wait_for_node_exit().await
})
}