mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: stale filters cleaner (#5080)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -74,6 +74,7 @@ where
|
||||
eth_cache.clone(),
|
||||
DEFAULT_MAX_LOGS_PER_RESPONSE,
|
||||
Box::new(executor.clone()),
|
||||
EthConfig::default().stale_filter_ttl,
|
||||
);
|
||||
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
|
||||
}
|
||||
|
||||
@ -39,8 +39,14 @@ pub struct EthConfig {
|
||||
///
|
||||
/// Defaults to [RPC_DEFAULT_GAS_CAP]
|
||||
pub rpc_gas_cap: u64,
|
||||
///
|
||||
/// Sets TTL for stale filters
|
||||
pub stale_filter_ttl: std::time::Duration,
|
||||
}
|
||||
|
||||
/// Default value for stale filter ttl
|
||||
const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60);
|
||||
|
||||
impl Default for EthConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@ -49,6 +55,7 @@ impl Default for EthConfig {
|
||||
max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS,
|
||||
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
|
||||
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
|
||||
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1044,6 +1044,7 @@ where
|
||||
cache.clone(),
|
||||
self.config.eth.max_logs_per_response,
|
||||
executor.clone(),
|
||||
self.config.eth.stale_filter_ttl,
|
||||
);
|
||||
|
||||
let pubsub = EthPubSub::with_spawner(
|
||||
|
||||
@ -17,8 +17,17 @@ use reth_rpc_api::EthFilterApiServer;
|
||||
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant};
|
||||
use tokio::sync::{mpsc::Receiver, Mutex};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
iter::StepBy,
|
||||
ops::RangeInclusive,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::{
|
||||
sync::{mpsc::Receiver, Mutex},
|
||||
time::MissedTickBehavior,
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
/// The maximum number of headers we read at once when handling a range filter.
|
||||
@ -30,19 +39,26 @@ pub struct EthFilter<Provider, Pool> {
|
||||
inner: Arc<EthFilterInner<Provider, Pool>>,
|
||||
}
|
||||
|
||||
impl<Provider, Pool> EthFilter<Provider, Pool> {
|
||||
impl<Provider, Pool> EthFilter<Provider, Pool>
|
||||
where
|
||||
Provider: Send + Sync + 'static,
|
||||
Pool: Send + Sync + 'static,
|
||||
{
|
||||
/// Creates a new, shareable instance.
|
||||
///
|
||||
/// This uses the given pool to get notified about new transactions, the provider to interact
|
||||
/// with the blockchain, the cache to fetch cacheable data, like the logs and the
|
||||
/// max_logs_per_response to limit the amount of logs returned in a single response
|
||||
/// `eth_getLogs`
|
||||
///
|
||||
/// This also spawns a task that periodically clears stale filters.
|
||||
pub fn new(
|
||||
provider: Provider,
|
||||
pool: Pool,
|
||||
eth_cache: EthStateCache,
|
||||
max_logs_per_response: usize,
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
stale_filter_ttl: Duration,
|
||||
) -> Self {
|
||||
let inner = EthFilterInner {
|
||||
provider,
|
||||
@ -53,14 +69,51 @@ impl<Provider, Pool> EthFilter<Provider, Pool> {
|
||||
eth_cache,
|
||||
max_headers_range: MAX_HEADERS_RANGE,
|
||||
task_spawner,
|
||||
stale_filter_ttl,
|
||||
};
|
||||
Self { inner: Arc::new(inner) }
|
||||
|
||||
let eth_filter = Self { inner: Arc::new(inner) };
|
||||
|
||||
let this = eth_filter.clone();
|
||||
eth_filter.inner.task_spawner.clone().spawn_critical(
|
||||
"eth-filters_stale-filters-clean",
|
||||
Box::pin(async move {
|
||||
this.watch_and_clear_stale_filters().await;
|
||||
}),
|
||||
);
|
||||
|
||||
eth_filter
|
||||
}
|
||||
|
||||
/// Returns all currently active filters
|
||||
pub fn active_filters(&self) -> &ActiveFilters {
|
||||
&self.inner.active_filters
|
||||
}
|
||||
|
||||
/// Endless future that [Self::clear_stale_filters] every `stale_filter_ttl` interval.
|
||||
async fn watch_and_clear_stale_filters(&self) {
|
||||
let mut interval = tokio::time::interval(self.inner.stale_filter_ttl);
|
||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
self.clear_stale_filters(Instant::now()).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears all filters that have not been polled for longer than the configured
|
||||
/// `stale_filter_ttl` at the given instant.
|
||||
pub async fn clear_stale_filters(&self, now: Instant) {
|
||||
trace!(target: "rpc::eth", "clear stale filters");
|
||||
self.active_filters().inner.lock().await.retain(|id, filter| {
|
||||
let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
|
||||
|
||||
if !is_valid {
|
||||
trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
|
||||
}
|
||||
|
||||
is_valid
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Provider, Pool> EthFilter<Provider, Pool>
|
||||
@ -244,7 +297,6 @@ impl<Provider, Pool> Clone for EthFilter<Provider, Pool> {
|
||||
#[derive(Debug)]
|
||||
struct EthFilterInner<Provider, Pool> {
|
||||
/// The transaction pool.
|
||||
#[allow(unused)] // we need this for non standard full transactions eventually
|
||||
pool: Pool,
|
||||
/// The provider that can interact with the chain.
|
||||
provider: Provider,
|
||||
@ -259,8 +311,9 @@ struct EthFilterInner<Provider, Pool> {
|
||||
/// maximum number of headers to read at once for range filter
|
||||
max_headers_range: u64,
|
||||
/// The type that can spawn tasks.
|
||||
#[allow(unused)]
|
||||
task_spawner: Box<dyn TaskSpawner>,
|
||||
/// Duration since the last filter poll, after which the filter is considered stale
|
||||
stale_filter_ttl: Duration,
|
||||
}
|
||||
|
||||
impl<Provider, Pool> EthFilterInner<Provider, Pool>
|
||||
|
||||
Reference in New Issue
Block a user