feat: execute downloaded blocks in batches (#10155)

This commit is contained in:
Matthias Seitz
2024-08-07 12:58:17 +02:00
committed by GitHub
parent 7486d5bafe
commit a77ce062fe
2 changed files with 81 additions and 5 deletions

View File

@ -4,6 +4,8 @@ const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 256;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
/// The configuration of the engine tree.
#[derive(Debug)]
pub struct TreeConfig {
@ -14,6 +16,8 @@ pub struct TreeConfig {
block_buffer_limit: u32,
/// Number of invalid headers to keep in cache.
max_invalid_header_cache_length: u32,
/// Maximum number of blocks to execute sequentially in a batch.
max_execute_block_batch_size: usize,
}
impl Default for TreeConfig {
@ -22,6 +26,7 @@ impl Default for TreeConfig {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
}
}
}
@ -32,8 +37,14 @@ impl TreeConfig {
persistence_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
max_execute_block_batch_size: usize,
) -> Self {
Self { persistence_threshold, block_buffer_limit, max_invalid_header_cache_length }
Self {
persistence_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
max_execute_block_batch_size,
}
}
/// Return the persistence threshold.
@ -51,6 +62,11 @@ impl TreeConfig {
self.max_invalid_header_cache_length
}
/// Return the maximum execute block batch size.
pub const fn max_execute_block_batch_size(&self) -> usize {
self.max_execute_block_batch_size
}
/// Setter for persistence threshold.
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
self.persistence_threshold = persistence_threshold;
@ -71,4 +87,13 @@ impl TreeConfig {
self.max_invalid_header_cache_length = max_invalid_header_cache_length;
self
}
/// Setter for maximum execute block batch size.
pub const fn with_max_execute_block_batch_size(
mut self,
max_execute_block_batch_size: usize,
) -> Self {
self.max_execute_block_batch_size = max_execute_block_batch_size;
self
}
}

View File

@ -518,19 +518,34 @@ where
}
/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
///
/// If the block count exceeds the configured batch size we're allowed to execute at once, this
/// will execute the first batch and send the remaining blocks back through the channel so that
/// don't block request processing for a long time.
fn on_downloaded(&mut self, mut blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
if blocks.is_empty() {
// nothing to execute
return None
}
trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
// TODO(mattsse): on process a certain number of blocks sequentially
for block in blocks {
let batch = self.config.max_execute_block_batch_size().min(blocks.len());
for block in blocks.drain(..batch) {
if let Some(event) = self.on_downloaded_block(block) {
let needs_backfill = event.is_backfill_action();
self.on_tree_event(event);
if needs_backfill {
// can exit early if backfill is needed
break
return None
}
}
}
// if we still have blocks to execute, send them as a followup request
if !blocks.is_empty() {
let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
}
None
}
@ -2234,6 +2249,41 @@ mod tests {
}
}
#[test]
fn test_tree_persist_block_batch() {
let tree_config = TreeConfig::default();
let chain_spec = MAINNET.clone();
let mut test_block_builder =
TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
// we need more than tree_config.persistence_threshold() +1 blocks to
// trigger the persistence task.
let blocks: Vec<_> = test_block_builder
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
.collect();
let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
let mut blocks = vec![];
for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
}
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
test_harness.tree.on_engine_message(msg);
// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
match msg {
FromEngine::DownloadedBlocks(blocks) => {
assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
}
_ => panic!("unexpected message: {:#?}", msg),
}
}
#[tokio::test]
async fn test_tree_persist_blocks() {
let tree_config = TreeConfig::default();
@ -2721,6 +2771,7 @@ mod tests {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
// create base chain and setup test harness with it
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();