feat(rpc): limit block_range by 100_000 per eth_getLogs request (#5243)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
kaliubuntu0206
2023-11-02 23:29:03 +09:00
committed by GitHub
parent 24fade37ec
commit 0449c5eca6
9 changed files with 200 additions and 29 deletions

View File

@ -31,7 +31,7 @@ pub use stage_args::StageEnum;
mod gas_price_oracle_args; mod gas_price_oracle_args;
pub use gas_price_oracle_args::GasPriceOracleArgs; pub use gas_price_oracle_args::GasPriceOracleArgs;
/// TxPoolArgs for congiguring the transaction pool /// TxPoolArgs for configuring the transaction pool
mod txpool_args; mod txpool_args;
pub use txpool_args::TxPoolArgs; pub use txpool_args::TxPoolArgs;
@ -44,3 +44,5 @@ mod pruning_args;
pub use pruning_args::PruningArgs; pub use pruning_args::PruningArgs;
pub mod utils; pub mod utils;
pub mod types;

View File

@ -1,7 +1,7 @@
//! clap [Args](clap::Args) for RPC related arguments. //! clap [Args](clap::Args) for RPC related arguments.
use crate::{ use crate::{
args::GasPriceOracleArgs, args::{types::ZeroAsNone, GasPriceOracleArgs},
cli::{ cli::{
components::{RethNodeComponents, RethRpcComponents, RethRpcServerHandles}, components::{RethNodeComponents, RethRpcComponents, RethRpcServerHandles},
config::RethRpcConfig, config::RethRpcConfig,
@ -140,9 +140,13 @@ pub struct RpcServerArgs {
#[arg(long, value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACING_REQUESTS)] #[arg(long, value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACING_REQUESTS)]
pub rpc_max_tracing_requests: u32, pub rpc_max_tracing_requests: u32,
/// Maximum number of logs that can be returned in a single response. /// Maximum number of blocks that could be scanned per filter request. (0 = entire chain)
#[arg(long, value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_LOGS_PER_RESPONSE)] #[arg(long, value_name = "COUNT", default_value_t = ZeroAsNone::new(constants::DEFAULT_MAX_BLOCKS_PER_FILTER))]
pub rpc_max_logs_per_response: usize, pub rpc_max_blocks_per_filter: ZeroAsNone,
/// Maximum number of logs that can be returned in a single response. (0 = no limit)
#[arg(long, value_name = "COUNT", default_value_t = ZeroAsNone::new(constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64))]
pub rpc_max_logs_per_response: ZeroAsNone,
/// Maximum gas limit for `eth_call` and call tracing RPC methods. /// Maximum gas limit for `eth_call` and call tracing RPC methods.
#[arg( #[arg(
@ -326,7 +330,8 @@ impl RethRpcConfig for RpcServerArgs {
fn eth_config(&self) -> EthConfig { fn eth_config(&self) -> EthConfig {
EthConfig::default() EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests) .max_tracing_requests(self.rpc_max_tracing_requests)
.max_logs_per_response(self.rpc_max_logs_per_response) .max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max())
.max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize)
.rpc_gas_cap(self.rpc_gas_cap) .rpc_gas_cap(self.rpc_gas_cap)
.gpo_config(self.gas_price_oracle_config()) .gpo_config(self.gas_price_oracle_config())
} }
@ -598,4 +603,36 @@ mod tests {
); );
assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT); assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT);
} }
#[test]
fn test_zero_filter_limits() {
let args = CommandParser::<RpcServerArgs>::parse_from([
"reth",
"--rpc-max-blocks-per-filter",
"0",
"--rpc-max-logs-per-response",
"0",
])
.args;
let config = args.eth_config().filter_config();
assert_eq!(config.max_blocks_per_filter, Some(u64::MAX));
assert_eq!(config.max_logs_per_response, Some(usize::MAX));
}
#[test]
fn test_custom_filter_limits() {
let args = CommandParser::<RpcServerArgs>::parse_from([
"reth",
"--rpc-max-blocks-per-filter",
"100",
"--rpc-max-logs-per-response",
"200",
])
.args;
let config = args.eth_config().filter_config();
assert_eq!(config.max_blocks_per_filter, Some(100));
assert_eq!(config.max_logs_per_response, Some(200));
}
} }

View File

