mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
fix/perf: Fix last line scanner, wait 0.5s before fallback if it's more recent, add tests
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -9357,6 +9357,7 @@ dependencies = [
|
|||||||
"rmp-serde",
|
"rmp-serde",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"tempfile",
|
||||||
"thiserror 2.0.12",
|
"thiserror 2.0.12",
|
||||||
"tikv-jemalloc-ctl",
|
"tikv-jemalloc-ctl",
|
||||||
"tikv-jemallocator",
|
"tikv-jemallocator",
|
||||||
|
|||||||
@ -164,5 +164,5 @@ client = [
|
|||||||
"reth-rpc-eth-api/client",
|
"reth-rpc-eth-api/client",
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.test]
|
[dev-dependencies]
|
||||||
inherits = "release"
|
tempfile = "3.20.0"
|
||||||
|
|||||||
@ -14,7 +14,7 @@ pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompile
|
|||||||
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
|
||||||
pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
|
pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
|
||||||
|
|
||||||
mod reth_compat;
|
pub(crate) mod reth_compat;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
pub struct HlExtras {
|
pub struct HlExtras {
|
||||||
@ -38,7 +38,7 @@ impl Decodable for ReadPrecompileCalls {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
pub struct BlockAndReceipts {
|
pub struct BlockAndReceipts {
|
||||||
pub block: EvmBlock,
|
pub block: EvmBlock,
|
||||||
pub receipts: Vec<LegacyReceipt>,
|
pub receipts: Vec<LegacyReceipt>,
|
||||||
@ -71,12 +71,12 @@ impl BlockAndReceipts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
pub enum EvmBlock {
|
pub enum EvmBlock {
|
||||||
Reth115(reth_compat::SealedBlock),
|
Reth115(reth_compat::SealedBlock),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
pub struct LegacyReceipt {
|
pub struct LegacyReceipt {
|
||||||
tx_type: LegacyTxType,
|
tx_type: LegacyTxType,
|
||||||
success: bool,
|
success: bool,
|
||||||
@ -84,7 +84,7 @@ pub struct LegacyReceipt {
|
|||||||
logs: Vec<Log>,
|
logs: Vec<Log>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
enum LegacyTxType {
|
enum LegacyTxType {
|
||||||
Legacy = 0,
|
Legacy = 0,
|
||||||
Eip2930 = 1,
|
Eip2930 = 1,
|
||||||
@ -93,7 +93,7 @@ enum LegacyTxType {
|
|||||||
Eip7702 = 4,
|
Eip7702 = 4,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
pub struct SystemTx {
|
pub struct SystemTx {
|
||||||
pub tx: reth_compat::Transaction,
|
pub tx: reth_compat::Transaction,
|
||||||
pub receipt: Option<LegacyReceipt>,
|
pub receipt: Option<LegacyReceipt>,
|
||||||
|
|||||||
@ -9,7 +9,7 @@ use std::{
|
|||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use rangemap::RangeInclusiveMap;
|
use rangemap::RangeInclusiveMap;
|
||||||
use reth_network::cache::LruMap;
|
use reth_network::cache::LruMap;
|
||||||
use serde::Deserialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
|
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
@ -47,15 +47,7 @@ impl LocalBlocksCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block source that monitors the local ingest directory for the HL node.
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct HlNodeBlockSource {
|
|
||||||
pub fallback: BlockSourceBoxed,
|
|
||||||
pub local_ingest_dir: PathBuf,
|
|
||||||
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct LocalBlockAndReceipts(String, BlockAndReceipts);
|
struct LocalBlockAndReceipts(String, BlockAndReceipts);
|
||||||
|
|
||||||
struct ScanResult {
|
struct ScanResult {
|
||||||
@ -139,14 +131,38 @@ fn date_from_datetime(dt: OffsetDateTime) -> String {
|
|||||||
dt.format(&format_description!("[year][month][day]")).unwrap()
|
dt.format(&format_description!("[year][month][day]")).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Block source that monitors the local ingest directory for the HL node.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct HlNodeBlockSource {
|
||||||
|
pub fallback: BlockSourceBoxed,
|
||||||
|
pub local_ingest_dir: PathBuf,
|
||||||
|
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
|
||||||
|
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>, // for rate limiting requests to fallback
|
||||||
|
}
|
||||||
|
|
||||||
impl BlockSource for HlNodeBlockSource {
|
impl BlockSource for HlNodeBlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
let now = OffsetDateTime::now_utc();
|
||||||
if let Some(block) = self.try_collect_local_block(height).await {
|
if let Some(block) = self.try_collect_local_block(height).await {
|
||||||
|
self.update_last_fetch(height, now).await;
|
||||||
Ok(block)
|
Ok(block)
|
||||||
} else {
|
} else {
|
||||||
|
if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await {
|
||||||
|
let more_recent = last_height < height;
|
||||||
|
let too_soon =
|
||||||
|
now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
|
||||||
|
if more_recent && too_soon {
|
||||||
|
return Err(eyre::eyre!(
|
||||||
|
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
|
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
|
||||||
self.fallback.collect_block(height).await
|
let block = self.fallback.collect_block(height).await?;
|
||||||
|
self.update_last_fetch(height, now).await;
|
||||||
|
Ok(block)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -161,17 +177,16 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
return self.fallback.find_latest_block_number().await;
|
return self.fallback.find_latest_block_number().await;
|
||||||
};
|
};
|
||||||
let mut file = File::open(&dir).expect("Failed to open hour file path");
|
let mut file = File::open(&dir).expect("Failed to open hour file path");
|
||||||
let last_line = read_last_complete_line(&mut file);
|
if let Some((_, height)) = read_last_complete_line(&mut file) {
|
||||||
let Ok((_, height)) = line_to_evm_block(&last_line) else {
|
info!("Latest block number: {} with path {}", height, dir.display());
|
||||||
|
Some(height)
|
||||||
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
|
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
|
||||||
file
|
file
|
||||||
);
|
);
|
||||||
return self.fallback.find_latest_block_number().await;
|
self.fallback.find_latest_block_number().await
|
||||||
};
|
}
|
||||||
|
|
||||||
info!("Latest block number: {} with path {}", height, dir.display());
|
|
||||||
Some(height)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,8 +195,8 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
|
fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
|
||||||
const CHUNK_SIZE: u64 = 4096;
|
const CHUNK_SIZE: u64 = 50000;
|
||||||
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
|
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
|
||||||
let mut pos = read.seek(SeekFrom::End(0)).unwrap();
|
let mut pos = read.seek(SeekFrom::End(0)).unwrap();
|
||||||
let mut last_line: Vec<u8> = Vec::new();
|
let mut last_line: Vec<u8> = Vec::new();
|
||||||
@ -201,9 +216,12 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
|
|||||||
|
|
||||||
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
|
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
|
||||||
let candidate = &last_line[idx + 1..];
|
let candidate = &last_line[idx + 1..];
|
||||||
if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() {
|
if let Ok((evm_block, height)) =
|
||||||
return String::from_utf8(candidate.to_vec()).unwrap();
|
line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap())
|
||||||
|
{
|
||||||
|
return Some((evm_block, height));
|
||||||
}
|
}
|
||||||
|
// Incomplete line; truncate and continue
|
||||||
last_line.truncate(idx);
|
last_line.truncate(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,10 +231,31 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
|
|||||||
pos -= read_size;
|
pos -= read_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
String::from_utf8(last_line).unwrap()
|
println!(
|
||||||
|
"last_line: {:?} {:?}",
|
||||||
|
String::from_utf8(last_line.clone()).unwrap(),
|
||||||
|
line_to_evm_block(&String::from_utf8(last_line.clone()).unwrap())
|
||||||
|
);
|
||||||
|
line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HlNodeBlockSource {
|
impl HlNodeBlockSource {
|
||||||
|
/// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir.
|
||||||
|
/// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary
|
||||||
|
/// requests to S3 while it'll return 404.
|
||||||
|
///
|
||||||
|
/// So we allow a small threshold to avoid unnecessary fallback.
|
||||||
|
pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(500);
|
||||||
|
|
||||||
|
async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
|
||||||
|
if let Some((last_height, _)) = *self.last_local_fetch.lock().await {
|
||||||
|
if last_height >= height {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*self.last_local_fetch.lock().await = Some((height, now));
|
||||||
|
}
|
||||||
|
|
||||||
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
|
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
|
||||||
let mut u_cache = self.local_blocks_cache.lock().await;
|
let mut u_cache = self.local_blocks_cache.lock().await;
|
||||||
if let Some(block) = u_cache.cache.remove(&height) {
|
if let Some(block) = u_cache.cache.remove(&height) {
|
||||||
@ -276,8 +315,9 @@ impl HlNodeBlockSource {
|
|||||||
|
|
||||||
for subfile in Self::all_hourly_files(root).unwrap_or_default() {
|
for subfile in Self::all_hourly_files(root).unwrap_or_default() {
|
||||||
let mut file = File::open(&subfile).expect("Failed to open hour file path");
|
let mut file = File::open(&subfile).expect("Failed to open hour file path");
|
||||||
let last_line = read_last_complete_line(&mut file);
|
println!("subfile: {:?} {:?}", subfile, read_last_complete_line(&mut file));
|
||||||
if let Ok((_, height)) = line_to_evm_block(&last_line) {
|
let mut file = File::open(&subfile).expect("Failed to open hour file path");
|
||||||
|
if let Some((_, height)) = read_last_complete_line(&mut file) {
|
||||||
if height < cutoff_height {
|
if height < cutoff_height {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -387,6 +427,7 @@ impl HlNodeBlockSource {
|
|||||||
fallback,
|
fallback,
|
||||||
local_ingest_dir,
|
local_ingest_dir,
|
||||||
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
|
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
|
||||||
|
last_local_fetch: Arc::new(Mutex::new(None)),
|
||||||
};
|
};
|
||||||
block_source.run(next_block_number).await.unwrap();
|
block_source.run(next_block_number).await.unwrap();
|
||||||
block_source
|
block_source
|
||||||
@ -395,7 +436,12 @@ impl HlNodeBlockSource {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::node::types::ReadPrecompileCalls;
|
||||||
|
use crate::pseudo_peer::sources::LocalBlockSource;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_datetime_from_path() {
|
fn test_datetime_from_path() {
|
||||||
let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
|
let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
|
||||||
@ -420,4 +466,175 @@ mod tests {
|
|||||||
Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
|
Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::io::Write;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crate::node::types::reth_compat;
|
||||||
|
use alloy_consensus::{BlockBody, Header};
|
||||||
|
|
||||||
|
fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult {
|
||||||
|
let height = match &block.block {
|
||||||
|
EvmBlock::Reth115(b) => b.header.header.number,
|
||||||
|
};
|
||||||
|
ScanResult {
|
||||||
|
path: PathBuf::from("/nonexistent-block"),
|
||||||
|
next_expected_height: height + 1,
|
||||||
|
new_blocks: vec![block],
|
||||||
|
new_block_ranges: vec![height..=height],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn empty_block(
|
||||||
|
number: u64,
|
||||||
|
timestamp: u64,
|
||||||
|
extra_data: &'static [u8],
|
||||||
|
) -> LocalBlockAndReceipts {
|
||||||
|
let extra_data = Bytes::from_static(extra_data);
|
||||||
|
let res = BlockAndReceipts {
|
||||||
|
block: EvmBlock::Reth115(reth_compat::SealedBlock {
|
||||||
|
header: reth_compat::SealedHeader {
|
||||||
|
header: Header {
|
||||||
|
parent_hash: B256::ZERO,
|
||||||
|
ommers_hash: B256::ZERO,
|
||||||
|
beneficiary: Address::ZERO,
|
||||||
|
state_root: B256::ZERO,
|
||||||
|
transactions_root: B256::ZERO,
|
||||||
|
receipts_root: B256::ZERO,
|
||||||
|
logs_bloom: Bloom::ZERO,
|
||||||
|
difficulty: U256::ZERO,
|
||||||
|
number,
|
||||||
|
gas_limit: 0,
|
||||||
|
gas_used: 0,
|
||||||
|
timestamp,
|
||||||
|
extra_data,
|
||||||
|
mix_hash: B256::ZERO,
|
||||||
|
nonce: B64::ZERO,
|
||||||
|
base_fee_per_gas: None,
|
||||||
|
withdrawals_root: None,
|
||||||
|
blob_gas_used: None,
|
||||||
|
excess_blob_gas: None,
|
||||||
|
parent_beacon_block_root: None,
|
||||||
|
requests_hash: None,
|
||||||
|
},
|
||||||
|
hash: B256::ZERO,
|
||||||
|
},
|
||||||
|
body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
|
||||||
|
}),
|
||||||
|
receipts: vec![],
|
||||||
|
system_txs: vec![],
|
||||||
|
read_precompile_calls: ReadPrecompileCalls(vec![]),
|
||||||
|
highest_precompile_address: None,
|
||||||
|
};
|
||||||
|
LocalBlockAndReceipts(timestamp.to_string(), res)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
|
||||||
|
let now = OffsetDateTime::now_utc();
|
||||||
|
let day_str = date_from_datetime(now);
|
||||||
|
let hour = now.hour();
|
||||||
|
|
||||||
|
let temp_dir = tempfile::tempdir()?;
|
||||||
|
let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
|
||||||
|
std::fs::create_dir_all(path.parent().unwrap())?;
|
||||||
|
|
||||||
|
Ok((temp_dir, File::create(path)?))
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BlockSourceHierarchy {
|
||||||
|
block_source: HlNodeBlockSource,
|
||||||
|
_temp_dir: tempfile::TempDir,
|
||||||
|
file1: File,
|
||||||
|
current_block: LocalBlockAndReceipts,
|
||||||
|
future_block_hl_node: LocalBlockAndReceipts,
|
||||||
|
future_block_fallback: LocalBlockAndReceipts,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
|
||||||
|
// Setup fallback block source
|
||||||
|
let block_source_fallback = HlNodeBlockSource::new(
|
||||||
|
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
|
||||||
|
PathBuf::from("/nonexistent"),
|
||||||
|
1000000,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
|
||||||
|
let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
|
||||||
|
let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");
|
||||||
|
|
||||||
|
let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
|
||||||
|
writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;
|
||||||
|
|
||||||
|
let block_source = HlNodeBlockSource::new(
|
||||||
|
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
|
||||||
|
temp_dir1.path().to_path_buf(),
|
||||||
|
1000000,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
block_source_fallback
|
||||||
|
.local_blocks_cache
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));
|
||||||
|
|
||||||
|
Ok(BlockSourceHierarchy {
|
||||||
|
block_source,
|
||||||
|
_temp_dir: temp_dir1,
|
||||||
|
file1,
|
||||||
|
current_block: block_hl_node_0,
|
||||||
|
future_block_hl_node: block_hl_node_1,
|
||||||
|
future_block_fallback: block_fallback_1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
|
||||||
|
let hierarchy = setup_block_source_hierarchy().await?;
|
||||||
|
let BlockSourceHierarchy {
|
||||||
|
block_source,
|
||||||
|
current_block,
|
||||||
|
future_block_hl_node,
|
||||||
|
mut file1,
|
||||||
|
..
|
||||||
|
} = hierarchy;
|
||||||
|
|
||||||
|
let block = block_source.collect_block(1000000).await.unwrap();
|
||||||
|
assert_eq!(block, current_block.1);
|
||||||
|
|
||||||
|
let block = block_source.collect_block(1000001).await;
|
||||||
|
assert!(block.is_err());
|
||||||
|
|
||||||
|
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let block = block_source.collect_block(1000001).await.unwrap();
|
||||||
|
assert_eq!(block, future_block_hl_node.1);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
|
||||||
|
let hierarchy = setup_block_source_hierarchy().await?;
|
||||||
|
let BlockSourceHierarchy {
|
||||||
|
block_source,
|
||||||
|
current_block,
|
||||||
|
future_block_fallback,
|
||||||
|
mut file1,
|
||||||
|
..
|
||||||
|
} = hierarchy;
|
||||||
|
|
||||||
|
let block = block_source.collect_block(1000000).await.unwrap();
|
||||||
|
assert_eq!(block, current_block.1);
|
||||||
|
|
||||||
|
tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
|
||||||
|
let block = block_source.collect_block(1000001).await.unwrap();
|
||||||
|
assert_eq!(block, future_block_fallback.1);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user