feat(exex): WAL metrics (#11431)

This commit is contained in:
Alexey Shekhirin
2024-10-02 18:49:03 +03:00
committed by GitHub
parent 2ca8231e73
commit e882d00f1a
5 changed files with 147 additions and 35 deletions

View File

@ -162,7 +162,7 @@ impl ExExHandle {
/// Metrics for the `ExEx` manager.
#[derive(Metrics)]
#[metrics(scope = "exex_manager")]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Max size of the internal state notifications buffer.
max_capacity: Gauge,

View File

@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification;
#[derive(Debug, Default)]
pub struct BlockCache {
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
///
/// Contains one highest block in notification. In a notification with both committed and
/// reverted chain, the highest block is chosen between both chains.
pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
/// Block height of the lowest committed block currently in the cache.
pub(super) lowest_committed_block_height: Option<BlockNumber>,
/// Block height of the highest committed block currently in the cache.
pub(super) highest_committed_block_height: Option<BlockNumber>,
}
impl BlockCache {
/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.blocks.is_empty()
self.notification_max_blocks.is_empty()
}
/// Removes all files from the cache that has notifications with a tip block less than or equal
@ -37,9 +44,11 @@ impl BlockCache {
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
let mut file_ids = HashSet::default();
while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
while let Some(block @ Reverse((max_block, file_id))) =
self.notification_max_blocks.peek().copied()
{
if max_block <= block_number {
let popped_block = self.blocks.pop().unwrap();
let popped_block = self.notification_max_blocks.pop().unwrap();
debug_assert_eq!(popped_block, block);
file_ids.insert(file_id);
} else {
@ -47,7 +56,25 @@ impl BlockCache {
}
}
self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
self.committed_blocks.retain(|_, (file_id, block)| {
let retain = !file_ids.contains(file_id);
if retain {
lowest_committed_block_height = Some(
lowest_committed_block_height
.map_or(block.block.number, |lowest| block.block.number.min(lowest)),
);
highest_committed_block_height = Some(
highest_committed_block_height
.map_or(block.block.number, |highest| block.block.number.max(highest)),
);
}
retain
});
self.lowest_committed_block_height = lowest_committed_block_height;
self.highest_committed_block_height = highest_committed_block_height;
file_ids
}
@ -70,7 +97,7 @@ impl BlockCache {
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
self.notification_max_blocks.push(Reverse((max_block, file_id)));
}
if let Some(committed_chain) = &committed_chain {
@ -81,12 +108,19 @@ impl BlockCache {
};
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}
self.highest_committed_block_height = Some(committed_chain.tip().number);
}
}
#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
self.notification_max_blocks
.clone()
.into_sorted_vec()
.into_iter()
.map(|entry| entry.0)
.collect()
}
#[cfg(test)]

View File

@ -0,0 +1,18 @@
use metrics::Gauge;
use reth_metrics::Metrics;
/// Metrics for the [WAL](`super::Wal`)
#[derive(Metrics)]
#[metrics(scope = "exex.wal")]
pub(super) struct Metrics {
/// Size of all notifications in WAL in bytes
pub size_bytes: Gauge,
/// Total number of notifications in WAL
pub notifications_total: Gauge,
/// Total number of committed blocks in WAL
pub committed_blocks_total: Gauge,
/// Lowest committed block height in WAL
pub lowest_committed_block_height: Gauge,
/// Highest committed block height in WAL
pub highest_committed_block_height: Gauge,
}

View File

