perf: spawn eth filter tasks (#2696)

This commit is contained in:
Matthias Seitz
2023-05-16 19:35:48 +02:00
committed by GitHub
parent a97c5b34a9
commit 42843d5d71
3 changed files with 163 additions and 97 deletions

View File

@ -61,7 +61,13 @@ where
gas_oracle, gas_oracle,
Box::new(executor.clone()), Box::new(executor.clone()),
); );
let eth_filter = EthFilter::new(client, pool, eth_cache.clone(), DEFAULT_MAX_LOGS_IN_RESPONSE); let eth_filter = EthFilter::new(
client,
pool,
eth_cache.clone(),
DEFAULT_MAX_LOGS_IN_RESPONSE,
Box::new(executor.clone()),
);
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
} }

View File

@ -845,19 +845,21 @@ where
}), }),
); );
let executor = Box::new(self.executor.clone());
let api = EthApi::with_spawner( let api = EthApi::with_spawner(
self.client.clone(), self.client.clone(),
self.pool.clone(), self.pool.clone(),
self.network.clone(), self.network.clone(),
cache.clone(), cache.clone(),
gas_oracle, gas_oracle,
Box::new(self.executor.clone()), executor.clone(),
); );
let filter = EthFilter::new( let filter = EthFilter::new(
self.client.clone(), self.client.clone(),
self.pool.clone(), self.pool.clone(),
cache.clone(), cache.clone(),
self.config.eth.max_logs_per_response, self.config.eth.max_logs_per_response,
executor.clone(),
); );
let pubsub = EthPubSub::with_spawner( let pubsub = EthPubSub::with_spawner(
@ -865,7 +867,7 @@ where
self.pool.clone(), self.pool.clone(),
self.events.clone(), self.events.clone(),
self.network.clone(), self.network.clone(),
Box::new(self.executor.clone()), executor,
); );
let eth = EthHandlers { api, cache, filter, pubsub }; let eth = EthHandlers { api, cache, filter, pubsub };

View File

