mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore: clippy
This commit is contained in:
@ -133,16 +133,19 @@ pub struct HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for HlNodeBlockSource {
|
impl BlockSource for HlNodeBlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
|
let fallback = self.fallback.clone();
|
||||||
|
let local_blocks_cache = self.local_blocks_cache.clone();
|
||||||
|
let last_local_fetch = self.last_local_fetch.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
if let Some(block) = self.try_collect_local_block(height).await {
|
if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
|
||||||
self.update_last_fetch(height, now).await;
|
Self::update_last_fetch(last_local_fetch, height, now).await;
|
||||||
return Ok(block);
|
return Ok(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await {
|
if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
|
||||||
let more_recent = last_height < height;
|
let more_recent = last_height < height;
|
||||||
let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
|
let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
|
||||||
if more_recent && too_soon {
|
if more_recent && too_soon {
|
||||||
@ -152,20 +155,22 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let block = self.fallback.collect_block(height).await?;
|
let block = fallback.collect_block(height).await?;
|
||||||
self.update_last_fetch(height, now).await;
|
Self::update_last_fetch(last_local_fetch, height, now).await;
|
||||||
Ok(block)
|
Ok(block)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
|
let fallback = self.fallback.clone();
|
||||||
|
let local_ingest_dir = self.local_ingest_dir.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else {
|
let Some(dir) = Self::find_latest_hourly_file(&local_ingest_dir) else {
|
||||||
warn!(
|
warn!(
|
||||||
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
|
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
|
||||||
self.local_ingest_dir
|
local_ingest_dir
|
||||||
);
|
);
|
||||||
return self.fallback.find_latest_block_number().await;
|
return fallback.find_latest_block_number().await;
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut file = File::open(&dir).expect("Failed to open hour file path");
|
let mut file = File::open(&dir).expect("Failed to open hour file path");
|
||||||
@ -177,7 +182,7 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
|
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
|
||||||
file
|
file
|
||||||
);
|
);
|
||||||
self.fallback.find_latest_block_number().await
|
fallback.find_latest_block_number().await
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -228,15 +233,22 @@ impl HlNodeBlockSource {
|
|||||||
/// fallback attempts.
|
/// fallback attempts.
|
||||||
pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
|
pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
|
||||||
|
|
||||||
async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
|
async fn update_last_fetch(
|
||||||
let mut last_fetch = self.last_local_fetch.lock().await;
|
last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
||||||
|
height: u64,
|
||||||
|
now: OffsetDateTime,
|
||||||
|
) {
|
||||||
|
let mut last_fetch = last_local_fetch.lock().await;
|
||||||
if last_fetch.is_none_or(|(h, _)| h < height) {
|
if last_fetch.is_none_or(|(h, _)| h < height) {
|
||||||
*last_fetch = Some((height, now));
|
*last_fetch = Some((height, now));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
|
async fn try_collect_local_block(
|
||||||
let mut u_cache = self.local_blocks_cache.lock().await;
|
local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
||||||
|
height: u64,
|
||||||
|
) -> Option<BlockAndReceipts> {
|
||||||
|
let mut u_cache = local_blocks_cache.lock().await;
|
||||||
if let Some(block) = u_cache.cache.remove(&height) {
|
if let Some(block) = u_cache.cache.remove(&height) {
|
||||||
return Some(block);
|
return Some(block);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -13,8 +13,8 @@ mod hl_node;
|
|||||||
pub use hl_node::HlNodeBlockSource;
|
pub use hl_node::HlNodeBlockSource;
|
||||||
|
|
||||||
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
|
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>>;
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>>;
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>>;
|
||||||
fn recommended_chunk_size(&self) -> u64;
|
fn recommended_chunk_size(&self) -> u64;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +81,7 @@ impl S3BlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for S3BlockSource {
|
impl BlockSource for S3BlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
let bucket = self.bucket.clone();
|
let bucket = self.bucket.clone();
|
||||||
async move {
|
async move {
|
||||||
@ -100,7 +100,7 @@ impl BlockSource for S3BlockSource {
|
|||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
let bucket = self.bucket.clone();
|
let bucket = self.bucket.clone();
|
||||||
async move {
|
async move {
|
||||||
@ -138,7 +138,7 @@ impl BlockSource for S3BlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for LocalBlockSource {
|
impl BlockSource for LocalBlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
let dir = self.dir.clone();
|
let dir = self.dir.clone();
|
||||||
async move {
|
async move {
|
||||||
let path = dir.join(rmp_path(height));
|
let path = dir.join(rmp_path(height));
|
||||||
@ -152,7 +152,7 @@ impl BlockSource for LocalBlockSource {
|
|||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
let dir = self.dir.clone();
|
let dir = self.dir.clone();
|
||||||
async move {
|
async move {
|
||||||
let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
|
let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
|
||||||
@ -202,11 +202,11 @@ fn rmp_path(height: u64) -> String {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for BlockSourceBoxed {
|
impl BlockSource for BlockSourceBoxed {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
self.as_ref().collect_block(height)
|
self.as_ref().collect_block(height)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
self.as_ref().find_latest_block_number()
|
self.as_ref().find_latest_block_number()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +229,7 @@ impl CachedBlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for CachedBlockSource {
|
impl BlockSource for CachedBlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
let block_source = self.block_source.clone();
|
let block_source = self.block_source.clone();
|
||||||
let cache = self.cache.clone();
|
let cache = self.cache.clone();
|
||||||
async move {
|
async move {
|
||||||
@ -243,7 +243,7 @@ impl BlockSource for CachedBlockSource {
|
|||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
self.block_source.find_latest_block_number()
|
self.block_source.find_latest_block_number()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -37,7 +37,7 @@ impl EthForwarderExt {
|
|||||||
Self { client }
|
Self { client }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_client_error(e: ClientError, internal_error_prefix: &str) -> ErrorObject {
|
fn from_client_error(e: ClientError, internal_error_prefix: &str) -> ErrorObject<'static> {
|
||||||
match e {
|
match e {
|
||||||
ClientError::Call(e) => e,
|
ClientError::Call(e) => e,
|
||||||
_ => ErrorObject::owned(
|
_ => ErrorObject::owned(
|
||||||
|
|||||||
Reference in New Issue
Block a user