@ -3,8 +3,9 @@
mod cache;
pub use cache::BlockCache;
mod storage;
use parking_lot::{RwLock, RwLockReadGuard};
pub use storage::Storage;
mod metrics;
use metrics::Metrics;
use std::{
path::Path,
@ -16,6 +17,7 @@ use std::{
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use parking_lot::{RwLock, RwLockReadGuard};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::{debug, instrument};
@ -74,6 +76,7 @@ struct WalInner {
storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: RwLock<BlockCache>,
metrics: Metrics,
}
impl WalInner {
@ -82,6 +85,7 @@ impl WalInner {
next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?,
block_cache: RwLock::new(BlockCache::default()),
metrics: Metrics::default(),
};
wal.fill_block_cache()?;
Ok(wal)
@ -98,9 +102,12 @@ impl WalInner {
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);
let mut block_cache = self.block_cache.write();
let mut notifications_size = 0;
for entry in self.storage.iter_notifications(files_range) {
let (file_id, notification) = entry?;
let (file_id, size, notification) = entry?;
notifications_size += size;
let committed_chain = notification.committed_chain();
let reverted_chain = notification.reverted_chain();
@ -116,6 +123,8 @@ impl WalInner {
block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
}
self.update_metrics(&block_cache, notifications_size as i64);
Ok(())
}
@ -127,17 +136,20 @@ impl WalInner {
let mut block_cache = self.block_cache.write();
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
self.storage.write_notification(file_id, notification)?;
let size = self.storage.write_notification(file_id, notification)?;
debug!(?file_id, "Inserting notification blocks into the block cache");
block_cache.insert_notification_blocks_with_file_id(file_id, notification);
self.update_metrics(&block_cache, size as i64);
Ok(())
}
#[instrument(target = "exex::wal", skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
let file_ids = self.block_cache.write().remove_before(to_block.number);
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);
// Remove notifications from the storage.
if file_ids.is_empty() {
@ -145,12 +157,34 @@ impl WalInner {
return Ok(())
}
let removed_notifications = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, "Storage was finalized");
let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, ?removed_size, "Storage was finalized");
self.update_metrics(&block_cache, -(removed_size as i64));
Ok(())
}
fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
if size_delta >= 0 {
self.metrics.size_bytes.increment(size_delta as f64);
} else {
self.metrics.size_bytes.decrement(size_delta as f64);
}
self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64);
self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64);
if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
}
if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
}
}
/// Returns an iterator over all notifications in the WAL.
fn iter_notifications(
&self,
@ -159,7 +193,7 @@ impl WalInner {
return Ok(Box::new(std::iter::empty()))
};
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1))))
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
}
}
@ -180,7 +214,10 @@ impl WalHandle {
return Ok(None)
};
self.wal.storage.read_notification(file_id)
self.wal
.storage
.read_notification(file_id)
.map(|entry| entry.map(|(notification, _)| notification))
}
}
@ -205,7 +242,7 @@ mod tests {
wal.inner
.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.1))
.map(|entry| Ok(entry?.2))
.collect::<eyre::Result<_>>()
}

View File

@ -40,16 +40,23 @@ impl Storage {
}
/// Removes notification for the given file ID from the storage.
///
/// # Returns
///
/// The size of the file that was removed in bytes, if any.
#[instrument(target = "exex::wal::storage", skip(self))]
fn remove_notification(&self, file_id: u32) -> bool {
fn remove_notification(&self, file_id: u32) -> Option<u64> {
let path = self.file_path(file_id);
let size = path.metadata().ok()?.len();
match reth_fs_util::remove_file(self.file_path(file_id)) {
Ok(()) => {
debug!("Notification was removed from the storage");
true
Some(size)
}
Err(err) => {
debug!(?err, "Failed to remove notification from the storage");
false
None
}
}
}
@ -77,36 +84,42 @@ impl Storage {
///
/// # Returns
///
/// Number of removed notifications.
/// Number of removed notifications and the total size of the removed files in bytes.
pub(super) fn remove_notifications(
&self,
file_ids: impl IntoIterator<Item = u32>,
) -> eyre::Result<usize> {
let mut deleted = 0;
) -> eyre::Result<(usize, u64)> {
let mut deleted_total = 0;
let mut deleted_size = 0;
for id in file_ids {
if self.remove_notification(id) {
deleted += 1;
if let Some(size) = self.remove_notification(id) {
deleted_total += 1;
deleted_size += size;
}
}
Ok(deleted)
Ok((deleted_total, deleted_size))
}
pub(super) fn iter_notifications(
&self,
range: RangeInclusive<u32>,
) -> impl Iterator<Item = eyre::Result<(u32, ExExNotification)>> + '_ {
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
range.map(move |id| {
let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?;
let (notification, size) =
self.read_notification(id)?.ok_or_eyre("notification not found")?;
Ok((id, notification))
Ok((id, size, notification))
})
}
/// Reads the notification from the file with the given ID.
#[instrument(target = "exex::wal::storage", skip(self))]
pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result<Option<ExExNotification>> {
pub(super) fn read_notification(
&self,
file_id: u32,
) -> eyre::Result<Option<(ExExNotification, u64)>> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");
@ -115,21 +128,26 @@ impl Storage {
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
let size = file.metadata()?.len();
// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
rmp_serde::decode::from_read(&mut file)?;
Ok(Some(notification.into()))
Ok(Some((notification.into(), size)))
}
/// Writes the notification to the file with the given ID.
///
/// # Returns
///
/// The size of the file that was written in bytes.
#[instrument(target = "exex::wal::storage", skip(self, notification))]
pub(super) fn write_notification(
&self,
file_id: u32,
notification: &ExExNotification,
) -> eyre::Result<()> {
) -> eyre::Result<u64> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Writing notification to WAL");
@ -137,9 +155,11 @@ impl Storage {
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::from(notification);
Ok(reth_fs_util::atomic_write_file(&file_path, |file| {
reth_fs_util::atomic_write_file(&file_path, |file| {
rmp_serde::encode::write(file, &notification)
})?)
})?;
Ok(file_path.metadata()?.len())
}
}
@ -177,7 +197,10 @@ mod tests {
let file_id = 0;
storage.write_notification(file_id, &notification)?;
let deserialized_notification = storage.read_notification(file_id)?;
assert_eq!(deserialized_notification, Some(notification));
assert_eq!(
deserialized_notification.map(|(notification, _)| notification),
Some(notification)
);
Ok(())
}