feat(engine): integrate executor with StateRootTask (#12335)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Federico Gimenez
2024-11-06 20:27:33 +01:00
committed by GitHub
parent e084bed089
commit 2c5ba732b7
4 changed files with 86 additions and 15 deletions

View File

@ -43,6 +43,7 @@ revm-primitives.workspace = true
# common
futures.workspace = true
pin-project.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
tokio-stream.workspace = true
thiserror.workspace = true

View File

@ -57,9 +57,8 @@ use std::{
time::Instant,
};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
oneshot::error::TryRecvError,
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::TryRecvError},
};
use tracing::*;
@ -612,7 +611,7 @@ where
remove_above_state: VecDeque::new(),
};
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
let (tx, outgoing) = unbounded_channel();
let state = EngineApiTreeState::new(
config.block_buffer_limit(),
config.max_invalid_header_cache_length(),
@ -2188,6 +2187,7 @@ where
let block = block.unseal();
let exec_time = Instant::now();
// TODO: create StateRootTask with the receiving end of a channel and
// pass the sending end of the channel to the state hook.
let noop_state_hook = |_result_and_state: &ResultAndState| {};
@ -2198,6 +2198,7 @@ where
)?;
trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
if let Err(err) = self.consensus.validate_block_post_execution(
&block,
PostExecutionInput::new(&output.receipts, &output.requests),
@ -2218,6 +2219,8 @@ where
let root_time = Instant::now();
let mut state_root_result = None;
// TODO: switch to calculate state root using `StateRootTask`.
// We attempt to compute state root in parallel if we are currently not persisting anything
// to database. This is safe, because the database state cannot change until we
// finish parallel computation. It is important that nothing is being persisted as
@ -2305,6 +2308,9 @@ where
parent_hash: B256,
hashed_state: &HashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
// TODO: when we switch to calculate state root using `StateRootTask` this
// method can be still useful to calculate the required `TrieInput` to
// create the task.
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();
@ -2607,7 +2613,6 @@ mod tests {
str::FromStr,
sync::mpsc::{channel, Sender},
};
use tokio::sync::mpsc::unbounded_channel;
/// This is a test channel that allows you to `release` any value that is in the channel.
///

View File

@ -1,5 +1,7 @@
//! State root task related functionality.
use futures::Stream;
use pin_project::pin_project;
use reth_provider::providers::ConsistentDbView;
use reth_trie::{updates::TrieUpdates, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRootError;
@ -7,31 +9,55 @@ use revm_primitives::{EvmState, B256};
use std::{
future::Future,
pin::Pin,
sync::Arc,
sync::{mpsc, Arc},
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
/// Result of the state root calculation
pub(crate) type StateRootResult = Result<(B256, TrieUpdates), ParallelStateRootError>;
/// Handle to a spawned state root task.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct StateRootHandle {
/// Channel for receiving the final result.
rx: mpsc::Receiver<StateRootResult>,
}
#[allow(dead_code)]
impl StateRootHandle {
/// Waits for the state root calculation to complete.
pub(crate) fn wait_for_result(self) -> StateRootResult {
self.rx.recv().expect("state root task was dropped without sending result")
}
}
/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
/// It is responsile of initializing a blinded sparse trie and subscribe to
/// It is responsible of initializing a blinded sparse trie and subscribe to
/// transaction state stream. As it receives transaction execution results, it
/// fetches the proofs for relevant accounts from the database and reveal them
/// to the tree.
/// Then it updates relevant leaves according to the result of the transaction.
#[allow(dead_code)]
#[pin_project]
pub(crate) struct StateRootTask<Factory> {
/// View over the state in the database.
consistent_view: ConsistentDbView<Factory>,
/// Incoming state updates.
#[pin]
state_stream: UnboundedReceiverStream<EvmState>,
/// Latest trie input.
input: Arc<TrieInput>,
}
#[allow(dead_code)]
impl<Factory> StateRootTask<Factory> {
impl<Factory> StateRootTask<Factory>
where
Factory: Send + 'static,
{
/// Creates a new `StateRootTask`.
pub(crate) const fn new(
consistent_view: ConsistentDbView<Factory>,
@ -41,20 +67,58 @@ impl<Factory> StateRootTask<Factory> {
Self { consistent_view, state_stream, input }
}
/// Spawns the state root task and returns a handle to await its result.
pub(crate) fn spawn(self) -> StateRootHandle {
let (tx, rx) = mpsc::channel();
// Spawn the task that will process state updates and calculate the root
tokio::spawn(async move {
debug!(target: "engine::tree", "Starting state root task");
let result = self.await;
let _ = tx.send(result);
});
StateRootHandle { rx }
}
/// Handles state updates.
pub(crate) fn on_state_update(&self, _update: EvmState) {
fn on_state_update(
_view: &ConsistentDbView<Factory>,
_input: &Arc<TrieInput>,
_state: EvmState,
) {
// TODO: calculate hashed state update and dispatch proof gathering for it.
}
}
impl<Factory> Future for StateRootTask<Factory> {
type Output = Result<(B256, TrieUpdates), ParallelStateRootError>;
impl<Factory> Future for StateRootTask<Factory>
where
Factory: Send + 'static,
{
type Output = StateRootResult;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Process all items until the stream is closed
loop {
match this.state_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(state)) => {
Self::on_state_update(this.consistent_view, this.input, state);
}
Poll::Ready(None) => {
// stream closed, return final result
return Poll::Ready(Ok((B256::default(), TrieUpdates::default())));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO:
// * poll incoming state updates stream
// * keep track of proof calculation
// * keep track of intermediate root computation
Poll::Pending
// * return final state root result
}
}