feat(exex): send canonical notifications when head is provided (#11280)

This commit is contained in:
Alexey Shekhirin
2024-09-27 16:22:03 +01:00
committed by GitHub
parent 07e94e7fa5
commit dbd9a2bb35
4 changed files with 257 additions and 188 deletions

View File

@ -1,6 +1,4 @@
use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
use alloy_primitives::U256;
use eyre::OptionExt;
use futures::{Stream, StreamExt};
use reth_chainspec::Head;
use reth_evm::execute::BlockExecutorProvider;
@ -137,15 +135,16 @@ pub struct ExExNotificationsWithHead<P, E> {
provider: P,
executor: E,
notifications: Receiver<ExExNotification>,
#[allow(dead_code)]
wal_handle: WalHandle,
exex_head: ExExHead,
pending_sync: bool,
/// If true, then we need to check if the ExEx head is on the canonical chain and if not,
/// revert its head.
pending_check_canonical: bool,
/// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
/// the missing blocks.
pending_check_backfill: bool,
/// The backfill job to run before consuming any notifications.
backfill_job: Option<StreamBackfillJob<E, P, Chain>>,
/// Whether we're currently waiting for the node head to catch up to the same height as the
/// ExEx head.
node_head_catchup_in_progress: bool,
}
impl<P, E> ExExNotificationsWithHead<P, E>
@ -169,90 +168,76 @@ where
notifications,
wal_handle,
exex_head,
pending_sync: true,
pending_check_canonical: true,
pending_check_backfill: true,
backfill_job: None,
node_head_catchup_in_progress: false,
}
}
/// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch.
/// Checks if the ExEx head is on the canonical chain.
///
/// If the head block is not found in the database, it means we're not on the canonical chain
/// and we need to revert the notification with the ExEx head block.
fn check_canonical(&mut self) -> eyre::Result<Option<ExExNotification>> {
if self.provider.header(&self.exex_head.block.hash)?.is_some() {
debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
return Ok(None)
}
// If the head block is not found in the database, it means we're not on the canonical
// chain.
// Get the committed notification for the head block from the WAL.
let Some(notification) =
self.wal_handle.get_committed_notification_by_block_hash(&self.exex_head.block.hash)?
else {
return Err(eyre::eyre!(
"Could not find notification for block hash {:?} in the WAL",
self.exex_head.block.hash
))
};
// Update the head block hash to the parent hash of the first committed block.
let committed_chain = notification.committed_chain().unwrap();
let new_exex_head =
(committed_chain.first().parent_hash, committed_chain.first().number - 1).into();
debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
self.exex_head.block = new_exex_head;
// Return an inverted notification. See the documentation for
// `ExExNotification::into_inverted`.
Ok(Some(notification.into_inverted()))
}
/// Compares the node head against the ExEx head, and backfills if needed.
///
/// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the
/// canonical chain.
///
/// Possible situations are:
/// - ExEx is behind the node head (`node_head.number < exex_head.number`).
/// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database).
/// Backfill from the node database.
/// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
/// Unwind the ExEx to the first block matching between the ExEx and the node, and then
/// bacfkill from the node database.
/// - ExEx is at the same block number (`node_head.number == exex_head.number`).
/// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing
/// to do.
/// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
/// Unwind the ExEx to the first block matching between the ExEx and the node, and then
/// backfill from the node database.
/// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the
/// node head catches up to the ExEx head, and then repeat the synchronization process.
fn synchronize(&mut self) -> eyre::Result<()> {
/// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the
/// node database.
/// - ExEx is at the same block number as the node head (`node_head.number ==
/// exex_head.number`). Nothing to do.
fn check_backfill(&mut self) -> eyre::Result<()> {
debug!(target: "exex::manager", "Synchronizing ExEx head");
let backfill_job_factory =
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
match self.exex_head.block.number.cmp(&self.node_head.number) {
std::cmp::Ordering::Less => {
// ExEx is behind the node head
if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
// ExEx is on the canonical chain
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain");
if exex_header.number != self.exex_head.block.number {
eyre::bail!("ExEx head number does not match the hash")
}
// ExEx is on the canonical chain, start backfill
let backfill = backfill_job_factory
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
.into_stream();
self.backfill_job = Some(backfill);
} else {
debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain");
// ExEx is not on the canonical chain, first unwind it and then backfill
// TODO(alexey): unwind and backfill
self.backfill_job = None;
}
// ExEx is behind the node head, start backfill
debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill");
let backfill = backfill_job_factory
.backfill(self.exex_head.block.number + 1..=self.node_head.number)
.into_stream();
self.backfill_job = Some(backfill);
}
#[allow(clippy::branches_sharing_code)]
std::cmp::Ordering::Equal => {
// ExEx is at the same block height as the node head
if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
// ExEx is on the canonical chain
debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain");
if exex_header.number != self.exex_head.block.number {
eyre::bail!("ExEx head number does not match the hash")
}
// ExEx is on the canonical chain and the same as the node head, no need to
// backfill
self.backfill_job = None;
} else {
// ExEx is not on the canonical chain, first unwind it and then backfill
debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain");
// TODO(alexey): unwind and backfill
self.backfill_job = None;
}
debug!(target: "exex::manager", "ExEx is at the node head");
}
std::cmp::Ordering::Greater => {
debug!(target: "exex::manager", "ExEx is ahead of the node head");
// ExEx is ahead of the node head
// TODO(alexey): wait until the node head is at the same height as the ExEx head
// and then repeat the process above
self.node_head_catchup_in_progress = true;
return Err(eyre::eyre!("ExEx is ahead of the node head"))
}
};
@ -270,9 +255,18 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.pending_sync {
this.synchronize()?;
this.pending_sync = false;
if this.pending_check_canonical {
if let Some(canonical_notification) = this.check_canonical()? {
return Poll::Ready(Some(Ok(canonical_notification)))
}
// ExEx head is on the canonical chain, we no longer need to check it
this.pending_check_canonical = false;
}
if this.pending_check_backfill {
this.check_backfill()?;
this.pending_check_backfill = false;
}
if let Some(backfill_job) = &mut this.backfill_job {
@ -286,71 +280,23 @@ where
this.backfill_job = None;
}
loop {
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
// 1. Either committed or reverted chain from the notification.
// 2. Block number of the tip of the canonical chain:
// - For committed chain, it's the tip block number.
// - For reverted chain, it's the block number preceding the first block in the chain.
let (chain, tip) = notification
.committed_chain()
.map(|chain| (chain.clone(), chain.tip().number))
.or_else(|| {
notification
.reverted_chain()
.map(|chain| (chain.clone(), chain.first().number - 1))
})
.unzip();
if this.node_head_catchup_in_progress {
// If we are waiting for the node head to catch up to the same height as the ExEx
// head, then we need to check if the ExEx is on the canonical chain.
// Query the chain from the new notification for the ExEx head block number.
let exex_head_block = chain
.as_ref()
.and_then(|chain| chain.blocks().get(&this.exex_head.block.number));
// Compare the hash of the block from the new notification to the ExEx head
// hash.
if let Some((block, tip)) = exex_head_block.zip(tip) {
if block.hash() == this.exex_head.block.hash {
// ExEx is on the canonical chain, proceed with the notification
this.node_head_catchup_in_progress = false;
} else {
// ExEx is not on the canonical chain, synchronize
let tip =
this.provider.sealed_header(tip)?.ok_or_eyre("node head not found")?;
this.node_head = Head::new(
tip.number,
tip.hash(),
tip.difficulty,
U256::MAX,
tip.timestamp,
);
this.synchronize()?;
}
}
}
if notification
.committed_chain()
.or_else(|| notification.reverted_chain())
.map_or(false, |chain| chain.first().number > this.exex_head.block.number)
{
return Poll::Ready(Some(Ok(notification)))
}
if let Some(committed_chain) = notification.committed_chain() {
this.exex_head.block = committed_chain.tip().num_hash();
} else if let Some(reverted_chain) = notification.reverted_chain() {
let first_block = reverted_chain.first();
this.exex_head.block = (first_block.parent_hash, first_block.number - 1).into();
}
Poll::Ready(Some(Ok(notification)))
}
}
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use crate::Wal;
use super::*;
@ -363,7 +309,7 @@ mod tests {
use reth_primitives::Block;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter,
Chain,
Chain, DatabaseProviderFactory,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use tokio::sync::mpsc;
@ -451,12 +397,6 @@ mod tests {
Ok(())
}
#[ignore]
#[tokio::test]
async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> {
Ok(())
}
#[tokio::test]
async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
@ -512,18 +452,112 @@ mod tests {
Ok(())
}
#[ignore]
#[tokio::test]
async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory)?;
let node_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
let node_head = Head {
number: node_head_block.number,
hash: node_head_block.hash(),
..Default::default()
};
let provider_rw = provider.database_provider_rw()?;
provider_rw.insert_block(node_head_block)?;
provider_rw.commit()?;
let node_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
.backfill(node_head.number..=node_head.number)
.next()
.ok_or_else(|| eyre::eyre!("failed to backfill"))??,
),
};
let exex_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head = ExExHead { block: exex_head_block.num_hash() };
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block
.clone()
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
wal.commit(&exex_head_notification)?;
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
notifications_tx.send(new_notification.clone()).await?;
let mut notifications = ExExNotifications::new(
node_head,
provider,
EthExecutorProvider::mainnet(),
notifications_rx,
wal.handle(),
)
.with_head(exex_head);
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
assert_eq!(
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
);
// Second notification is the backfilled block from the canonical chain to get back to the
// canonical tip
assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
// Third notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
Ok(())
}
#[tokio::test]
async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
@ -538,6 +572,17 @@ mod tests {
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block
.clone()
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
wal.commit(&exex_head_notification)?;
let node_head =
Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
@ -545,20 +590,23 @@ mod tests {
block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
};
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
notifications_tx
.send(ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block
.clone()
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
})
.await?;
notifications_tx.send(new_notification.clone()).await?;
let mut notifications = ExExNotifications::new(
node_head,
@ -569,29 +617,15 @@ mod tests {
)
.with_head(exex_head);
// First notification is skipped because the node is catching up with the ExEx
let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await;
assert!(new_notification.is_pending());
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
assert_eq!(
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
);
// Imitate the node catching up with the ExEx by sending a notification for the missing
// block
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
exex_head_block.number + 1,
BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() },
)
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?],
Default::default(),
None,
)),
};
notifications_tx.send(notification.clone()).await?;
// Second notification is received because the node caught up with the ExEx
assert_eq!(notifications.next().await.transpose()?, Some(notification));
// Second notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
Ok(())
}

