feat: add S3Stage downloader (#13784)

joshieDo authored 2025-01-21 18:10:02 +00:00, committed by GitHub
parent ace28d8a90
commit 6c3b1b8bcd
11 changed files with 910 additions and 0 deletions

Cargo.lock (generated)

@@ -1375,6 +1375,19 @@ dependencies = [
"wyz",
]
[[package]]
name = "blake3"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e"
dependencies = [
"arrayref",
"arrayvec",
"cc",
"cfg-if",
"constant_time_eq",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
@@ -2045,6 +2058,12 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "constant_time_eq"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "convert_case"
version = "0.6.0"
@@ -6378,6 +6397,7 @@ checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
dependencies = [
"base64 0.22.1",
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
@@ -9094,6 +9114,7 @@ dependencies = [
"alloy-rlp",
"assert_matches",
"bincode",
"blake3",
"codspeed-criterion-compat",
"futures-util",
"itertools 0.13.0",
@@ -9102,6 +9123,7 @@ dependencies = [
"pprof",
"rand 0.8.5",
"rayon",
"reqwest",
"reth-chainspec",
"reth-codecs",
"reth-config",
@@ -9115,6 +9137,7 @@ dependencies = [
"reth-execution-errors",
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-network-p2p",
"reth-network-peers",
"reth-primitives",
@@ -9127,8 +9150,10 @@ dependencies = [
"reth-static-file",
"reth-storage-errors",
"reth-testing-utils",
"reth-tracing",
"reth-trie",
"reth-trie-db",
"serde",
"tempfile",
"thiserror 2.0.11",
"tokio",

Cargo.toml (workspace dependencies)

@@ -487,6 +487,7 @@ backon = { version = "1.2", default-features = false, features = [
] }
bincode = "1.3"
bitflags = "2.4"
blake3 = "1.5.5"
boyer-moore-magiclen = "0.2.16"
bytes = { version = "1.5", default-features = false }
cfg-if = "1.0"

Cargo.toml (reth-stages crate)

@@ -22,6 +22,7 @@ reth-db-api.workspace = true
reth-etl.workspace = true
reth-evm.workspace = true
reth-exex.workspace = true
reth-fs-util.workspace = true
reth-network-p2p.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits = { workspace = true, features = [
@@ -57,6 +58,12 @@ rayon.workspace = true
num-traits = "0.2.15"
tempfile = { workspace = true, optional = true }
bincode.workspace = true
blake3.workspace = true
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
"blocking"
] }
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
# reth
@@ -75,6 +82,7 @@ reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-network-peers.workspace = true
reth-tracing.workspace = true
alloy-rlp.workspace = true
itertools.workspace = true

src/stages/mod.rs (reth-stages crate)

@@ -17,6 +17,8 @@ mod index_storage_history;
/// Stage for computing state root.
mod merkle;
mod prune;
/// The S3 download stage.
mod s3;
/// The sender recovery stage.
mod sender_recovery;
/// The transaction lookup stage
@@ -32,6 +34,7 @@ pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use prune::*;
pub use s3::*;
pub use sender_recovery::*;
pub use tx_lookup::*;

src/stages/s3/downloader/error.rs (new file)

@@ -0,0 +1,31 @@
use alloy_primitives::B256;
use reth_fs_util::FsPathError;
/// Possible downloader error variants.
#[derive(Debug, thiserror::Error)]
pub enum DownloaderError {
/// Requires a valid `total_size`
#[error("requires a valid total_size")]
InvalidMetadataTotalSize(Option<usize>),
/// Invalid chunk access
#[error("tried to access chunk at index {0}, but there are only {1} chunks")]
InvalidChunk(usize, usize),
/// File hash mismatch.
#[error("file hash does not match the expected one {0} != {1}")]
InvalidFileHash(B256, B256),
/// Empty content length returned from the server.
#[error("metadata got an empty content length from server")]
EmptyContentLength,
/// File system path error
#[error(transparent)]
FsPath(#[from] FsPathError),
/// Reqwest error
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// Std Io error
#[error(transparent)]
StdIo(#[from] std::io::Error),
/// Bincode error
#[error(transparent)]
Bincode(#[from] bincode::Error),
}

src/stages/s3/downloader/fetch.rs (new file)

@@ -0,0 +1,184 @@
use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};
use super::{
error::DownloaderError,
meta::Metadata,
worker::{WorkerRequest, WorkerResponse},
};
use alloy_primitives::B256;
use reqwest::{header::CONTENT_LENGTH, Client};
use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::BufReader,
path::Path,
};
use tracing::{debug, error, info};
/// Downloads file from url to data file path.
///
/// If a `file_hash` is passed, it will verify it at the end.
///
/// ## Details
///
/// 1) A [`Metadata`] file is created or opened in `{target_dir}/download/{filename}.metadata`. It
/// tracks the download progress including total file size, downloaded bytes, chunk sizes, and
/// ranges that still need downloading. Allows for resumability.
/// 2) The target file is preallocated with the total size of the file in
/// `{target_dir}/download/{filename}`.
/// 3) Multiple `workers` are spawned to download specific chunks of the file.
/// 4) `Orchestrator` manages workers, distributes chunk ranges, and ensures the download progresses
/// efficiently by dynamically assigning tasks to workers as they become available.
/// 5) Once the file is downloaded:
/// * If `file_hash` is `Some`, verifies its blake3 hash.
/// * Deletes the metadata file.
/// * Moves the downloaded file to the target directory.
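/// ### Example
///
/// An illustrative call (not part of this commit; the file name, directory, and URL are
/// placeholders):
///
/// ```ignore
/// fetch("data.bin", Path::new("/tmp/target"), "https://example.com/data.bin", 4, None).await?;
/// ```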
pub async fn fetch(
filename: &str,
target_dir: &Path,
url: &str,
mut concurrent: u64,
file_hash: Option<B256>,
) -> Result<(), DownloaderError> {
// Create a temporary directory to download files to, before moving them to target_dir.
let download_dir = target_dir.join("download");
reth_fs_util::create_dir_all(&download_dir)?;
let data_file = download_dir.join(filename);
let mut metadata = metadata(&data_file, url).await?;
if metadata.is_done() {
return Ok(())
}
// Ensure the file is preallocated so we can download it concurrently
{
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&data_file)?;
if file.metadata()?.len() as usize != metadata.total_size {
info!(target: "sync::stages::s3::downloader", ?filename, length = metadata.total_size, "Preallocating space.");
file.set_len(metadata.total_size as u64)?;
}
}
while !metadata.is_done() {
info!(target: "sync::stages::s3::downloader", ?filename, "Downloading.");
// Find the missing file chunks and the minimum number of workers required
let missing_chunks = metadata.needed_ranges();
concurrent = concurrent
.min(std::thread::available_parallelism()?.get() as u64)
.min(missing_chunks.len() as u64);
let mut orchestrator_rx = spawn_workers(url, concurrent, &data_file);
let mut workers = HashMap::new();
let mut missing_chunks = missing_chunks.into_iter();
// Distribute chunk ranges to workers when they free up
while let Some(worker_msg) = orchestrator_rx.recv().await {
debug!(target: "sync::stages::s3::downloader", ?worker_msg, "received message from worker");
let available_worker = match worker_msg {
WorkerResponse::Ready { worker_id, tx } => {
debug!(target: "sync::stages::s3::downloader", ?worker_id, "Worker ready.");
workers.insert(worker_id, tx);
worker_id
}
WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => {
metadata.update_chunk(chunk_index, written_bytes)?;
worker_id
}
WorkerResponse::Err { worker_id, error } => {
error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker found an error: {:?}", error);
return Err(error)
}
};
let msg = if let Some(RemainingChunkRange { index, start, end }) = missing_chunks.next()
{
debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
WorkerRequest::Download { chunk_index: index, start, end }
} else {
debug!(target: "sync::stages::s3::downloader", ?available_worker, "Sent Finish command to worker.");
WorkerRequest::Finish
};
let _ = workers.get(&available_worker).expect("should exist").send(msg);
}
}
if let Some(file_hash) = file_hash {
info!(target: "sync::stages::s3::downloader", ?filename, "Checking file integrity.");
check_file_hash(&data_file, &file_hash)?;
}
// No longer need the metadata file.
metadata.delete()?;
// Move downloaded file to desired directory.
let file_directory = target_dir.join(filename);
reth_fs_util::rename(data_file, &file_directory)?;
info!(target: "sync::stages::s3::downloader", ?file_directory, "Moved file from temporary to target directory.");
Ok(())
}
/// Creates a metadata file used to keep track of the downloaded chunks. Useful when resuming after a
/// shutdown.
async fn metadata(data_file: &Path, url: &str) -> Result<Metadata, DownloaderError> {
if Metadata::file_path(data_file).exists() {
debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata ");
return Metadata::load(data_file)
}
let client = Client::new();
let resp = client.head(url).send().await?;
let total_length: usize = resp
.headers()
.get(CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.ok_or(DownloaderError::EmptyContentLength)?;
debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata ");
Metadata::builder(data_file).with_total_size(total_length).build()
}
/// Ensures the file on path has the expected blake3 hash.
fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> {
let mut reader = BufReader::new(File::open(path)?);
let mut hasher = blake3::Hasher::new();
std::io::copy(&mut reader, &mut hasher)?;
let file_hash = hasher.finalize();
if file_hash.as_bytes() != expected {
return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected))
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::b256;
#[tokio::test]
async fn test_download() {
reth_tracing::init_test_tracing();
let b3sum = b256!("81a7318f69fc1d6bb0a58a24af302f3b978bc75a435e4ae5d075f999cd060cfd");
let url = "https://link.testfile.org/500MB";
let file = tempfile::NamedTempFile::new().unwrap();
let filename = file.path().file_name().unwrap().to_str().unwrap();
let target_dir = file.path().parent().unwrap();
fetch(filename, target_dir, url, 4, Some(b3sum)).await.unwrap();
}
}

src/stages/s3/downloader/meta.rs (new file)

@@ -0,0 +1,195 @@
use super::{error::DownloaderError, RemainingChunkRange};
use serde::{Deserialize, Serialize};
use std::{
fs::File,
ops::RangeInclusive,
path::{Path, PathBuf},
};
use tracing::info;
/// Tracks download progress and manages chunked downloads for resumable file transfers.
#[derive(Debug)]
pub struct Metadata {
/// Total file size
pub total_size: usize,
/// Bytes downloaded so far
pub downloaded: usize,
/// Download chunk size. Default 150MB.
pub chunk_size: usize,
/// Remaining download ranges for each chunk.
/// - `Some(RangeInclusive)`: range to be downloaded.
/// - `None`: Chunk fully downloaded.
chunks: Vec<Option<RangeInclusive<usize>>>,
/// Path with the stored metadata.
path: PathBuf,
}
impl Metadata {
/// Build a [`Metadata`] using a builder.
pub fn builder(data_file: &Path) -> MetadataBuilder {
MetadataBuilder::new(Self::file_path(data_file))
}
/// Returns the metadata file path of a data file: `{data_file}.metadata`
pub fn file_path(data_file: &Path) -> PathBuf {
data_file.with_file_name(format!(
"{}.metadata",
data_file.file_name().unwrap_or_default().to_string_lossy()
))
}
/// Returns a list of all chunks with their remaining ranges to be downloaded:
/// `RemainingChunkRange`.
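/// For example (illustrative), chunks `[None, Some(4..=9)]` yield a single
/// `RemainingChunkRange { index: 1, start: 4, end: 9 }`.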
pub fn needed_ranges(&self) -> Vec<RemainingChunkRange> {
self.chunks
.iter()
.enumerate()
.filter(|(_, remaining)| remaining.is_some())
.map(|(index, remaining)| {
let range = remaining.as_ref().expect("qed");
RemainingChunkRange { index, start: *range.start(), end: *range.end() }
})
.collect()
}
/// Updates a downloaded chunk.
pub fn update_chunk(
&mut self,
index: usize,
downloaded_bytes: usize,
) -> Result<(), DownloaderError> {
let num_chunks = self.chunks.len();
if index >= num_chunks {
return Err(DownloaderError::InvalidChunk(index, num_chunks))
}
self.downloaded += downloaded_bytes;
// Update chunk with downloaded range
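// e.g. a chunk with remaining range `0..=9` that received 4 bytes becomes `4..=9`; once the new
// start passes the end of the range, the chunk is marked fully downloaded (`None`).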
if let Some(range) = &self.chunks[index] {
let start = range.start() + downloaded_bytes;
if start > *range.end() {
self.chunks[index] = None;
} else {
self.chunks[index] = Some(start..=*range.end());
}
}
let file = self.path.file_stem().unwrap_or_default().to_string_lossy().into_owned();
info!(
target: "sync::stages::s3::downloader",
file,
"{}/{}", self.downloaded / 1024 / 1024, self.total_size / 1024 / 1024);
self.commit()
}
/// Commits the [`Metadata`] to file.
pub fn commit(&self) -> Result<(), DownloaderError> {
Ok(reth_fs_util::atomic_write_file(&self.path, |file| {
bincode::serialize_into(file, &MetadataFile::from(self))
})?)
}
/// Loads a [`Metadata`] file from disk using the target data file.
pub fn load(data_file: &Path) -> Result<Self, DownloaderError> {
let metadata_file_path = Self::file_path(data_file);
let MetadataFile { total_size, downloaded, chunk_size, chunks } =
bincode::deserialize_from(File::open(&metadata_file_path)?)?;
Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path })
}
/// Returns true if we have downloaded all chunks.
pub fn is_done(&self) -> bool {
!self.chunks.iter().any(|c| c.is_some())
}
/// Deletes [`Metadata`] file from disk.
pub fn delete(self) -> Result<(), DownloaderError> {
Ok(reth_fs_util::remove_file(&self.path)?)
}
}
/// A builder that can configure [`Metadata`]
#[derive(Debug)]
pub struct MetadataBuilder {
/// Path with the stored metadata.
metadata_path: PathBuf,
/// Total file size
total_size: Option<usize>,
/// Download chunk size. Default 150MB.
chunk_size: usize,
}
impl MetadataBuilder {
const fn new(metadata_path: PathBuf) -> Self {
Self {
metadata_path,
total_size: None,
chunk_size: 150 * (1024 * 1024), // 150MB
}
}
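/// Sets the total size of the file to be downloaded.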
pub const fn with_total_size(mut self, total_size: usize) -> Self {
self.total_size = Some(total_size);
self
}
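/// Sets the download chunk size. Defaults to 150MB.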
pub const fn with_chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
self
}
/// Builds and commits a [`Metadata`] if a valid (non-zero) `total_size` was set; otherwise
/// returns [`DownloaderError::InvalidMetadataTotalSize`].
pub fn build(&self) -> Result<Metadata, DownloaderError> {
match &self.total_size {
Some(total_size) if *total_size > 0 => {
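// Illustrative partition: with `total_size = 10` and `chunk_size = 4`, the chunks below are
// `[0..=3]`, `[4..=7]`, `[8..=9]`.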
let chunks = (0..*total_size)
.step_by(self.chunk_size)
.map(|start| {
Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1))
})
.collect();
let metadata = Metadata {
path: self.metadata_path.clone(),
total_size: *total_size,
downloaded: 0,
chunk_size: self.chunk_size,
chunks,
};
metadata.commit()?;
Ok(metadata)
}
_ => Err(DownloaderError::InvalidMetadataTotalSize(self.total_size)),
}
}
}
/// Helper type that can serialize and deserialize [`Metadata`] to disk.
#[derive(Debug, Serialize, Deserialize)]
struct MetadataFile {
/// Total file size
total_size: usize,
/// Bytes downloaded so far
downloaded: usize,
/// Download chunk size. Default 150MB.
chunk_size: usize,
/// Remaining download ranges for each chunk.
/// - `Some(RangeInclusive)`: range to be downloaded.
/// - `None`: Chunk fully downloaded.
chunks: Vec<Option<RangeInclusive<usize>>>,
}
impl From<&Metadata> for MetadataFile {
fn from(metadata: &Metadata) -> Self {
Self {
total_size: metadata.total_size,
downloaded: metadata.downloaded,
chunk_size: metadata.chunk_size,
chunks: metadata.chunks.clone(),
}
}
}

src/stages/s3/downloader/mod.rs (new file)

@@ -0,0 +1,38 @@
//! Provides functionality for downloading files in chunks from a remote source. It supports
//! concurrent downloads, resuming interrupted downloads, and file integrity verification.
mod error;
mod fetch;
mod meta;
mod worker;
pub(crate) use error::DownloaderError;
pub use fetch::fetch;
pub use meta::Metadata;
/// Response sent by the fetch task to `S3Stage` once it has downloaded all files of a block
/// range.
pub(crate) enum S3DownloaderResponse {
/// A new block range was downloaded.
AddedNewRange,
/// The last requested block range was downloaded.
Done,
}
impl S3DownloaderResponse {
/// Whether the downloaded block range is the last requested one.
pub(crate) const fn is_done(&self) -> bool {
matches!(self, Self::Done)
}
}
/// The remaining range of the nth chunk to be downloaded.
#[derive(Debug)]
pub struct RemainingChunkRange {
/// The nth chunk
pub index: usize,
/// Start of range
pub start: usize,
/// End of range
pub end: usize,
}

src/stages/s3/downloader/worker.rs (new file)

@@ -0,0 +1,110 @@
use super::error::DownloaderError;
use reqwest::{header::RANGE, Client};
use std::path::{Path, PathBuf};
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt, BufWriter},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
use tracing::debug;
/// Responses sent by a worker.
#[derive(Debug)]
pub(crate) enum WorkerResponse {
/// Worker has been spawned and awaiting work.
Ready { worker_id: u64, tx: UnboundedSender<WorkerRequest> },
/// Worker has downloaded a chunk.
DownloadedChunk { worker_id: u64, chunk_index: usize, written_bytes: usize },
/// Worker has encountered an error.
Err { worker_id: u64, error: DownloaderError },
}
/// Requests sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
/// Requests a range to be downloaded.
Download { chunk_index: usize, start: usize, end: usize },
/// Signals a worker exit.
Finish,
}
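// Message flow (summary of the code below): each spawned worker first sends
// `WorkerResponse::Ready { tx, .. }`; the orchestrator then sends `WorkerRequest::Download`
// requests over `tx`, the worker answers each with `WorkerResponse::DownloadedChunk`, and a
// final `WorkerRequest::Finish` lets the worker exit.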
/// Spawns the requested number of workers and returns an `UnboundedReceiver` that all of them will
/// respond to.
pub(crate) fn spawn_workers(
url: &str,
worker_count: u64,
data_file: &Path,
) -> UnboundedReceiver<WorkerResponse> {
// Create channels for communication between workers and orchestrator
let (orchestrator_tx, orchestrator_rx) = unbounded_channel();
// Initiate workers
for worker_id in 0..worker_count {
let orchestrator_tx = orchestrator_tx.clone();
let data_file = data_file.to_path_buf();
let url = url.to_string();
debug!(target: "sync::stages::s3::downloader", ?worker_id, "Spawning.");
tokio::spawn(async move {
if let Err(error) = worker_fetch(worker_id, &orchestrator_tx, data_file, url).await {
let _ = orchestrator_tx.send(WorkerResponse::Err { worker_id, error });
}
});
}
orchestrator_rx
}
/// Downloads requested chunk ranges to the data file.
async fn worker_fetch(
worker_id: u64,
orchestrator_tx: &UnboundedSender<WorkerResponse>,
data_file: PathBuf,
url: String,
) -> Result<(), DownloaderError> {
let client = Client::new();
let mut data_file = BufWriter::new(OpenOptions::new().write(true).open(data_file).await?);
// Signals readiness to download
let (tx, mut rx) = unbounded_channel::<WorkerRequest>();
orchestrator_tx.send(WorkerResponse::Ready { worker_id, tx }).unwrap_or_else(|_| {
debug!("Failed to notify orchestrator of readiness");
});
while let Some(req) = rx.recv().await {
debug!(
target: "sync::stages::s3::downloader",
worker_id,
?req,
"received from orchestrator"
);
match req {
WorkerRequest::Download { chunk_index, start, end } => {
data_file.seek(tokio::io::SeekFrom::Start(start as u64)).await?;
let mut response = client
.get(&url)
.header(RANGE, format!("bytes={}-{}", start, end))
.send()
.await?;
let mut written_bytes = 0;
while let Some(chunk) = response.chunk().await? {
written_bytes += chunk.len();
data_file.write_all(&chunk).await?;
}
data_file.flush().await?;
let _ = orchestrator_tx.send(WorkerResponse::DownloadedChunk {
worker_id,
chunk_index,
written_bytes,
});
}
WorkerRequest::Finish => break,
}
}
Ok(())
}

src/stages/s3/filelist.rs (new file)

@@ -0,0 +1,21 @@
use alloy_primitives::B256;
/// File list to be downloaded with their hashes.
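/// Each inner array groups the files that make up one block range; the `B256::ZERO` entries are
/// placeholder hashes.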
pub(crate) static DOWNLOAD_FILE_LIST: [[(&str, B256); 3]; 2] = [
[
("static_file_transactions_0_499999", B256::ZERO),
("static_file_transactions_0_499999.off", B256::ZERO),
("static_file_transactions_0_499999.conf", B256::ZERO),
// ("static_file_blockmeta_0_499999", B256::ZERO),
// ("static_file_blockmeta_0_499999.off", B256::ZERO),
// ("static_file_blockmeta_0_499999.conf", B256::ZERO),
],
[
("static_file_transactions_500000_999999", B256::ZERO),
("static_file_transactions_500000_999999.off", B256::ZERO),
("static_file_transactions_500000_999999.conf", B256::ZERO),
// ("static_file_blockmeta_500000_999999", B256::ZERO),
// ("static_file_blockmeta_500000_999999.off", B256::ZERO),
// ("static_file_blockmeta_500000_999999.conf", B256::ZERO),
],
];

src/stages/s3/mod.rs (new file)

@@ -0,0 +1,294 @@
mod downloader;
pub use downloader::{fetch, Metadata};
use downloader::{DownloaderError, S3DownloaderResponse};
mod filelist;
use filelist::DOWNLOAD_FILE_LIST;
use reth_db::transaction::DbTxMut;
use reth_primitives::StaticFileSegment;
use reth_provider::{
DBProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use std::{
path::PathBuf,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
/// S3 `StageId`
const S3_STAGE_ID: StageId = StageId::Other("S3");
/// The S3 stage.
///
/// Downloads missing static files for the configured block ranges from a remote endpoint.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct S3Stage {
/// Static file directory.
static_file_directory: PathBuf,
/// Remote server URL.
url: String,
/// Maximum number of connections per download.
max_concurrent_requests: u64,
/// Channel to receive the downloaded ranges from the fetch task.
fetch_rx: Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>>,
}
impl<Provider> Stage<Provider> for S3Stage
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StageCheckpointReader
+ StageCheckpointWriter,
{
fn id(&self) -> StageId {
S3_STAGE_ID
}
fn poll_execute_ready(
&mut self,
cx: &mut Context<'_>,
input: ExecInput,
) -> Poll<Result<(), StageError>> {
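// Polling strategy (summary): while a fetch task is active, poll its channel for completed
// block ranges; otherwise spawn a fetch task for any missing files, or return ready if
// nothing is missing.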
loop {
// We are currently fetching and may have downloaded ranges that we can process.
if let Some(rx) = &mut self.fetch_rx {
// Whether we have downloaded all the required files.
let mut is_done = false;
let response = match ready!(rx.poll_recv(cx)) {
Some(Ok(response)) => {
is_done = response.is_done();
Ok(())
}
Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError
None => Err(StageError::ChannelClosed),
};
if is_done {
self.fetch_rx = None;
}
return Poll::Ready(response)
}
// Spawns the downloader task if there are any missing files
if let Some(fetch_rx) = self.maybe_spawn_fetch(input) {
self.fetch_rx = Some(fetch_rx);
// Polls fetch_rx & registers waker
continue
}
// No files to be downloaded
return Poll::Ready(Ok(()))
}
}
fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StageCheckpointReader
+ StageCheckpointWriter,
{
// Re-initializes the provider to detect the new additions
provider.static_file_provider().initialize_index()?;
// TODO logic for appending tx_block
// let (_, _to_block) = input.next_block_range().into_inner();
// let static_file_provider = provider.static_file_provider();
// let mut _tx_block_cursor =
// provider.tx_ref().cursor_write::<tables::TransactionBlocks>()?;
// tx_block_cursor.append(indice.last_tx_num(), &block_number)?;
// let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None };
// provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?;
// provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?;
// // TODO: verify input.target according to s3 stage specifications
// let done = highest_block == to_block;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
}
fn unwind(
&mut self,
_provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// TODO
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
impl S3Stage {
/// Spawns a task to fetch files from the remote server, but only if any required static files
/// are missing.
///
/// Every time a block range is ready with all the necessary files, the task sends an
/// [`S3DownloaderResponse`] through the returned channel (stored in `self.fetch_rx`). For the
/// last requested block range the response is [`S3DownloaderResponse::Done`].
fn maybe_spawn_fetch(
&self,
input: ExecInput,
) -> Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>> {
let checkpoint = input.checkpoint();
// TODO: input target can only be certain numbers. eg. 499_999 , 999_999 etc.
// Create a list of all the missing files per block range that need to be downloaded.
let mut requests = vec![];
for block_range_files in &DOWNLOAD_FILE_LIST {
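// e.g. "static_file_transactions_0_499999" parses into the transactions segment covering the
// block range 0..=499999.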
let (_, block_range) =
StaticFileSegment::parse_filename(block_range_files[0].0).expect("qed");
if block_range.end() <= checkpoint.block_number {
continue
}
let mut block_range_requests = vec![];
for (filename, file_hash) in block_range_files {
// If the file already exists, then we are resuming a previously interrupted stage
// run.
if self.static_file_directory.join(filename).exists() {
// TODO: check hash if the file already exists
continue
}
block_range_requests.push((filename, file_hash));
}
requests.push((block_range, block_range_requests));
}
// Return None, if we have downloaded all the files that are required.
if requests.is_empty() {
return None
}
let static_file_directory = self.static_file_directory.clone();
let url = self.url.clone();
let max_concurrent_requests = self.max_concurrent_requests;
let (fetch_tx, fetch_rx) = unbounded_channel();
tokio::spawn(async move {
let mut requests_iter = requests.into_iter().peekable();
while let Some((_, file_requests)) = requests_iter.next() {
for (filename, file_hash) in file_requests {
if let Err(err) = fetch(
filename,
&static_file_directory,
&format!("{}/{filename}", url),
max_concurrent_requests,
Some(*file_hash),
)
.await
{
let _ = fetch_tx.send(Err(err));
return
}
}
let response = if requests_iter.peek().is_none() {
S3DownloaderResponse::Done
} else {
S3DownloaderResponse::AddedNewRange
};
let _ = fetch_tx.send(Ok(response));
}
});
Some(fetch_rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
UnwindStageTestRunner,
};
use reth_primitives::SealedHeader;
use reth_testing_utils::{
generators,
generators::{random_header, random_header_range},
};
// stage_test_suite_ext!(S3TestRunner, s3);
#[derive(Default)]
struct S3TestRunner {
db: TestStageDB,
}
impl StageTestRunner for S3TestRunner {
type S = S3Stage;
fn db(&self) -> &TestStageDB {
&self.db
}
fn stage(&self) -> Self::S {
S3Stage::default()
}
}
impl ExecuteStageTestRunner for S3TestRunner {
type Seed = Vec<SealedHeader>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.checkpoint().block_number;
let mut rng = generators::rng();
let head = random_header(&mut rng, start, None);
self.db.insert_headers_with_td(std::iter::once(&head))?;
// use previous progress as seed size
let end = input.target.unwrap_or_default() + 1;
if start + 1 >= end {
return Ok(Vec::default())
}
let mut headers = random_header_range(&mut rng, start + 1..end, head.hash());
self.db.insert_headers_with_td(headers.iter())?;
headers.insert(0, head);
Ok(headers)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
assert!(output.done, "stage should always be done");
assert_eq!(
output.checkpoint.block_number,
input.target(),
"stage progress should always match progress of previous stage"
);
}
Ok(())
}
}
impl UnwindStageTestRunner for S3TestRunner {
fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
Ok(())
}
}
#[test]
fn parse_files() {
for block_range_files in &DOWNLOAD_FILE_LIST {
let (_, _) = StaticFileSegment::parse_filename(block_range_files[0].0).expect("qed");
}
}
}