refactor(sdk): complete generic impl for PersistenceService over N::Primitives (#13044)

This commit is contained in:
Léa Narzis
2024-12-04 11:50:46 +01:00
committed by GitHub
parent c060df92af
commit 025885f2ad
5 changed files with 143 additions and 131 deletions

View File

@ -88,7 +88,7 @@ where
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(

View File

@ -18,6 +18,7 @@ reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true

View File

@ -19,6 +19,7 @@ use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::EthBlockClient;
use reth_node_types::{BlockTy, NodeTypesWithEngine};
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives::EthPrimitives;
use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
@ -90,7 +91,7 @@ where
let downloader = BasicBlockDownloader::new(client, consensus.clone().as_consensus());
let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

View File

@ -30,11 +30,14 @@ impl<T> PersistenceNodeTypes for T where T: ProviderNodeTypes<Primitives = EthPr
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N: ProviderNodeTypes> {
pub struct PersistenceService<N>
where
N: PersistenceNodeTypes,
{
/// The provider factory to use
provider: ProviderFactory<N>,
/// Incoming requests
incoming: Receiver<PersistenceAction>,
incoming: Receiver<PersistenceAction<N::Primitives>>,
/// The pruner
pruner: PrunerWithFactory<ProviderFactory<N>>,
/// metrics
@ -43,11 +46,14 @@ pub struct PersistenceService<N: ProviderNodeTypes> {
sync_metrics_tx: MetricEventsSender,
}
impl<N: ProviderNodeTypes> PersistenceService<N> {
impl<N> PersistenceService<N>
where
N: PersistenceNodeTypes,
{
/// Create a new persistence service
pub fn new(
provider: ProviderFactory<N>,
incoming: Receiver<PersistenceAction>,
incoming: Receiver<PersistenceAction<N::Primitives>>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
) -> Self {
@ -66,7 +72,10 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
}
}
impl<N: PersistenceNodeTypes> PersistenceService<N> {
impl<N> PersistenceService<N>
where
N: PersistenceNodeTypes,
{
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
pub fn run(mut self) -> Result<(), PersistenceError> {
@ -135,7 +144,7 @@ impl<N: PersistenceNodeTypes> PersistenceService<N> {
fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
@ -194,27 +203,29 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction>,
_marker: std::marker::PhantomData<N>,
sender: Sender<PersistenceAction<N>>,
}
impl<T: NodePrimitives> PersistenceHandle<T> {
/// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
pub const fn new(sender: Sender<PersistenceAction>) -> Self {
Self { sender, _marker: std::marker::PhantomData }
pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender }
}
/// Create a new [`PersistenceHandle`], and spawn the persistence service.
pub fn spawn_service<N: PersistenceNodeTypes>(
pub fn spawn_service<N>(
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
) -> Self {
) -> PersistenceHandle<N::Primitives>
where
N: PersistenceNodeTypes,
{
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
// construct persistence handle
let persistence_handle = Self::new(db_service_tx);
let persistence_handle = PersistenceHandle::new(db_service_tx);
// spawn the persistence service
let db_service =
@ -235,8 +246,8 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// for creating any channels for the given action.
pub fn send_action(
&self,
action: PersistenceAction,
) -> Result<(), SendError<PersistenceAction>> {
action: PersistenceAction<T>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.sender.send(action)
}
@ -250,9 +261,9 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// If there are no blocks to persist, then `None` is sent in the sender.
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
blocks: Vec<ExecutedBlock<T>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
@ -260,7 +271,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
pub fn save_finalized_block_number(
&self,
finalized_block: u64,
) -> Result<(), SendError<PersistenceAction>> {
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
}
@ -268,7 +279,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
pub fn save_safe_block_number(
&self,
safe_block: u64,
) -> Result<(), SendError<PersistenceAction>> {
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
}
@ -281,7 +292,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
&self,
block_num: u64,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
}
@ -296,7 +307,7 @@ mod tests {
use reth_prune::Pruner;
use tokio::sync::mpsc::unbounded_channel;
fn default_persistence_handle() -> PersistenceHandle {
fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
let provider = create_test_provider_factory();
let (_finished_exex_height_tx, finished_exex_height_rx) =
@ -306,7 +317,7 @@ mod tests {
Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx)
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
}
#[tokio::test]