refactor(exex): use async fns in op bridge example (#7601)

This commit is contained in:
Oliver Nordbjerg
2024-04-12 17:49:50 +02:00
committed by GitHub
parent d970e51c88
commit aa1fbfcba6

View File

@ -1,8 +1,3 @@
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use alloy_sol_types::{sol, SolEventInterface};
use futures::Future;
use reth::builder::FullNodeTypes;
@ -16,18 +11,23 @@ use rusqlite::Connection;
sol!(L1StandardBridge, "l1_standard_bridge_abi.json");
use crate::L1StandardBridge::{ETHBridgeFinalized, ETHBridgeInitiated, L1StandardBridgeEvents};
/// An example of ExEx that listens to ETH bridging events from OP Stack chains
/// and stores deposits and withdrawals in a SQLite database.
struct OPBridgeExEx<Node: FullNodeTypes> {
/// Initializes the ExEx.
///
/// Opens up a SQLite database and creates the tables (if they don't exist).
async fn init<Node: FullNodeTypes>(
ctx: ExExContext<Node>,
connection: Connection,
mut connection: Connection,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
create_tables(&mut connection)?;
Ok(op_bridge_exex(ctx, connection))
}
impl<Node: FullNodeTypes> OPBridgeExEx<Node> {
fn new(ctx: ExExContext<Node>, connection: Connection) -> eyre::Result<Self> {
// Create deposits and withdrawals tables
connection.execute(
r#"
/// Create SQLite tables if they do not exist.
fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {
// Create deposits and withdrawals tables
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS deposits (
id INTEGER PRIMARY KEY,
block_number INTEGER NOT NULL,
@ -38,10 +38,10 @@ impl<Node: FullNodeTypes> OPBridgeExEx<Node> {
amount TEXT NOT NULL
);
"#,
(),
)?;
connection.execute(
r#"
(),
)?;
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS withdrawals (
id INTEGER PRIMARY KEY,
block_number INTEGER NOT NULL,
@ -52,23 +52,23 @@ impl<Node: FullNodeTypes> OPBridgeExEx<Node> {
amount TEXT NOT NULL
);
"#,
(),
)?;
(),
)?;
// Create a bridge contract addresses table and insert known ones with their respective
// names
connection.execute(
r#"
// Create a bridge contract addresses table and insert known ones with their respective
// names
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS contracts (
id INTEGER PRIMARY KEY,
address TEXT NOT NULL UNIQUE,
name TEXT NOT NULL
);
"#,
(),
)?;
connection.execute(
r#"
(),
)?;
connection.execute(
r#"
INSERT OR IGNORE INTO contracts (address, name)
VALUES
('0x3154Cf16ccdb4C6d922629664174b904d80F2C35', 'Base'),
@ -78,76 +78,70 @@ impl<Node: FullNodeTypes> OPBridgeExEx<Node> {
('0x735aDBbE72226BD52e818E7181953f42E3b0FF21', 'Mode'),
('0x3B95bC951EE0f553ba487327278cAc44f29715E5', 'Manta');
"#,
(),
)?;
(),
)?;
info!("Initialized database tables");
info!("Initialized database tables");
Ok(Self { ctx, connection })
}
Ok(())
}
impl<Node: FullNodeTypes> Future for OPBridgeExEx<Node> {
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Process all new chain state notifications until there are no more
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
// If there was a reorg, delete all deposits and withdrawals that were reverted
if let Some(reverted_chain) = notification.reverted() {
let events = decode_chain_into_events(&reverted_chain);
let mut deposits = 0;
let mut withdrawals = 0;
for (_, tx, _, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
..
}) => {
let deleted = this.connection.execute(
"DELETE FROM deposits WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
deposits += deleted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
..
}) => {
let deleted = this.connection.execute(
"DELETE FROM withdrawals WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
withdrawals += deleted;
}
_ => continue,
};
}
info!(block_range = ?reverted_chain.range(), %deposits, %withdrawals, "Reverted chain events");
}
// Insert all new deposits and withdrawals
let committed_chain = notification.committed();
let events = decode_chain_into_events(&committed_chain);
/// An example of ExEx that listens to ETH bridging events from OP Stack chains
/// and stores deposits and withdrawals in a SQLite database.
async fn op_bridge_exex<Node: FullNodeTypes>(
mut ctx: ExExContext<Node>,
connection: Connection,
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.recv().await {
if let Some(reverted_chain) = notification.reverted() {
let events = decode_chain_into_events(&reverted_chain);
let mut deposits = 0;
let mut withdrawals = 0;
for (block, tx, log, event) in events {
for (_, tx, _, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
amount,
from,
to,
..
}) => {
let inserted = this.connection.execute(
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated { .. }) => {
let deleted = connection.execute(
"DELETE FROM deposits WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
deposits += deleted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized { .. }) => {
let deleted = connection.execute(
"DELETE FROM withdrawals WHERE tx_hash = ?;",
(tx.hash().to_string(),),
)?;
withdrawals += deleted;
}
_ => continue,
}
}
info!(block_range = ?reverted_chain.range(), %deposits, %withdrawals, "Reverted chain events");
}
// Insert all new deposits and withdrawals
let committed_chain = notification.committed();
let events = decode_chain_into_events(&committed_chain);
let mut deposits = 0;
let mut withdrawals = 0;
for (block, tx, log, event) in events {
match event {
// L1 -> L2 deposit
L1StandardBridgeEvents::ETHBridgeInitiated(ETHBridgeInitiated {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO deposits (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
@ -161,16 +155,16 @@ impl<Node: FullNodeTypes> Future for OPBridgeExEx<Node> {
amount.to_string(),
),
)?;
deposits += inserted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
amount,
from,
to,
..
}) => {
let inserted = this.connection.execute(
deposits += inserted;
}
// L2 -> L1 withdrawal
L1StandardBridgeEvents::ETHBridgeFinalized(ETHBridgeFinalized {
amount,
from,
to,
..
}) => {
let inserted = connection.execute(
r#"
INSERT INTO withdrawals (block_number, tx_hash, contract_address, "from", "to", amount)
VALUES (?, ?, ?, ?, ?, ?)
@ -184,21 +178,20 @@ impl<Node: FullNodeTypes> Future for OPBridgeExEx<Node> {
amount.to_string(),
),
)?;
withdrawals += inserted;
}
_ => continue,
};
}
info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events");
// Send a finished height event, signaling the node that we don't need any blocks below
// this height anymore
this.ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?;
withdrawals += inserted;
}
_ => continue,
};
}
Poll::Pending
info!(block_range = ?committed_chain.range(), %deposits, %withdrawals, "Committed chain events");
// Send a finished height event, signaling the node that we don't need any blocks below
// this height anymore
ctx.events.send(ExExEvent::FinishedHeight(notification.tip().number))?;
}
Ok(())
}
/// Decode chain of blocks into a flattened list of receipt logs, and filter only
@ -232,9 +225,9 @@ fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("OPBridge", move |ctx| async {
.install_exex("OPBridge", |ctx| async move {
let connection = Connection::open("op_bridge.db")?;
OPBridgeExEx::new(ctx, connection)
init(ctx, connection).await
})
.launch()
.await?;