@ -0,0 +1,49 @@
//! Additional helper types for CLI parsing.
use std::{fmt, str::FromStr};
/// A helper type that maps `0` to `None` when parsing CLI arguments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ZeroAsNone(pub Option<u64>);
impl ZeroAsNone {
/// Returns the inner value.
pub const fn new(value: u64) -> Self {
Self(Some(value))
}
/// Returns the inner value or `u64::MAX` if `None`.
pub fn unwrap_or_max(self) -> u64 {
self.0.unwrap_or(u64::MAX)
}
}
impl fmt::Display for ZeroAsNone {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0 {
Some(value) => write!(f, "{}", value),
None => write!(f, "0"),
}
}
}
impl FromStr for ZeroAsNone {
type Err = std::num::ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let value = s.parse::<u64>()?;
Ok(Self(if value == 0 { None } else { Some(value) }))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_zero_parse() {
let val = "0".parse::<ZeroAsNone>().unwrap();
assert_eq!(val, ZeroAsNone(None));
assert_eq!(val.unwrap_or_max(), u64::MAX);
}
}

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
constants, constants,
constants::DEFAULT_MAX_LOGS_PER_RESPONSE, constants::{DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE},
error::{RpcError, ServerKind}, error::{RpcError, ServerKind},
EthConfig, EthConfig,
}; };
@ -16,7 +16,7 @@ use reth_provider::{
StateProviderFactory, StateProviderFactory,
}; };
use reth_rpc::{ use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, eth::{cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig},
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
}; };
@ -68,14 +68,11 @@ where
Box::new(executor.clone()), Box::new(executor.clone()),
BlockingTaskPool::build().expect("failed to build tracing pool"), BlockingTaskPool::build().expect("failed to build tracing pool"),
); );
let eth_filter = EthFilter::new( let config = EthFilterConfig::default()
provider, .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)
pool, .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER);
eth_cache.clone(), let eth_filter =
DEFAULT_MAX_LOGS_PER_RESPONSE, EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone()));
Box::new(executor.clone()),
EthConfig::default().stale_filter_ttl,
);
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
} }

View File

@ -7,6 +7,9 @@ pub const DEFAULT_WS_RPC_PORT: u16 = 8546;
/// The default port for the auth server. /// The default port for the auth server.
pub const DEFAULT_AUTH_PORT: u16 = 8551; pub const DEFAULT_AUTH_PORT: u16 = 8551;
/// The default maximum block range allowed to filter
pub const DEFAULT_MAX_BLOCKS_PER_FILTER: u64 = 100_000;
/// The default maximum of logs in a single response. /// The default maximum of logs in a single response.
pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000; pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000;

View File

