feat: add Tracing call pool (#3908)

This commit is contained in:
Matthias Seitz
2023-07-25 16:51:30 +02:00
committed by GitHub
parent 1c4e19bad7
commit 076c91a916
7 changed files with 149 additions and 2 deletions

2
Cargo.lock generated
View File

@ -5740,6 +5740,8 @@ version = "0.1.0-alpha.4"
dependencies = [
"hyper",
"jsonrpsee",
"pin-project",
"rayon",
"reth-beacon-consensus",
"reth-interfaces",
"reth-ipc",

View File

@ -118,6 +118,7 @@ serde_json = "1.0.94"
serde = { version = "1.0", default-features = false }
rand = "0.8.5"
strum = "0.25"
rayon = "1.7"
### proc-macros
proc-macro2 = "1.0"

View File

@ -26,7 +26,7 @@ tokio-util = { workspace = true, features = ["codec"] }
# misc
tracing = { workspace = true }
rayon = "1.6.0"
rayon = { workspace = true }
thiserror = { workspace = true }
# optional deps for the test-utils feature

View File

@ -33,6 +33,9 @@ strum = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tracing = { workspace = true }
rayon = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
[dev-dependencies]
reth-tracing = { path = "../../tracing" }

View File

@ -154,6 +154,9 @@ mod eth;
/// Common RPC constants.
pub mod constants;
/// Additional support for tracing related rpc calls
pub mod tracing_pool;
// re-export for convenience
pub use crate::eth::{EthConfig, EthHandlers};
pub use jsonrpsee::server::ServerBuilder;

View File

@ -0,0 +1,138 @@
//! Additional helpers for executing tracing calls
use std::{
future::Future,
panic::{catch_unwind, AssertUnwindSafe},
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
thread,
};
use tokio::sync::oneshot;
/// Used to execute tracing calls on a rayon threadpool from within a tokio runtime.
#[derive(Clone, Debug)]
pub struct TracingCallPool {
pool: Arc<rayon::ThreadPool>,
}
impl TracingCallPool {
/// Create a new `TracingCallPool` with the given threadpool.
pub fn new(pool: rayon::ThreadPool) -> Self {
Self { pool: Arc::new(pool) }
}
/// Convenience function to start building a new threadpool.
pub fn builder() -> rayon::ThreadPoolBuilder {
rayon::ThreadPoolBuilder::new()
}
/// Convenience function to build a new threadpool with the default configuration.
///
/// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but
/// increases the stack size to 8MB.
pub fn build() -> Result<Self, rayon::ThreadPoolBuildError> {
Self::builder()
// increase stack size, mostly for RPC calls that use the evm: <https://github.com/paradigmxyz/reth/issues/3056> and <https://github.com/bluealloy/revm/issues/305>
.stack_size(8 * 1024 * 1024)
.build()
.map(Self::new)
}
/// Asynchronous wrapper around Rayon's
/// [`ThreadPool::spawn`](rayon::ThreadPool::spawn).
///
/// Runs a function on the configured threadpool, returning a future that resolves with the
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn<F, R>(&self, func: F) -> TracingCallHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
self.pool.spawn(move || {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});
TracingCallHandle { rx }
}
/// Asynchronous wrapper around Rayon's
/// [`ThreadPool::spawn_fifo`](rayon::ThreadPool::spawn_fifo).
///
/// Runs a function on the configured threadpool, returning a future that resolves with the
/// function's return value.
///
/// If the function panics, the future will resolve to an error.
pub fn spawn_fifo<F, R>(&self, func: F) -> TracingCallHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
self.pool.spawn_fifo(move || {
let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
});
TracingCallHandle { rx }
}
}
/// Async handle for a blocking tracing task running in a Rayon thread pool.
///
/// ## Panics
///
/// If polled from outside a tokio runtime.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct TracingCallHandle<T> {
#[pin]
pub(crate) rx: oneshot::Receiver<thread::Result<T>>,
}
impl<T> Future for TracingCallHandle<T> {
type Output = thread::Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(self.project().rx.poll(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(Box::<TokioTracingCallError>::default())),
}
}
}
/// An error returned when the Tokio channel is dropped while awaiting a result.
///
/// This should only happen
#[derive(Debug, Default, thiserror::Error)]
#[error("Tokio channel dropped while awaiting result")]
#[non_exhaustive]
pub struct TokioTracingCallError;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn tracing_pool() {
let pool = TracingCallPool::build().unwrap();
let res = pool.spawn(move || 5);
let res = res.await.unwrap();
assert_eq!(res, 5);
}
#[tokio::test]
async fn tracing_pool_panic() {
let pool = TracingCallPool::build().unwrap();
let res = pool.spawn(move || -> i32 {
panic!();
});
let res = res.await;
assert!(res.is_err());
}
}

View File

@ -41,7 +41,7 @@ serde = { workspace = true }
thiserror = { workspace = true }
aquamarine = "0.3.0"
itertools = "0.10.5"
rayon = "1.6.0"
rayon = { workspace = true }
num-traits = "0.2.15"
[dev-dependencies]