feat: keep sender in tree task (#10148)

This commit is contained in:
Matthias Seitz
2024-08-06 21:19:49 +02:00
committed by GitHub
parent b01ccc271d
commit b0ded038ba
2 changed files with 29 additions and 17 deletions

View File

@ -45,7 +45,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError},
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
},
time::Instant,
@ -365,6 +365,15 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
payload_validator: ExecutionPayloadValidator,
/// Keeps track of internals such as executed and buffered blocks.
state: EngineApiTreeState,
/// The half for sending messages to the engine.
///
/// This is kept so that we can queue in messages to ourself that we can process later, for
/// example distributing workload across multiple messages that would otherwise take too long
/// to process. E.g. we might receive a range of downloaded blocks and we want to process
/// them one by one so that we can handle incoming engine API in between and don't become
/// unresponsive. This can happen during live sync transition where we're trying to close the
/// gap (up to 3 epochs of blocks in the worst case).
incoming_tx: Sender<FromEngine<BeaconEngineMessage<T>>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
/// Outgoing events that are emitted to the handler.
@ -400,7 +409,6 @@ where
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
canonical_in_memory_state: CanonicalInMemoryState,
@ -409,6 +417,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
provider,
executor_provider,
@ -424,24 +433,26 @@ where
payload_builder,
config,
metrics: Default::default(),
incoming_tx,
}
}
/// Creates a new `EngineApiTreeHandlerImpl` instance and spawns it in its
/// own thread. Returns the receiver end of a `EngineApiEvent` unbounded
/// channel to receive events from the engine.
/// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
/// own thread.
///
/// Returns the sender through which incoming requests can be sent to the task and the receiver
/// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
#[allow(clippy::too_many_arguments)]
pub fn spawn_new(
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
persistence: PersistenceHandle,
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
) -> UnboundedReceiver<EngineApiEvent> {
) -> (Sender<FromEngine<BeaconEngineMessage<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
@ -463,7 +474,6 @@ where
executor_provider,
consensus,
payload_validator,
incoming,
tx,
state,
canonical_in_memory_state,
@ -472,8 +482,14 @@ where
payload_builder,
config,
);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
outgoing
(incoming, outgoing)
}
/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<BeaconEngineMessage<T>>> {
self.incoming_tx.clone()
}
/// Run the engine API handler.
@ -504,6 +520,7 @@ where
/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
// TODO(mattsse): on process a certain number of blocks sequentially
for block in blocks {
if let Some(event) = self.on_downloaded_block(block) {
let needs_backfill = event.is_backfill_action();
@ -1970,7 +1987,6 @@ mod tests {
let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone());
let (to_tree_tx, to_tree_rx) = channel();
let (from_tree_tx, from_tree_rx) = unbounded_channel();
let header = chain_spec.genesis_header().seal_slow();
@ -1985,7 +2001,6 @@ mod tests {
executor_provider.clone(),
consensus,
payload_validator,
to_tree_rx,
from_tree_tx,
engine_api_tree_state,
canonical_in_memory_state,
@ -1999,8 +2014,8 @@ mod tests {
.with_chain_spec((*chain_spec).clone())
.with_signer(Address::random());
Self {
to_tree_tx: tree.incoming_tx.clone(),
tree,
to_tree_tx,
from_tree_rx,
blocks: vec![],
action_rx,

View File

@ -25,7 +25,7 @@ use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
sync::{mpsc::channel, Arc},
sync::Arc,
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -73,20 +73,17 @@ where
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let (to_tree_tx, to_tree_rx) = channel();
let persistence_handle = PersistenceHandle::spawn_service(provider, pruner);
let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone());
let executor_factory = EthExecutorProvider::ethereum(chain_spec);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let from_tree = EngineApiTreeHandler::spawn_new(
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
blockchain_db,
executor_factory,
consensus,
payload_validator,
to_tree_rx,
persistence_handle,
payload_builder,
canonical_in_memory_state,