@ -1,9 +1,11 @@
use crate::constants::{DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_TRACING_REQUESTS}; use crate::constants::{
DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_TRACING_REQUESTS,
};
use reth_rpc::{ use reth_rpc::{
eth::{ eth::{
cache::{EthStateCache, EthStateCacheConfig}, cache::{EthStateCache, EthStateCacheConfig},
gas_oracle::GasPriceOracleConfig, gas_oracle::GasPriceOracleConfig,
RPC_DEFAULT_GAS_CAP, EthFilterConfig, RPC_DEFAULT_GAS_CAP,
}, },
BlockingTaskPool, EthApi, EthFilter, EthPubSub, BlockingTaskPool, EthApi, EthFilter, EthPubSub,
}; };
@ -33,6 +35,8 @@ pub struct EthConfig {
pub gas_oracle: GasPriceOracleConfig, pub gas_oracle: GasPriceOracleConfig,
/// The maximum number of tracing calls that can be executed in concurrently. /// The maximum number of tracing calls that can be executed in concurrently.
pub max_tracing_requests: u32, pub max_tracing_requests: u32,
/// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls.
pub max_blocks_per_filter: u64,
/// Maximum number of logs that can be returned in a single response in `eth_getLogs` calls. /// Maximum number of logs that can be returned in a single response in `eth_getLogs` calls.
pub max_logs_per_response: usize, pub max_logs_per_response: usize,
/// Gas limit for `eth_call` and call tracing RPC methods. /// Gas limit for `eth_call` and call tracing RPC methods.
@ -44,6 +48,16 @@ pub struct EthConfig {
pub stale_filter_ttl: std::time::Duration, pub stale_filter_ttl: std::time::Duration,
} }
impl EthConfig {
/// Returns the filter config for the `eth_filter` handler.
pub fn filter_config(&self) -> EthFilterConfig {
EthFilterConfig::default()
.max_blocks_per_filter(self.max_blocks_per_filter)
.max_logs_per_response(self.max_logs_per_response)
.stale_filter_ttl(self.stale_filter_ttl)
}
}
/// Default value for stale filter ttl /// Default value for stale filter ttl
const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60); const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60);
@ -53,6 +67,7 @@ impl Default for EthConfig {
cache: EthStateCacheConfig::default(), cache: EthStateCacheConfig::default(),
gas_oracle: GasPriceOracleConfig::default(), gas_oracle: GasPriceOracleConfig::default(),
max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS, max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS,
max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER,
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE, max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(), rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
@ -79,6 +94,12 @@ impl EthConfig {
self self
} }
/// Configures the maximum block length to scan per `eth_getLogs` request
pub fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self {
self.max_blocks_per_filter = max_blocks;
self
}
/// Configures the maximum number of logs per response /// Configures the maximum number of logs per response
pub fn max_logs_per_response(mut self, max_logs: usize) -> Self { pub fn max_logs_per_response(mut self, max_logs: usize) -> Self {
self.max_logs_per_response = max_logs; self.max_logs_per_response = max_logs;

View File

@ -1043,9 +1043,8 @@ where
self.provider.clone(), self.provider.clone(),
self.pool.clone(), self.pool.clone(),
cache.clone(), cache.clone(),
self.config.eth.max_logs_per_response, self.config.eth.filter_config(),
executor.clone(), executor.clone(),
self.config.eth.stale_filter_ttl,
); );
let pubsub = EthPubSub::with_spawner( let pubsub = EthPubSub::with_spawner(

View File

@ -51,29 +51,32 @@ where
/// Creates a new, shareable instance. /// Creates a new, shareable instance.
/// ///
/// This uses the given pool to get notified about new transactions, the provider to interact /// This uses the given pool to get notified about new transactions, the provider to interact
/// with the blockchain, the cache to fetch cacheable data, like the logs and the /// with the blockchain, the cache to fetch cacheable data, like the logs.
/// max_logs_per_response to limit the amount of logs returned in a single response ///
/// `eth_getLogs` /// See also [EthFilterConfig].
/// ///
/// This also spawns a task that periodically clears stale filters. /// This also spawns a task that periodically clears stale filters.
pub fn new( pub fn new(
provider: Provider, provider: Provider,
pool: Pool, pool: Pool,
eth_cache: EthStateCache, eth_cache: EthStateCache,
max_logs_per_response: usize, config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>, task_spawner: Box<dyn TaskSpawner>,
stale_filter_ttl: Duration,
) -> Self { ) -> Self {
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
let inner = EthFilterInner { let inner = EthFilterInner {
provider, provider,
active_filters: Default::default(), active_filters: Default::default(),
pool, pool,
id_provider: Arc::new(EthSubscriptionIdProvider::default()), id_provider: Arc::new(EthSubscriptionIdProvider::default()),
max_logs_per_response,
eth_cache, eth_cache,
max_headers_range: MAX_HEADERS_RANGE, max_headers_range: MAX_HEADERS_RANGE,
task_spawner, task_spawner,
stale_filter_ttl, stale_filter_ttl,
// if not set, use the max value, which is effectively no limit
max_blocks_per_filter: max_blocks_per_filter.unwrap_or(u64::MAX),
max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX),
}; };
let eth_filter = Self { inner: Arc::new(inner) }; let eth_filter = Self { inner: Arc::new(inner) };
@ -324,6 +327,8 @@ struct EthFilterInner<Provider, Pool> {
active_filters: ActiveFilters, active_filters: ActiveFilters,
/// Provides ids to identify filters /// Provides ids to identify filters
id_provider: Arc<dyn IdProvider>, id_provider: Arc<dyn IdProvider>,
/// Maximum number of blocks that could be scanned per filter
max_blocks_per_filter: u64,
/// Maximum number of logs that can be returned in a response /// Maximum number of logs that can be returned in a response
max_logs_per_response: usize, max_logs_per_response: usize,
/// The async cache frontend for eth related data /// The async cache frontend for eth related data
@ -424,6 +429,10 @@ where
) -> Result<Vec<Log>, FilterError> { ) -> Result<Vec<Log>, FilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range"); trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
if to_block - from_block > self.max_blocks_per_filter {
return Err(FilterError::QueryExceedsMaxBlocks(self.max_blocks_per_filter))
}
let mut all_logs = Vec::new(); let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone())); let filter_params = FilteredParams::new(Some(filter.clone()));
@ -431,8 +440,6 @@ where
let address_filter = FilteredParams::address_filter(&filter.address); let address_filter = FilteredParams::address_filter(&filter.address);
let topics_filter = FilteredParams::topics_filter(&filter.topics); let topics_filter = FilteredParams::topics_filter(&filter.topics);
let is_multi_block_range = from_block != to_block;
// loop over the range of new blocks and check logs if the filter matches the log's bloom // loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter // filter
for (from, to) in for (from, to) in
@ -467,6 +474,7 @@ where
// size check but only if range is multiple blocks, so we always return all // size check but only if range is multiple blocks, so we always return all
// logs of a single block // logs of a single block
let is_multi_block_range = from_block != to_block;
if is_multi_block_range && all_logs.len() > self.max_logs_per_response { if is_multi_block_range && all_logs.len() > self.max_logs_per_response {
return Err(FilterError::QueryExceedsMaxResults( return Err(FilterError::QueryExceedsMaxResults(
self.max_logs_per_response, self.max_logs_per_response,
@ -481,6 +489,56 @@ where
} }
} }
/// Config for the filter
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EthFilterConfig {
/// Maximum number of blocks that a filter can scan for logs.
///
/// If `None` then no limit is enforced.
pub max_blocks_per_filter: Option<u64>,
/// Maximum number of logs that can be returned in a single response in `eth_getLogs` calls.
///
/// If `None` then no limit is enforced.
pub max_logs_per_response: Option<usize>,
/// How long a filter remains valid after the last poll.
///
/// A filter is considered stale if it has not been polled for longer than this duration and
/// will be removed.
pub stale_filter_ttl: Duration,
}
impl EthFilterConfig {
/// Sets the maximum number of blocks that a filter can scan for logs.
pub fn max_blocks_per_filter(mut self, num: u64) -> Self {
self.max_blocks_per_filter = Some(num);
self
}
/// Sets the maximum number of logs that can be returned in a single response in `eth_getLogs`
/// calls.
pub fn max_logs_per_response(mut self, num: usize) -> Self {
self.max_logs_per_response = Some(num);
self
}
/// Sets how long a filter remains valid after the last poll before it will be removed.
pub fn stale_filter_ttl(mut self, duration: Duration) -> Self {
self.stale_filter_ttl = duration;
self
}
}
impl Default for EthFilterConfig {
fn default() -> Self {
Self {
max_blocks_per_filter: None,
max_logs_per_response: None,
// 5min
stale_filter_ttl: Duration::from_secs(5 * 60),
}
}
}
/// All active filters /// All active filters
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct ActiveFilters { pub struct ActiveFilters {
@ -599,6 +657,8 @@ enum FilterKind {
pub enum FilterError { pub enum FilterError {
#[error("filter not found")] #[error("filter not found")]
FilterNotFound(FilterId), FilterNotFound(FilterId),
#[error("query exceeds max block range {0}")]
QueryExceedsMaxBlocks(u64),
#[error("query exceeds max results {0}")] #[error("query exceeds max results {0}")]
QueryExceedsMaxResults(usize), QueryExceedsMaxResults(usize),
#[error(transparent)] #[error(transparent)]
@ -620,6 +680,9 @@ impl From<FilterError> for jsonrpsee::types::error::ErrorObject<'static> {
rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string()) rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
} }
FilterError::EthAPIError(err) => err.into(), FilterError::EthAPIError(err) => err.into(),
err @ FilterError::QueryExceedsMaxBlocks(_) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
err @ FilterError::QueryExceedsMaxResults(_) => { err @ FilterError::QueryExceedsMaxResults(_) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
} }

View File

@ -15,6 +15,6 @@ pub(crate) mod utils;
pub use api::{EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP}; pub use api::{EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP};
pub use bundle::EthBundle; pub use bundle::EthBundle;
pub use filter::EthFilter; pub use filter::{EthFilter, EthFilterConfig};
pub use id_provider::EthSubscriptionIdProvider; pub use id_provider::EthSubscriptionIdProvider;
pub use pubsub::EthPubSub; pub use pubsub::EthPubSub;