From 076c91a916f1b584adb19569927163882de7a3fc Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 25 Jul 2023 16:51:30 +0200 Subject: [PATCH] feat: add Tracing call pool (#3908) --- Cargo.lock | 2 + Cargo.toml | 1 + crates/net/downloaders/Cargo.toml | 2 +- crates/rpc/rpc-builder/Cargo.toml | 3 + crates/rpc/rpc-builder/src/lib.rs | 3 + crates/rpc/rpc-builder/src/tracing_pool.rs | 138 +++++++++++++++++++++ crates/stages/Cargo.toml | 2 +- 7 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 crates/rpc/rpc-builder/src/tracing_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 7b674b4ee..5df882cc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5740,6 +5740,8 @@ version = "0.1.0-alpha.4" dependencies = [ "hyper", "jsonrpsee", + "pin-project", + "rayon", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", diff --git a/Cargo.toml b/Cargo.toml index 92ed754a3..89efb758c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ serde_json = "1.0.94" serde = { version = "1.0", default-features = false } rand = "0.8.5" strum = "0.25" +rayon = "1.7" ### proc-macros proc-macro2 = "1.0" diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index 07942b2ad..36179fe07 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -26,7 +26,7 @@ tokio-util = { workspace = true, features = ["codec"] } # misc tracing = { workspace = true } -rayon = "1.6.0" +rayon = { workspace = true } thiserror = { workspace = true } # optional deps for the test-utils feature diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 8d51e1780..c0078969d 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -33,6 +33,9 @@ strum = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tracing = { workspace = true } +rayon = { workspace = true } +pin-project = { workspace = true } +tokio = { workspace = true, features = ["sync"] } [dev-dependencies] reth-tracing = { path = "../../tracing" } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 8737519fa..afa7a37bf 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -154,6 +154,9 @@ mod eth; /// Common RPC constants. pub mod constants; +/// Additional support for tracing related rpc calls +pub mod tracing_pool; + // re-export for convenience pub use crate::eth::{EthConfig, EthHandlers}; pub use jsonrpsee::server::ServerBuilder; diff --git a/crates/rpc/rpc-builder/src/tracing_pool.rs b/crates/rpc/rpc-builder/src/tracing_pool.rs new file mode 100644 index 000000000..dd3056117 --- /dev/null +++ b/crates/rpc/rpc-builder/src/tracing_pool.rs @@ -0,0 +1,138 @@ +//! Additional helpers for executing tracing calls + +use std::{ + future::Future, + panic::{catch_unwind, AssertUnwindSafe}, + pin::Pin, + sync::Arc, + task::{ready, Context, Poll}, + thread, +}; +use tokio::sync::oneshot; + +/// Used to execute tracing calls on a rayon threadpool from within a tokio runtime. +#[derive(Clone, Debug)] +pub struct TracingCallPool { + pool: Arc, +} + +impl TracingCallPool { + /// Create a new `TracingCallPool` with the given threadpool. + pub fn new(pool: rayon::ThreadPool) -> Self { + Self { pool: Arc::new(pool) } + } + + /// Convenience function to start building a new threadpool. + pub fn builder() -> rayon::ThreadPoolBuilder { + rayon::ThreadPoolBuilder::new() + } + + /// Convenience function to build a new threadpool with the default configuration. + /// + /// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but + /// increases the stack size to 8MB. + pub fn build() -> Result { + Self::builder() + // increase stack size, mostly for RPC calls that use the evm: and + .stack_size(8 * 1024 * 1024) + .build() + .map(Self::new) + } + + /// Asynchronous wrapper around Rayon's + /// [`ThreadPool::spawn`](rayon::ThreadPool::spawn). + /// + /// Runs a function on the configured threadpool, returning a future that resolves with the + /// function's return value. + /// + /// If the function panics, the future will resolve to an error. + pub fn spawn(&self, func: F) -> TracingCallHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.pool.spawn(move || { + let _result = tx.send(catch_unwind(AssertUnwindSafe(func))); + }); + + TracingCallHandle { rx } + } + + /// Asynchronous wrapper around Rayon's + /// [`ThreadPool::spawn_fifo`](rayon::ThreadPool::spawn_fifo). + /// + /// Runs a function on the configured threadpool, returning a future that resolves with the + /// function's return value. + /// + /// If the function panics, the future will resolve to an error. + pub fn spawn_fifo(&self, func: F) -> TracingCallHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.pool.spawn_fifo(move || { + let _result = tx.send(catch_unwind(AssertUnwindSafe(func))); + }); + + TracingCallHandle { rx } + } +} + +/// Async handle for a blocking tracing task running in a Rayon thread pool. +/// +/// ## Panics +/// +/// If polled from outside a tokio runtime. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project::pin_project] +pub struct TracingCallHandle { + #[pin] + pub(crate) rx: oneshot::Receiver>, +} + +impl Future for TracingCallHandle { + type Output = thread::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(self.project().rx.poll(cx)) { + Ok(res) => Poll::Ready(res), + Err(_) => Poll::Ready(Err(Box::::default())), + } + } +} + +/// An error returned when the Tokio channel is dropped while awaiting a result. +/// +/// This should only happen +#[derive(Debug, Default, thiserror::Error)] +#[error("Tokio channel dropped while awaiting result")] +#[non_exhaustive] +pub struct TokioTracingCallError; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn tracing_pool() { + let pool = TracingCallPool::build().unwrap(); + let res = pool.spawn(move || 5); + let res = res.await.unwrap(); + assert_eq!(res, 5); + } + + #[tokio::test] + async fn tracing_pool_panic() { + let pool = TracingCallPool::build().unwrap(); + let res = pool.spawn(move || -> i32 { + panic!(); + }); + let res = res.await; + assert!(res.is_err()); + } +} diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index f83dcab22..c8236c3a7 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -41,7 +41,7 @@ serde = { workspace = true } thiserror = { workspace = true } aquamarine = "0.3.0" itertools = "0.10.5" -rayon = "1.6.0" +rayon = { workspace = true } num-traits = "0.2.15" [dev-dependencies]