View File

@ -18,17 +18,17 @@ pub struct BlockCache {
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
/// in the cache.
files: RwLock<BTreeMap<u64, VecDeque<CachedBlock>>>,
/// A mapping of `Block Hash -> Block`.
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
blocks: DashMap<B256, CachedBlock>,
committed_blocks: DashMap<B256, (u64, CachedBlock)>,
}
impl BlockCache {
/// Creates a new instance of [`BlockCache`].
pub(super) fn new() -> Self {
Self { files: RwLock::new(BTreeMap::new()), blocks: DashMap::new() }
Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() }
}
/// Returns `true` if the cache is empty.
@ -95,6 +95,12 @@ impl BlockCache {
Some((key, last_block))
}
/// Returns the file ID for the notification containing the given committed block hash, if it
/// exists.
pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option<u64> {
self.committed_blocks.get(block_hash).map(|entry| entry.0)
}
/// Inserts the blocks from the notification into the cache with the given file ID.
///
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
@ -126,7 +132,7 @@ impl BlockCache {
parent_hash: block.parent_hash,
};
files.entry(file_id).or_default().push_back(cached_block);
self.blocks.insert(block.hash(), cached_block);
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}
}
}

View File

@ -3,12 +3,14 @@
mod cache;
pub use cache::BlockCache;
mod storage;
use eyre::OptionExt;
pub use storage::Storage;
use std::{path::Path, sync::Arc};
use alloy_eips::BlockNumHash;
use reth_exex_types::ExExNotification;
use reth_primitives::B256;
use reth_tracing::tracing::{debug, instrument};
/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
@ -135,7 +137,10 @@ impl WalInner {
block.block.number == to_block.number &&
block.block.hash == to_block.hash
{
let notification = self.storage.read_notification(file_id)?;
let notification = self
.storage
.read_notification(file_id)?
.ok_or_eyre("notification not found")?;
if notification.committed_chain().unwrap().blocks().len() == 1 {
unfinalized_from_file_id = Some(
block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX),
@ -207,6 +212,21 @@ pub struct WalHandle {
wal: Arc<WalInner>,
}
impl WalHandle {
/// Returns the notification for the given committed block hash if it exists.
pub fn get_committed_notification_by_block_hash(
&self,
block_hash: &B256,
) -> eyre::Result<Option<ExExNotification>> {
let Some(file_id) = self.wal.block_cache.get_file_id_by_committed_block_hash(block_hash)
else {
return Ok(None)
};
self.wal.storage.read_notification(file_id)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -84,16 +84,25 @@ impl Storage {
&self,
range: RangeInclusive<u64>,
) -> impl Iterator<Item = eyre::Result<(u64, ExExNotification)>> + '_ {
range.map(move |id| self.read_notification(id).map(|notification| (id, notification)))
range.map(move |id| {
let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?;
Ok((id, notification))
})
}
/// Reads the notification from the file with the given id.
#[instrument(target = "exex::wal::storage", skip(self))]
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<ExExNotification> {
pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result<Option<ExExNotification>> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");
let mut file = File::open(&file_path)?;
let mut file = match File::open(&file_path) {
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
Ok(serde_json::from_reader(&mut file)?)
}
@ -149,7 +158,7 @@ mod tests {
let file_id = 0;
storage.write_notification(file_id, &notification)?;
let deserialized_notification = storage.read_notification(file_id)?;
assert_eq!(deserialized_notification, notification);
assert_eq!(deserialized_notification, Some(notification));
Ok(())
}