feat: some progress on FCU handling (#9727)

This commit is contained in:
Matthias Seitz
2024-07-23 14:05:54 +02:00
committed by GitHub
parent ed16643f12
commit 48c42f5630
2 changed files with 140 additions and 20 deletions

View File

@ -251,3 +251,10 @@ pub enum DownloadRequest {
/// Download the given range of blocks. /// Download the given range of blocks.
BlockRange(B256, u64), BlockRange(B256, u64),
} }
impl DownloadRequest {
/// Returns a [`DownloadRequest`] for a single block.
pub fn single_block(hash: B256) -> Self {
Self::BlockSet(HashSet::from([hash]))
}
}

View File

@ -13,7 +13,7 @@ use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus, error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
}; };
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_chain_state::{BlockState, CanonicalInMemoryState, ExecutedBlock}; use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock};
use reth_consensus::{Consensus, PostExecutionInput}; use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes; use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult}; use reth_errors::{ConsensusError, ProviderResult};
@ -26,7 +26,8 @@ use reth_primitives::{
SealedBlockWithSenders, SealedHeader, B256, U256, SealedBlockWithSenders, SealedHeader, B256, U256,
}; };
use reth_provider::{ use reth_provider::{
BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider, BlockReader, CanonStateNotification, ExecutionOutcome, StateProvider, StateProviderFactory,
StateRootProvider,
}; };
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{ use reth_rpc_types::{
@ -59,19 +60,36 @@ const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
/// Keeps track of the state of the tree. /// Keeps track of the state of the tree.
///
/// ## Invariants
///
/// - This only stores blocks that are connected to the canonical chain.
/// - All executed blocks are valid and have been executed.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct TreeState { pub struct TreeState {
/// All executed blocks by hash. /// __All__ executed blocks by block hash.
///
/// This includes blocks of all forks.
blocks_by_hash: HashMap<B256, ExecutedBlock>, blocks_by_hash: HashMap<B256, ExecutedBlock>,
/// Executed blocks grouped by their respective block number. /// Executed blocks grouped by their respective block number.
///
/// This maps unique block number to all known blocks for that height.
blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock>>, blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock>>,
/// Pending state not yet applied /// Currently tracked canonical head of the chain.
pending: Option<Arc<BlockState>>, current_canonical_head: BlockNumHash,
/// Block number and hash of the current head.
current_head: Option<(BlockNumber, B256)>,
} }
impl TreeState { impl TreeState {
/// Returns a new, empty tree state that points to the given canonical head.
fn new(current_canonical_head: BlockNumHash) -> Self {
Self {
blocks_by_hash: HashMap::new(),
blocks_by_number: BTreeMap::new(),
current_canonical_head,
}
}
/// Returns the block by hash.
fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock>> { fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock>> {
self.blocks_by_hash.get(&hash).map(|b| b.block.clone()) self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
} }
@ -114,6 +132,38 @@ impl TreeState {
pub(crate) fn max_block_number(&self) -> BlockNumber { pub(crate) fn max_block_number(&self) -> BlockNumber {
*self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0 *self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0
} }
/// Returns the block number of the pending block: `head + 1`
const fn pending_block_number(&self) -> BlockNumber {
self.current_canonical_head.number + 1
}
/// Updates the canonical head to the given block.
fn set_canonical_head(&mut self, new_head: BlockNumHash) {
self.current_canonical_head = new_head;
}
/// Returns the tracked canonical head.
const fn canonical_head(&self) -> &BlockNumHash {
&self.current_canonical_head
}
/// Returns the block hash of the canonical head.
const fn canonical_block_hash(&self) -> B256 {
self.canonical_head().hash
}
/// Returns the new chain for the given head.
///
/// This also handles reorgs.
// TODO: this type needs to include more info, like missing block etc.
fn on_new_head(&self, new_head: B256) -> Option<CanonStateNotification> {
let new_head_block = self.blocks_by_hash.get(&new_head)?;
// TODO walk the chain back and connect to canonical chain or detect reorg
None
}
} }
/// Tracks the state of the engine api internals. /// Tracks the state of the engine api internals.
@ -133,11 +183,15 @@ pub struct EngineApiTreeState {
} }
impl EngineApiTreeState { impl EngineApiTreeState {
fn new(block_buffer_limit: u32, max_invalid_header_cache_length: u32) -> Self { fn new(
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
canonical_block: BlockNumHash,
) -> Self {
Self { Self {
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length), invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
buffer: BlockBuffer::new(block_buffer_limit), buffer: BlockBuffer::new(block_buffer_limit),
tree_state: TreeState::default(), tree_state: TreeState::new(canonical_block),
forkchoice_state_tracker: ForkchoiceStateTracker::default(), forkchoice_state_tracker: ForkchoiceStateTracker::default(),
} }
} }
@ -312,15 +366,16 @@ where
persistence: PersistenceHandle, persistence: PersistenceHandle,
payload_builder: PayloadBuilderHandle<T>, payload_builder: PayloadBuilderHandle<T>,
) -> UnboundedReceiver<EngineApiEvent> { ) -> UnboundedReceiver<EngineApiEvent> {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel(); let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
let state = EngineApiTreeState::new( let state = EngineApiTreeState::new(
DEFAULT_BLOCK_BUFFER_LIMIT, DEFAULT_BLOCK_BUFFER_LIMIT,
DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
header.num_hash(),
); );
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
let task = Self::new( let task = Self::new(
provider, provider,
executor_provider, executor_provider,
@ -390,14 +445,17 @@ where
}, },
FromEngine::Request(request) => match request { FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs); let mut output = self.on_forkchoice_updated(state, payload_attrs);
if let Ok(res) = &output { if let Ok(res) = &mut output {
// emit an event about the handled FCU // emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated( self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state, state,
res.outcome.forkchoice_status(), res.outcome.forkchoice_status(),
)); ));
// handle the event if any
self.on_maybe_tree_event(res.event.take());
} }
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) { if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
@ -459,7 +517,8 @@ where
.block(&sync_target_state.finalized_block_hash) .block(&sync_target_state.finalized_block_hash)
.map(|block| block.number); .map(|block| block.number);
// TODO: state housekeeping // TODO(mattsse): state housekeeping, this needs to update the tracked canonical state and
// attempt to make the current target canonical if we have all the blocks buffered
// The block number that the backfill finished at - if the progress or newest // The block number that the backfill finished at - if the progress or newest
// finalized is None then we can't check the distance anyways. // finalized is None then we can't check the distance anyways.
@ -489,6 +548,13 @@ where
// TODO: implement state updates and shift canonical state // TODO: implement state updates and shift canonical state
} }
/// Convenience function to handle an optional tree event.
fn on_maybe_tree_event(&self, event: Option<TreeEvent>) {
if let Some(event) = event {
self.on_tree_event(event);
}
}
/// Handles a tree event. /// Handles a tree event.
fn on_tree_event(&self, event: TreeEvent) { fn on_tree_event(&self, event: TreeEvent) {
match event { match event {
@ -572,6 +638,10 @@ where
} }
/// Return state provider with reference to in-memory blocks that overlay database state. /// Return state provider with reference to in-memory blocks that overlay database state.
///
/// This merges the state of all blocks that are part of the chain that the requested block is
/// the head of. This includes all blocks that connect back to the canonical block on disk.
// TODO: return error if the chain has gaps
fn state_provider( fn state_provider(
&self, &self,
hash: B256, hash: B256,
@ -1143,13 +1213,56 @@ where
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> { ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? { if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
self.state.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status()); self.state.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status());
// TODO: make canonical and process payload attributes if valid
return Ok(TreeOutcome::new(on_updated)) return Ok(TreeOutcome::new(on_updated))
} }
todo!() let valid_outcome = |head| {
TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(head),
)))
};
// Process the forkchoice update by trying to make the head block canonical
//
// We can only process this forkchoice update if:
// - we have the `head` block
// - the head block is part of a chain that is connected to the canonical chain. This
// includes reorgs.
//
// Performing a FCU involves:
// - marking the FCU's head block as canonical
// - updating in memory state to reflect the new canonical chain
// - updating canonical state trackers
// - emitting a canonicalization event for the new chain (including reorg)
// - if we have payload attributes, delegate them to the payload service
// 1. ensure we have a new head block
if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
// the head block is already canonical
return Ok(valid_outcome(state.head_block_hash))
}
// 2. ensure we can apply a new chain update for the head block
if let Some(update) = self.state.tree_state.on_new_head(state.head_block_hash) {
// update the tracked canonical head
self.state.tree_state.set_canonical_head(update.tip().num_hash());
// TODO
// update inmemory state
// update trackers
// emit notification
// validate and handle payload attributes
return Ok(valid_outcome(state.head_block_hash))
}
// 3. we don't have the block to perform the update
let target = self.lowest_buffered_ancestor_or(state.head_block_hash);
Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
PayloadStatusEnum::Syncing,
)))
.with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
} }
} }
@ -1190,7 +1303,7 @@ mod tests {
use super::*; use super::*;
use crate::static_files::StaticFileAction; use crate::static_files::StaticFileAction;
use reth_beacon_consensus::EthBeaconConsensus; use reth_beacon_consensus::EthBeaconConsensus;
use reth_chain_state::test_utils::get_executed_blocks; use reth_chain_state::{test_utils::get_executed_blocks, BlockState};
use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::test_utils::MockExecutorProvider; use reth_evm::test_utils::MockExecutorProvider;