mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Convert pending block to a watch channel (#10203)
This commit is contained in:
@ -20,7 +20,7 @@ use std::{
|
||||
sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
|
||||
/// Size of the broadcast channel used to notify canonical state events.
|
||||
const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256;
|
||||
@ -48,7 +48,7 @@ pub(crate) struct InMemoryState {
|
||||
/// Mapping of block numbers to block hashes.
|
||||
numbers: RwLock<BTreeMap<u64, B256>>,
|
||||
/// The pending block that has not yet been made canonical.
|
||||
pending: RwLock<Option<BlockState>>,
|
||||
pending: watch::Sender<Option<BlockState>>,
|
||||
/// Metrics for the in-memory state.
|
||||
metrics: InMemoryStateMetrics,
|
||||
}
|
||||
@ -59,10 +59,11 @@ impl InMemoryState {
|
||||
numbers: BTreeMap<u64, B256>,
|
||||
pending: Option<BlockState>,
|
||||
) -> Self {
|
||||
let (pending, _) = watch::channel(pending);
|
||||
let this = Self {
|
||||
blocks: RwLock::new(blocks),
|
||||
numbers: RwLock::new(numbers),
|
||||
pending: RwLock::new(pending),
|
||||
pending,
|
||||
metrics: Default::default(),
|
||||
};
|
||||
this.update_metrics();
|
||||
@ -112,7 +113,7 @@ impl InMemoryState {
|
||||
/// Returns the pending state corresponding to the current head plus one,
|
||||
/// from the payload received in newPayload that does not have a FCU yet.
|
||||
pub(crate) fn pending_state(&self) -> Option<Arc<BlockState>> {
|
||||
self.pending.read().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone())))
|
||||
self.pending.borrow().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone())))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -140,11 +141,11 @@ impl CanonicalInMemoryStateInner {
|
||||
{
|
||||
let mut blocks = self.in_memory_state.blocks.write();
|
||||
let mut numbers = self.in_memory_state.numbers.write();
|
||||
let mut pending = self.in_memory_state.pending.write();
|
||||
|
||||
blocks.clear();
|
||||
numbers.clear();
|
||||
pending.take();
|
||||
self.in_memory_state.pending.send_modify(|p| {
|
||||
p.take();
|
||||
});
|
||||
}
|
||||
self.in_memory_state.update_metrics();
|
||||
}
|
||||
@ -229,7 +230,9 @@ impl CanonicalInMemoryState {
|
||||
// fetch the state of the pending block's parent block
|
||||
let parent = self.state_by_hash(pending.block().parent_hash);
|
||||
let pending = BlockState::with_parent(pending, parent.map(|p| (*p).clone()));
|
||||
*self.inner.in_memory_state.pending.write() = Some(pending);
|
||||
self.inner.in_memory_state.pending.send_modify(|p| {
|
||||
p.replace(pending);
|
||||
});
|
||||
self.inner.in_memory_state.update_metrics();
|
||||
}
|
||||
|
||||
@ -242,7 +245,6 @@ impl CanonicalInMemoryState {
|
||||
// acquire all locks
|
||||
let mut numbers = self.inner.in_memory_state.numbers.write();
|
||||
let mut blocks = self.inner.in_memory_state.blocks.write();
|
||||
let mut pending = self.inner.in_memory_state.pending.write();
|
||||
|
||||
// we first remove the blocks from the reorged chain
|
||||
for block in reorged {
|
||||
@ -266,7 +268,9 @@ impl CanonicalInMemoryState {
|
||||
}
|
||||
|
||||
// remove the pending state
|
||||
pending.take();
|
||||
self.inner.in_memory_state.pending.send_modify(|p| {
|
||||
p.take();
|
||||
});
|
||||
}
|
||||
self.inner.in_memory_state.update_metrics();
|
||||
}
|
||||
@ -291,7 +295,6 @@ impl CanonicalInMemoryState {
|
||||
{
|
||||
let mut blocks = self.inner.in_memory_state.blocks.write();
|
||||
let mut numbers = self.inner.in_memory_state.numbers.write();
|
||||
let mut pending = self.inner.in_memory_state.pending.write();
|
||||
|
||||
// clear all numbers
|
||||
numbers.clear();
|
||||
@ -319,12 +322,14 @@ impl CanonicalInMemoryState {
|
||||
}
|
||||
|
||||
// also shift the pending state if it exists
|
||||
if let Some(pending) = pending.as_mut() {
|
||||
pending.parent = blocks
|
||||
.get(&pending.block().block.parent_hash)
|
||||
.cloned()
|
||||
.map(|p| Box::new((*p).clone()));
|
||||
}
|
||||
self.inner.in_memory_state.pending.send_modify(|p| {
|
||||
if let Some(p) = p.as_mut() {
|
||||
p.parent = blocks
|
||||
.get(&p.block().block.parent_hash)
|
||||
.cloned()
|
||||
.map(|p| Box::new((*p).clone()));
|
||||
}
|
||||
});
|
||||
}
|
||||
self.inner.in_memory_state.update_metrics();
|
||||
}
|
||||
@ -486,7 +491,7 @@ impl CanonicalInMemoryState {
|
||||
|
||||
/// Returns an iterator over all canonical blocks in the in-memory state, from newest to oldest.
|
||||
pub fn canonical_chain(&self) -> impl Iterator<Item = Arc<BlockState>> {
|
||||
let pending = self.inner.in_memory_state.pending.read().clone();
|
||||
let pending = self.inner.in_memory_state.pending.borrow().clone();
|
||||
let head = self.inner.in_memory_state.head_state();
|
||||
|
||||
// this clone is cheap because we only expect to keep in memory a few
|
||||
|
||||
Reference in New Issue
Block a user