Merge pull request #45 from hl-archive-node/feat/add-cli-params-for-sources

feat: Add --local.fallback-threshold, --s3.polling-interval
This commit is contained in:
sprites0
2025-08-26 11:29:34 +09:00
committed by GitHub
8 changed files with 121 additions and 66 deletions

View File

@ -1,3 +1,7 @@
use std::time::Duration;
use crate::pseudo_peer::HlNodeBlockSourceArgs;
use super::config::BlockSourceConfig; use super::config::BlockSourceConfig;
use clap::{Args, Parser}; use clap::{Args, Parser};
use reth_node_core::args::LogArgs; use reth_node_core::args::LogArgs;
@ -13,7 +17,7 @@ pub struct BlockSourceArgs {
block_source: Option<String>, block_source: Option<String>,
#[arg(long, alias = "local-ingest-dir")] #[arg(long, alias = "local-ingest-dir")]
block_source_from_node: Option<String>, local_ingest_dir: Option<String>,
/// Shorthand of --block-source=s3://hl-mainnet-evm-blocks /// Shorthand of --block-source=s3://hl-mainnet-evm-blocks
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
@ -22,6 +26,19 @@ pub struct BlockSourceArgs {
/// Shorthand of --block-source-from-node=~/hl/data/evm_blocks_and_receipts /// Shorthand of --block-source-from-node=~/hl/data/evm_blocks_and_receipts
#[arg(long)] #[arg(long)]
local: bool, local: bool,
/// Interval for polling new blocks in S3 in milliseconds.
#[arg(id = "s3.polling-interval", long = "s3.polling-interval", default_value = "25")]
s3_polling_interval: u64,
/// Maximum allowed delay for the hl-node block source in milliseconds.
/// If this threshold is exceeded, the client falls back to other sources.
#[arg(
id = "local.fallback-threshold",
long = "local.fallback-threshold",
default_value = "5000"
)]
local_fallback_threshold: u64,
} }
impl BlockSourceArgs { impl BlockSourceArgs {
@ -33,7 +50,10 @@ impl BlockSourceArgs {
async fn create_base_config(&self) -> eyre::Result<BlockSourceConfig> { async fn create_base_config(&self) -> eyre::Result<BlockSourceConfig> {
if self.s3 { if self.s3 {
return Ok(BlockSourceConfig::s3_default().await); return Ok(BlockSourceConfig::s3_default(Duration::from_millis(
self.s3_polling_interval,
))
.await);
} }
if self.local { if self.local {
@ -47,18 +67,25 @@ impl BlockSourceArgs {
}; };
if let Some(bucket) = value.strip_prefix("s3://") { if let Some(bucket) = value.strip_prefix("s3://") {
Ok(BlockSourceConfig::s3(bucket.to_string()).await) Ok(BlockSourceConfig::s3(
bucket.to_string(),
Duration::from_millis(self.s3_polling_interval),
)
.await)
} else { } else {
Ok(BlockSourceConfig::local(value.into())) Ok(BlockSourceConfig::local(value.into()))
} }
} }
fn apply_node_source_config(&self, config: BlockSourceConfig) -> BlockSourceConfig { fn apply_node_source_config(&self, config: BlockSourceConfig) -> BlockSourceConfig {
let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { let Some(local_ingest_dir) = self.local_ingest_dir.as_ref() else {
return config; return config;
}; };
config.with_block_source_from_node(block_source_from_node.to_string()) config.with_block_source_from_node(HlNodeBlockSourceArgs {
root: local_ingest_dir.into(),
fallback_threshold: Duration::from_millis(self.local_fallback_threshold),
})
} }
} }

View File

@ -1,31 +1,38 @@
use crate::chainspec::HlChainSpec; use crate::chainspec::HlChainSpec;
use super::sources::{ use super::sources::{
BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, LocalBlockSource, S3BlockSource, BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, HlNodeBlockSourceArgs,
LocalBlockSource, S3BlockSource,
}; };
use aws_config::BehaviorVersion; use aws_config::BehaviorVersion;
use std::{env::home_dir, path::PathBuf, sync::Arc}; use std::{env::home_dir, path::PathBuf, sync::Arc, time::Duration};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BlockSourceConfig { pub struct BlockSourceConfig {
pub source_type: BlockSourceType, pub source_type: BlockSourceType,
pub block_source_from_node: Option<String>, pub block_source_from_node: Option<HlNodeBlockSourceArgs>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BlockSourceType { pub enum BlockSourceType {
S3Default, S3Default { polling_interval: Duration },
S3 { bucket: String }, S3 { bucket: String, polling_interval: Duration },
Local { path: PathBuf }, Local { path: PathBuf },
} }
impl BlockSourceConfig { impl BlockSourceConfig {
pub async fn s3_default() -> Self { pub async fn s3_default(polling_interval: Duration) -> Self {
Self { source_type: BlockSourceType::S3Default, block_source_from_node: None } Self {
source_type: BlockSourceType::S3Default { polling_interval },
block_source_from_node: None,
}
} }
pub async fn s3(bucket: String) -> Self { pub async fn s3(bucket: String, polling_interval: Duration) -> Self {
Self { source_type: BlockSourceType::S3 { bucket }, block_source_from_node: None } Self {
source_type: BlockSourceType::S3 { bucket, polling_interval },
block_source_from_node: None,
}
} }
pub fn local(path: PathBuf) -> Self { pub fn local(path: PathBuf) -> Self {
@ -45,15 +52,22 @@ impl BlockSourceConfig {
} }
} }
pub fn with_block_source_from_node(mut self, block_source_from_node: String) -> Self { pub fn with_block_source_from_node(
mut self,
block_source_from_node: HlNodeBlockSourceArgs,
) -> Self {
self.block_source_from_node = Some(block_source_from_node); self.block_source_from_node = Some(block_source_from_node);
self self
} }
pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed { pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed {
match &self.source_type { match &self.source_type {
BlockSourceType::S3Default => s3_block_source(chain_spec.official_s3_bucket()).await, BlockSourceType::S3Default { polling_interval } => {
BlockSourceType::S3 { bucket } => s3_block_source(bucket).await, s3_block_source(chain_spec.official_s3_bucket(), *polling_interval).await
}
BlockSourceType::S3 { bucket, polling_interval } => {
s3_block_source(bucket, *polling_interval).await
}
BlockSourceType::Local { path } => { BlockSourceType::Local { path } => {
Arc::new(Box::new(LocalBlockSource::new(path.clone()))) Arc::new(Box::new(LocalBlockSource::new(path.clone())))
} }
@ -72,7 +86,7 @@ impl BlockSourceConfig {
Arc::new(Box::new( Arc::new(Box::new(
HlNodeBlockSource::new( HlNodeBlockSource::new(
fallback_block_source, fallback_block_source,
PathBuf::from(block_source_from_node.clone()), block_source_from_node.clone(),
next_block_number, next_block_number,
) )
.await, .await,
@ -91,9 +105,9 @@ impl BlockSourceConfig {
} }
} }
async fn s3_block_source(bucket: impl AsRef<str>) -> BlockSourceBoxed { async fn s3_block_source(bucket: impl AsRef<str>, polling_interval: Duration) -> BlockSourceBoxed {
let client = aws_sdk_s3::Client::new( let client = aws_sdk_s3::Client::new(
&aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await, &aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await,
); );
Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string()))) Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string(), polling_interval)))
} }

View File

@ -26,7 +26,6 @@ use std::{
pin::Pin, pin::Pin,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use tokio::{sync::mpsc, task::JoinHandle}; use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, info}; use tracing::{debug, info};
@ -49,8 +48,6 @@ pub struct BlockPoller {
} }
impl BlockPoller { impl BlockPoller {
const POLL_INTERVAL: Duration = Duration::from_millis(25);
pub fn new_suspended<BS: BlockSource>( pub fn new_suspended<BS: BlockSource>(
chain_id: u64, chain_id: u64,
block_source: BS, block_source: BS,
@ -77,6 +74,7 @@ impl BlockPoller {
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?; start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
info!("Starting block poller"); info!("Starting block poller");
let polling_interval = block_source.polling_interval();
let mut next_block_number = block_source let mut next_block_number = block_source
.find_latest_block_number() .find_latest_block_number()
.await .await
@ -88,7 +86,7 @@ impl BlockPoller {
block_tx_clone.send((next_block_number, block)).await?; block_tx_clone.send((next_block_number, block)).await?;
next_block_number += 1; next_block_number += 1;
} }
Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await, Err(_) => tokio::time::sleep(polling_interval).await,
} }
} }
} }

View File

@ -41,4 +41,8 @@ impl BlockSource for CachedBlockSource {
fn recommended_chunk_size(&self) -> u64 { fn recommended_chunk_size(&self) -> u64 {
self.block_source.recommended_chunk_size() self.block_source.recommended_chunk_size()
} }
fn polling_interval(&self) -> std::time::Duration {
self.block_source.polling_interval()
}
} }

View File

@ -14,35 +14,39 @@ use self::{
use super::{BlockSource, BlockSourceBoxed}; use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::Duration,
}; };
use time::{Duration, OffsetDateTime}; use time::OffsetDateTime;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{info, warn}; use tracing::{info, warn};
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly"; const HOURLY_SUBDIR: &str = "hourly";
const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
const TAIL_INTERVAL: Duration = Duration::from_millis(25);
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts); pub struct HlNodeBlockSourceArgs {
pub root: PathBuf,
pub fallback_threshold: Duration,
}
/// Block source that monitors the local ingest directory for the HL node. /// Block source that monitors the local ingest directory for the HL node.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HlNodeBlockSource { pub struct HlNodeBlockSource {
pub fallback: BlockSourceBoxed, pub fallback: BlockSourceBoxed,
pub local_ingest_dir: PathBuf,
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>, pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
pub args: HlNodeBlockSourceArgs,
} }
impl BlockSource for HlNodeBlockSource { impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let fallback = self.fallback.clone(); let fallback = self.fallback.clone();
let args = self.args.clone();
let local_blocks_cache = self.local_blocks_cache.clone(); let local_blocks_cache = self.local_blocks_cache.clone();
let last_local_fetch = self.last_local_fetch.clone(); let last_local_fetch = self.last_local_fetch.clone();
Box::pin(async move { Box::pin(async move {
@ -55,7 +59,7 @@ impl BlockSource for HlNodeBlockSource {
if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await { if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
let more_recent = last_height < height; let more_recent = last_height < height;
let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; let too_soon = now - last_poll_time < args.fallback_threshold;
if more_recent && too_soon { if more_recent && too_soon {
return Err(eyre::eyre!( return Err(eyre::eyre!(
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
@ -71,12 +75,12 @@ impl BlockSource for HlNodeBlockSource {
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> { fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
let fallback = self.fallback.clone(); let fallback = self.fallback.clone();
let local_ingest_dir = self.local_ingest_dir.clone(); let args = self.args.clone();
Box::pin(async move { Box::pin(async move {
let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else { let Some(dir) = FileOperations::find_latest_hourly_file(&args.root) else {
warn!( warn!(
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
local_ingest_dir args.root
); );
return fallback.find_latest_block_number().await; return fallback.find_latest_block_number().await;
}; };
@ -160,7 +164,7 @@ impl HlNodeBlockSource {
} }
async fn start_local_ingest_loop(&self, current_head: u64) { async fn start_local_ingest_loop(&self, current_head: u64) {
let root = self.local_ingest_dir.to_owned(); let root = self.args.root.to_owned();
let cache = self.local_blocks_cache.clone(); let cache = self.local_blocks_cache.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut next_height = current_head; let mut next_height = current_head;
@ -185,8 +189,8 @@ impl HlNodeBlockSource {
cache.lock().await.load_scan_result(scan_result); cache.lock().await.load_scan_result(scan_result);
} }
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
if dt + Duration::HOUR < now { if dt + ONE_HOUR < now {
dt += Duration::HOUR; dt += ONE_HOUR;
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
info!( info!(
"Moving to new file: {:?}", "Moving to new file: {:?}",
@ -201,7 +205,7 @@ impl HlNodeBlockSource {
pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
let _ = Self::try_backfill_local_blocks( let _ = Self::try_backfill_local_blocks(
&self.local_ingest_dir, &self.args.root,
&self.local_blocks_cache, &self.local_blocks_cache,
next_block_number, next_block_number,
) )
@ -212,12 +216,12 @@ impl HlNodeBlockSource {
pub async fn new( pub async fn new(
fallback: BlockSourceBoxed, fallback: BlockSourceBoxed,
local_ingest_dir: PathBuf, args: HlNodeBlockSourceArgs,
next_block_number: u64, next_block_number: u64,
) -> Self { ) -> Self {
let block_source = Self { let block_source = Self {
fallback, fallback,
local_ingest_dir, args,
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))), local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
last_local_fetch: Arc::new(Mutex::new(None)), last_local_fetch: Arc::new(Mutex::new(None)),
}; };

View File

@ -1,11 +1,13 @@
use super::*; use super::*;
use crate::{ use crate::{
node::types::{reth_compat, ReadPrecompileCalls}, node::types::{reth_compat, ReadPrecompileCalls},
pseudo_peer::sources::LocalBlockSource, pseudo_peer::sources::{hl_node::scan::LocalBlockAndReceipts, LocalBlockSource},
}; };
use alloy_consensus::{BlockBody, Header}; use alloy_consensus::{BlockBody, Header};
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use std::{io::Write, time::Duration as StdDuration}; use std::{io::Write, time::Duration};
const DEFAULT_FALLBACK_THRESHOLD_FOR_TEST: Duration = Duration::from_millis(5000);
#[test] #[test]
fn test_datetime_from_path() { fn test_datetime_from_path() {
@ -111,7 +113,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
// Setup fallback block source // Setup fallback block source
let block_source_fallback = HlNodeBlockSource::new( let block_source_fallback = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
PathBuf::from("/nonexistent"), HlNodeBlockSourceArgs {
root: { PathBuf::from("/nonexistent") },
fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST,
},
1000000, 1000000,
) )
.await; .await;
@ -124,7 +129,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
let block_source = HlNodeBlockSource::new( let block_source = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
temp_dir1.path().to_path_buf(), HlNodeBlockSourceArgs {
root: temp_dir1.path().to_path_buf(),
fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST,
},
1000000, 1000000,
) )
.await; .await;
@ -159,7 +167,7 @@ async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
assert!(block.is_err()); assert!(block.is_err());
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
tokio::time::sleep(StdDuration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let block = block_source.collect_block(1000001).await.unwrap(); let block = block_source.collect_block(1000001).await.unwrap();
assert_eq!(block, future_block_hl_node.1); assert_eq!(block, future_block_hl_node.1);
@ -177,7 +185,7 @@ async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
let block = block_source.collect_block(1000000).await.unwrap(); let block = block_source.collect_block(1000000).await.unwrap();
assert_eq!(block, current_block.1); assert_eq!(block, current_block.1);
tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await; tokio::time::sleep(DEFAULT_FALLBACK_THRESHOLD_FOR_TEST).await;
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
let block = block_source.collect_block(1000001).await.unwrap(); let block = block_source.collect_block(1000001).await.unwrap();

View File

@ -1,6 +1,7 @@
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use auto_impl::auto_impl;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use std::sync::Arc; use std::{sync::Arc, time::Duration};
// Module declarations // Module declarations
mod cached; mod cached;
@ -11,11 +12,14 @@ mod utils;
// Public exports // Public exports
pub use cached::CachedBlockSource; pub use cached::CachedBlockSource;
pub use hl_node::HlNodeBlockSource; pub use hl_node::{HlNodeBlockSource, HlNodeBlockSourceArgs};
pub use local::LocalBlockSource; pub use local::LocalBlockSource;
pub use s3::S3BlockSource; pub use s3::S3BlockSource;
const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_millis(25);
/// Trait for block sources that can retrieve blocks from various sources /// Trait for block sources that can retrieve blocks from various sources
#[auto_impl(&, &mut, Box, Arc)]
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static { pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
/// Retrieves a block at the specified height /// Retrieves a block at the specified height
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>; fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
@ -25,21 +29,12 @@ pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
/// Returns the recommended chunk size for batch operations /// Returns the recommended chunk size for batch operations
fn recommended_chunk_size(&self) -> u64; fn recommended_chunk_size(&self) -> u64;
/// Returns the polling interval
fn polling_interval(&self) -> Duration {
DEFAULT_POLLING_INTERVAL
}
} }
/// Type alias for a boxed block source /// Type alias for a boxed block source
pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>; pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
impl BlockSource for BlockSourceBoxed {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
self.as_ref().collect_block(height)
}
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
self.as_ref().find_latest_block_number()
}
fn recommended_chunk_size(&self) -> u64 {
self.as_ref().recommended_chunk_size()
}
}

View File

@ -2,7 +2,7 @@ use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts; use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer; use aws_sdk_s3::types::RequestPayer;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use tracing::info; use tracing::info;
/// Block source that reads blocks from S3 (--s3) /// Block source that reads blocks from S3 (--s3)
@ -10,11 +10,12 @@ use tracing::info;
pub struct S3BlockSource { pub struct S3BlockSource {
client: Arc<aws_sdk_s3::Client>, client: Arc<aws_sdk_s3::Client>,
bucket: String, bucket: String,
polling_interval: Duration,
} }
impl S3BlockSource { impl S3BlockSource {
pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self { pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self {
Self { client: client.into(), bucket } Self { client: client.into(), bucket, polling_interval }
} }
async fn pick_path_with_highest_number( async fn pick_path_with_highest_number(
@ -87,4 +88,8 @@ impl BlockSource for S3BlockSource {
fn recommended_chunk_size(&self) -> u64 { fn recommended_chunk_size(&self) -> u64 {
1000 1000
} }
fn polling_interval(&self) -> Duration {
self.polling_interval
}
} }