fix(cli): read exact chunk len (#7777)

This commit is contained in:
Emilia Hane
2024-04-22 15:34:31 +02:00
committed by GitHub
parent 250da4e449
commit 24a8202481

View File

@ -16,9 +16,11 @@ use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt}; use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use tracing::{trace, warn}; use tracing::{debug, trace, warn};
/// Byte length of chunk to read from chain file. /// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000; pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
/// Front-end API for fetching chain data from a file. /// Front-end API for fetching chain data from a file.
@ -70,7 +72,7 @@ impl FileClient {
let file_len = metadata.len(); let file_len = metadata.len();
let mut reader = vec![]; let mut reader = vec![];
file.read_to_end(&mut reader).await.unwrap(); file.read_to_end(&mut reader).await?;
Ok(Self::from_reader(&reader[..], file_len).await?.0) Ok(Self::from_reader(&reader[..], file_len).await?.0)
} }
@ -87,9 +89,15 @@ impl FileClient {
let mut hash_to_number = HashMap::new(); let mut hash_to_number = HashMap::new();
let mut bodies = HashMap::new(); let mut bodies = HashMap::new();
// use with_capacity to make sure the internal buffer contains the entire file // use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize); let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
capacity=stream.read_buffer().capacity(),
"init decode stream"
);
let mut remaining_bytes = vec![]; let mut remaining_bytes = vec![];
let mut log_interval = 0; let mut log_interval = 0;
@ -98,7 +106,12 @@ impl FileClient {
while let Some(block_res) = stream.next().await { while let Some(block_res) = stream.next().await {
let block = match block_res { let block = match block_res {
Ok(block) => block, Ok(block) => block,
Err(FileClientError::Rlp(_err, bytes)) => { Err(FileClientError::Rlp(err, bytes)) => {
trace!(target: "downloaders::file",
%err,
bytes_len=bytes.len(),
"partial block returned from decoding chunk"
);
remaining_bytes = bytes; remaining_bytes = bytes;
break break
} }
@ -135,7 +148,7 @@ impl FileClient {
log_interval += 1; log_interval += 1;
} }
trace!(blocks = headers.len(), "Initialized file client"); trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
Ok((Self { headers, hash_to_number, bodies }, remaining_bytes)) Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
} }
@ -204,6 +217,16 @@ impl FileClient {
} }
self self
} }
/// Returns the current number of headers in the client.
///
/// This is the length of the `headers` collection at the time of the call; the client is not
/// modified.
pub fn headers_len(&self) -> usize {
self.headers.len()
}
/// Returns the current number of bodies in the client.
///
/// This is the length of the `bodies` collection at the time of the call; the client is not
/// modified.
pub fn bodies_len(&self) -> usize {
self.bodies.len()
}
} }
impl HeadersClient for FileClient { impl HeadersClient for FileClient {
@ -297,8 +320,8 @@ impl DownloadClient for FileClient {
pub struct ChunkedFileReader { pub struct ChunkedFileReader {
/// File to read from. /// File to read from.
file: File, file: File,
/// Current file length. /// Current file byte length.
file_len: u64, file_byte_len: u64,
/// Bytes that have been read. /// Bytes that have been read.
chunk: Vec<u8>, chunk: Vec<u8>,
/// Max bytes per chunk. /// Max bytes per chunk.
@ -322,20 +345,20 @@ impl ChunkedFileReader {
pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> { pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
// get file len from metadata before reading // get file len from metadata before reading
let metadata = file.metadata().await?; let metadata = file.metadata().await?;
let file_len = metadata.len(); let file_byte_len = metadata.len();
Ok(Self { file, file_len, chunk: vec![], chunk_byte_len }) Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len })
} }
/// Calculates the target byte length of the next chunk. Returns the max chunk byte length, /// Calculates the target byte length of the next chunk. Returns the max chunk byte length,
/// or, for the last chunk, the remaining file bytes plus any buffered leftover bytes. /// or, for the last chunk, the remaining file bytes plus any buffered leftover bytes.
fn chunk_len(&self) -> u64 { fn chunk_len(&self) -> u64 {
let Self { chunk_byte_len, file_len, .. } = *self; let Self { chunk_byte_len, file_byte_len, .. } = *self;
let file_len = file_len + self.chunk.len() as u64; let file_byte_len = file_byte_len + self.chunk.len() as u64;
if chunk_byte_len > file_len { if chunk_byte_len > file_byte_len {
// last chunk // last chunk
file_len file_byte_len
} else { } else {
chunk_byte_len chunk_byte_len
} }
@ -343,37 +366,52 @@ impl ChunkedFileReader {
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> { pub async fn next_chunk(&mut self) -> Result<Option<FileClient>, FileClientError> {
if self.file_len == 0 && self.chunk.is_empty() { if self.file_byte_len == 0 && self.chunk.is_empty() {
// eof // eof
return Ok(None) return Ok(None)
} }
let chunk_len = self.chunk_len(); let chunk_target_len = self.chunk_len();
let old_bytes_len = self.chunk.len() as u64; let old_bytes_len = self.chunk.len() as u64;
// calculate reserved space in chunk // calculate reserved space in chunk
let new_bytes_len = chunk_len - old_bytes_len; let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
// read new bytes from file // read new bytes from file
let mut reader = BytesMut::with_capacity(new_bytes_len as usize); let mut reader = BytesMut::zeroed(new_read_bytes_target_len as usize);
self.file.read_buf(&mut reader).await?; // actual bytes that have been read
let new_read_bytes_len = self.file.read_exact(&mut reader).await? as u64;
// update remaining file length // update remaining file length
self.file_len -= new_bytes_len; self.file_byte_len -= new_read_bytes_len;
trace!(target: "downloaders::file", let prev_read_bytes_len = self.chunk.len();
max_chunk_byte_len=self.chunk_byte_len,
prev_read_bytes_len=self.chunk.len(),
new_bytes_len,
remaining_file_byte_len=self.file_len,
"new bytes were read from file"
);
// read new bytes from file into chunk // read new bytes from file into chunk
self.chunk.extend_from_slice(&reader[..]); self.chunk.extend_from_slice(&reader[..]);
let next_chunk_byte_len = self.chunk.len();
debug!(target: "downloaders::file",
max_chunk_byte_len=self.chunk_byte_len,
prev_read_bytes_len,
new_read_bytes_target_len,
new_read_bytes_len,
reader_capacity=reader.capacity(),
next_chunk_byte_len,
remaining_file_byte_len=self.file_byte_len,
"new bytes were read from file"
);
// make new file client from chunk // make new file client from chunk
let (file_client, bytes) = FileClient::from_reader(&self.chunk[..], chunk_len).await?; let (file_client, bytes) =
FileClient::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
debug!(target: "downloaders::file",
headers_len=file_client.headers.len(),
bodies_len=file_client.bodies.len(),
remaining_bytes_len=bytes.len(),
"parsed blocks that were read from file"
);
// save left over bytes // save left over bytes
self.chunk = bytes; self.chunk = bytes;