mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf: no longer spawn filter tasks (#4096)
This commit is contained in:
@ -15,11 +15,8 @@ use reth_rpc_api::EthFilterApiServer;
|
||||
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use std::{
|
||||
collections::HashMap, future::Future, iter::StepBy, ops::RangeInclusive, sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::{oneshot, Mutex};
|
||||
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::trace;
|
||||
|
||||
/// The maximum number of headers we read at once when handling a range filter.
|
||||
@ -69,26 +66,6 @@ where
|
||||
Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static,
|
||||
Pool: TransactionPool + 'static,
|
||||
{
|
||||
/// Executes the given filter on a new task.
|
||||
///
|
||||
/// All the filter handles are implemented asynchronously. However, filtering is still a bit CPU
|
||||
/// intensive.
|
||||
async fn spawn_filter_task<C, F, R>(&self, c: C) -> Result<R, FilterError>
|
||||
where
|
||||
C: FnOnce(Self) -> F,
|
||||
F: Future<Output = Result<R, FilterError>> + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let this = self.clone();
|
||||
let f = c(this);
|
||||
self.inner.task_spawner.spawn(Box::pin(async move {
|
||||
let res = f.await;
|
||||
let _ = tx.send(res);
|
||||
}));
|
||||
rx.await.map_err(|_| FilterError::InternalError)?
|
||||
}
|
||||
|
||||
/// Returns all the filter changes for the given id, if any
|
||||
pub async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges, FilterError> {
|
||||
let info = self.inner.provider.chain_info()?;
|
||||
@ -202,7 +179,7 @@ where
|
||||
/// Handler for `eth_getFilterChanges`
|
||||
async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
|
||||
trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
|
||||
Ok(self.spawn_filter_task(|this| async move { this.filter_changes(id).await }).await?)
|
||||
Ok(EthFilter::filter_changes(self, id).await?)
|
||||
}
|
||||
|
||||
/// Returns an array of all logs matching filter with given id.
|
||||
@ -212,7 +189,7 @@ where
|
||||
/// Handler for `eth_getFilterLogs`
|
||||
async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
|
||||
trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
|
||||
Ok(self.spawn_filter_task(|this| async move { this.filter_logs(id).await }).await?)
|
||||
Ok(EthFilter::filter_logs(self, id).await?)
|
||||
}
|
||||
|
||||
/// Handler for `eth_uninstallFilter`
|
||||
@ -232,9 +209,7 @@ where
|
||||
/// Handler for `eth_getLogs`
|
||||
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
|
||||
trace!(target: "rpc::eth", "Serving eth_getLogs");
|
||||
Ok(self
|
||||
.spawn_filter_task(|this| async move { this.inner.logs_for_filter(filter).await })
|
||||
.await?)
|
||||
Ok(EthFilter::logs(self, filter).await?)
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,6 +244,7 @@ struct EthFilterInner<Provider, Pool> {
|
||||
/// maximum number of headers to read at once for range filter
|
||||
max_headers_range: u64,
|
||||
/// The type that can spawn tasks.
|
||||
#[allow(unused)]
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user