@ -4,7 +4,7 @@ use crate::{
error::{EthApiError, EthResult}, error::{EthApiError, EthResult},
logs_utils, logs_utils,
}, },
result::{internal_rpc_err, rpc_error_with_code, ToRpcResult}, result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider, EthSubscriptionIdProvider,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@ -13,16 +13,19 @@ use reth_primitives::{Receipt, SealedBlock};
use reth_provider::{BlockIdProvider, BlockProvider, EvmEnvProvider}; use reth_provider::{BlockIdProvider, BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer; use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool; use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; use std::{
use tokio::sync::Mutex; collections::HashMap, future::Future, iter::StepBy, ops::RangeInclusive, sync::Arc,
time::Instant,
};
use tokio::sync::{oneshot, Mutex};
use tracing::trace; use tracing::trace;
/// The maximum number of headers we read at once when handling a range filter. /// The maximum number of headers we read at once when handling a range filter.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
/// `Eth` filter RPC implementation. /// `Eth` filter RPC implementation.
#[derive(Debug, Clone)]
pub struct EthFilter<Client, Pool> { pub struct EthFilter<Client, Pool> {
/// All nested fields bundled together. /// All nested fields bundled together.
inner: Arc<EthFilterInner<Client, Pool>>, inner: Arc<EthFilterInner<Client, Pool>>,
@ -40,6 +43,7 @@ impl<Client, Pool> EthFilter<Client, Pool> {
pool: Pool, pool: Pool,
eth_cache: EthStateCache, eth_cache: EthStateCache,
max_logs_per_response: usize, max_logs_per_response: usize,
task_spawner: Box<dyn TaskSpawner>,
) -> Self { ) -> Self {
let inner = EthFilterInner { let inner = EthFilterInner {
client, client,
@ -49,6 +53,7 @@ impl<Client, Pool> EthFilter<Client, Pool> {
max_logs_per_response, max_logs_per_response,
eth_cache, eth_cache,
max_headers_range: MAX_HEADERS_RANGE, max_headers_range: MAX_HEADERS_RANGE,
task_spawner,
}; };
Self { inner: Arc::new(inner) } Self { inner: Arc::new(inner) }
} }
@ -59,6 +64,117 @@ impl<Client, Pool> EthFilter<Client, Pool> {
} }
} }
impl<Client, Pool> EthFilter<Client, Pool>
where
Client: BlockProvider + BlockIdProvider + EvmEnvProvider + 'static,
Pool: TransactionPool + 'static,
{
/// Executes the given filter on a new task.
///
/// All the filter handles are implemented asynchronously. However, filtering is still a bit CPU
/// intensive.
async fn spawn_filter_task<C, F, R>(&self, c: C) -> Result<R, FilterError>
where
C: FnOnce(Self) -> F,
F: Future<Output = Result<R, FilterError>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| FilterError::InternalError)?
}
/// Returns all the filter changes for the given id, if any
pub async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges, FilterError> {
let info = self.inner.client.chain_info()?;
let best_number = info.best_number;
let (start_block, kind) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?;
// update filter
// we fetch all changes from [filter.block..best_block], so we advance the filter's
// block to `best_block +1`
let mut block = best_number + 1;
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();
(block, filter.kind.clone())
};
match kind {
FilterKind::PendingTransaction => {
Err(EthApiError::Unsupported("pending transaction filter not supported").into())
}
FilterKind::Block => {
let mut block_hashes = Vec::new();
for block_num in start_block..best_number {
let block_hash = self
.inner
.client
.block_hash(block_num)?
.ok_or(EthApiError::UnknownBlockNumber)?;
block_hashes.push(block_hash);
}
Ok(FilterChanges::Hashes(block_hashes))
}
FilterKind::Log(filter) => {
let (from_block_number, to_block_number) = match filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
let from = from_block
.map(|num| self.inner.client.convert_block_number(num))
.transpose()?
.flatten();
let to = to_block
.map(|num| self.inner.client.convert_block_number(num))
.transpose()?
.flatten();
logs_utils::get_filter_block_range(from, to, start_block, info)
}
FilterBlockOption::AtBlockHash(_) => {
// blockHash is equivalent to fromBlock = toBlock = the block number with
// hash blockHash
(start_block, best_number)
}
};
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number)
.await?;
Ok(FilterChanges::Logs(logs))
}
}
}
/// Returns an array of all logs matching filter with given id.
///
/// Returns an error if no matching log filter exists.
///
/// Handler for `eth_getFilterLogs`
pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, FilterError> {
let filter = {
let filters = self.inner.active_filters.inner.lock().await;
if let FilterKind::Log(ref filter) =
filters.get(&id).ok_or_else(|| FilterError::FilterNotFound(id.clone()))?.kind
{
*filter.clone()
} else {
// Not a log filter
return Err(FilterError::FilterNotFound(id))
}
};
self.inner.logs_for_filter(filter).await
}
}
#[async_trait] #[async_trait]
impl<Client, Pool> EthFilterApiServer for EthFilter<Client, Pool> impl<Client, Pool> EthFilterApiServer for EthFilter<Client, Pool>
where where
@ -86,69 +202,7 @@ where
/// Handler for `eth_getFilterChanges` /// Handler for `eth_getFilterChanges`
async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> { async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
trace!(target: "rpc::eth", "Serving eth_getFilterChanges"); trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
let info = self.inner.client.chain_info().to_rpc_result()?; Ok(self.spawn_filter_task(|this| async move { this.filter_changes(id).await }).await?)
let best_number = info.best_number;
let (start_block, kind) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?;
// update filter
// we fetch all changes from [filter.block..best_block], so we advance the filter's
// block to `best_block +1`
let mut block = best_number + 1;
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();
(block, filter.kind.clone())
};
match kind {
FilterKind::PendingTransaction => {
return Err(internal_rpc_err("method not implemented"))
}
FilterKind::Block => {
let mut block_hashes = Vec::new();
for block_num in start_block..best_number {
let block_hash = self
.inner
.client
.block_hash(block_num)
.to_rpc_result()?
.ok_or(EthApiError::UnknownBlockNumber)?;
block_hashes.push(block_hash);
}
Ok(FilterChanges::Hashes(block_hashes))
}
FilterKind::Log(filter) => {
let (from_block_number, to_block_number) = match filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
let from = from_block
.map(|num| self.inner.client.convert_block_number(num))
.transpose()
.to_rpc_result()?
.flatten();
let to = to_block
.map(|num| self.inner.client.convert_block_number(num))
.transpose()
.to_rpc_result()?
.flatten();
logs_utils::get_filter_block_range(from, to, start_block, info)
}
FilterBlockOption::AtBlockHash(_) => {
// blockHash is equivalent to fromBlock = toBlock = the block number with
// hash blockHash
(start_block, best_number)
}
};
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number)
.await?;
Ok(FilterChanges::Logs(logs))
}
}
} }
/// Returns an array of all logs matching filter with given id. /// Returns an array of all logs matching filter with given id.
@ -158,19 +212,7 @@ where
/// Handler for `eth_getFilterLogs` /// Handler for `eth_getFilterLogs`
async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> { async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getFilterLogs"); trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
let filter = { Ok(self.spawn_filter_task(|this| async move { this.filter_logs(id).await }).await?)
let filters = self.inner.active_filters.inner.lock().await;
if let FilterKind::Log(ref filter) =
filters.get(&id).ok_or_else(|| FilterError::FilterNotFound(id.clone()))?.kind
{
*filter.clone()
} else {
// Not a log filter
return Err(FilterError::FilterNotFound(id).into())
}
};
self.inner.logs_for_filter(filter).await
} }
/// Handler for `eth_uninstallFilter` /// Handler for `eth_uninstallFilter`
@ -190,7 +232,21 @@ where
/// Handler for `eth_getLogs` /// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> { async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs"); trace!(target: "rpc::eth", "Serving eth_getLogs");
self.inner.logs_for_filter(filter).await Ok(self
.spawn_filter_task(|this| async move { this.inner.logs_for_filter(filter).await })
.await?)
}
}
impl<Client, Pool> std::fmt::Debug for EthFilter<Client, Pool> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthFilter").finish_non_exhaustive()
}
}
impl<Client, Pool> Clone for EthFilter<Client, Pool> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
} }
} }
@ -212,6 +268,8 @@ struct EthFilterInner<Client, Pool> {
eth_cache: EthStateCache, eth_cache: EthStateCache,
/// maximum number of headers to read at once for range filter /// maximum number of headers to read at once for range filter
max_headers_range: u64, max_headers_range: u64,
/// The type that can spawn tasks.
task_spawner: Box<dyn TaskSpawner>,
} }
impl<Client, Pool> EthFilterInner<Client, Pool> impl<Client, Pool> EthFilterInner<Client, Pool>
@ -220,16 +278,14 @@ where
Pool: TransactionPool + 'static, Pool: TransactionPool + 'static,
{ {
/// Returns logs matching given filter object. /// Returns logs matching given filter object.
async fn logs_for_filter(&self, filter: Filter) -> RpcResult<Vec<Log>> { async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, FilterError> {
match filter.block_option { match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => { FilterBlockOption::AtBlockHash(block_hash) => {
let mut all_logs = Vec::new(); let mut all_logs = Vec::new();
// all matching logs in the block, if it exists // all matching logs in the block, if it exists
if let Some(block) = self.eth_cache.get_block(block_hash).await.to_rpc_result()? { if let Some(block) = self.eth_cache.get_block(block_hash).await? {
// get receipts for the block // get receipts for the block
if let Some(receipts) = if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
self.eth_cache.get_receipts(block_hash).await.to_rpc_result()?
{
let filter = FilteredParams::new(Some(filter)); let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs( logs_utils::append_matching_block_logs(
&mut all_logs, &mut all_logs,
@ -244,25 +300,21 @@ where
} }
FilterBlockOption::Range { from_block, to_block } => { FilterBlockOption::Range { from_block, to_block } => {
// compute the range // compute the range
let info = self.client.chain_info().to_rpc_result()?; let info = self.client.chain_info()?;
// we start at the most recent block if unset in filter // we start at the most recent block if unset in filter
let start_block = info.best_number; let start_block = info.best_number;
let from = from_block let from = from_block
.map(|num| self.client.convert_block_number(num)) .map(|num| self.client.convert_block_number(num))
.transpose() .transpose()?
.to_rpc_result()?
.flatten(); .flatten();
let to = to_block let to = to_block
.map(|num| self.client.convert_block_number(num)) .map(|num| self.client.convert_block_number(num))
.transpose() .transpose()?
.to_rpc_result()?
.flatten(); .flatten();
let (from_block_number, to_block_number) = let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info); logs_utils::get_filter_block_range(from, to, start_block, info);
Ok(self self.get_logs_in_block_range(&filter, from_block_number, to_block_number).await
.get_logs_in_block_range(&filter, from_block_number, to_block_number)
.await?)
} }
} }
} }
@ -393,6 +445,9 @@ pub enum FilterError {
QueryExceedsMaxResults(usize), QueryExceedsMaxResults(usize),
#[error(transparent)] #[error(transparent)]
EthAPIError(#[from] EthApiError), EthAPIError(#[from] EthApiError),
/// Error thrown when a spawned task failed to deliver a response.
#[error("internal filter error")]
InternalError,
} }
// convert the error // convert the error
@ -403,6 +458,9 @@ impl From<FilterError> for jsonrpsee::types::error::ErrorObject<'static> {
jsonrpsee::types::error::INVALID_PARAMS_CODE, jsonrpsee::types::error::INVALID_PARAMS_CODE,
"filter not found", "filter not found",
), ),
err @ FilterError::InternalError => {
rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
}
FilterError::EthAPIError(err) => err.into(), FilterError::EthAPIError(err) => err.into(),
err @ FilterError::QueryExceedsMaxResults(_) => { err @ FilterError::QueryExceedsMaxResults(_) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())