feat: add invalid block hook field to tree (#10432)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
Dan Cline
2024-09-03 09:59:43 -04:00
committed by GitHub
parent d30e3a4888
commit dea1e21af9
10 changed files with 393 additions and 13 deletions

1
Cargo.lock generated
View File

@ -7611,6 +7611,7 @@ dependencies = [
"serde",
"serde_json",
"shellexpand",
"strum",
"tempfile",
"tokio",
"toml",

View File

@ -533,6 +533,13 @@ Debug:
--debug.engine-api-store <PATH>
The path to store engine API messages at. If specified, all of the intercepted engine API messages will be written to specified location
--debug.invalid-block-hook <INVALID_BLOCK_HOOK>
Determines which type of bad block hook to install
Example: `witness,prestate`
[possible values: witness, pre-state, opcode]
Database:
--db.log-level <LOG_LEVEL>
Database logging level. Levels higher than "notice" require a debug build

View File

@ -10,7 +10,7 @@ use reth_engine_tree::{
download::BasicBlockDownloader,
engine::{EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, TreeConfig},
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
@ -80,6 +80,7 @@ where
pruner: Pruner<DB, ProviderFactory<DB>>,
payload_builder: PayloadBuilderHandle<T>,
tree_config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());
@ -97,6 +98,7 @@ where
payload_builder,
canonical_in_memory_state,
tree_config,
invalid_block_hook,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@ -141,7 +143,7 @@ mod tests {
use super::*;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_engine_tree::test_utils::TestPipelineBuilder;
use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_exex_types::FinishedExExHeight;
@ -196,6 +198,7 @@ mod tests {
pruner,
PayloadBuilderHandle::new(tx),
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
);
}
}

View File

@ -0,0 +1,52 @@
use reth_primitives::{Receipt, SealedBlockWithSenders, SealedHeader, B256};
use reth_provider::BlockExecutionOutput;
use reth_trie::updates::TrieUpdates;
/// A bad block hook.
pub trait InvalidBlockHook: Send + Sync {
/// Invoked when a bad block is encountered.
fn on_invalid_block(
&self,
block: SealedBlockWithSenders,
header: SealedHeader,
output: BlockExecutionOutput<Receipt>,
trie_updates: Option<(TrieUpdates, B256)>,
);
}
impl<F> InvalidBlockHook for F
where
F: Fn(
SealedBlockWithSenders,
SealedHeader,
BlockExecutionOutput<Receipt>,
Option<(TrieUpdates, B256)>,
) + Send
+ Sync,
{
fn on_invalid_block(
&self,
block: SealedBlockWithSenders,
header: SealedHeader,
output: BlockExecutionOutput<Receipt>,
trie_updates: Option<(TrieUpdates, B256)>,
) {
self(block, header, output, trie_updates)
}
}
/// A no-op [`InvalidBlockHook`] that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopInvalidBlockHook;
impl InvalidBlockHook for NoopInvalidBlockHook {
fn on_invalid_block(
&self,
_block: SealedBlockWithSenders,
_header: SealedHeader,
_output: BlockExecutionOutput<Receipt>,
_trie_updates: Option<(TrieUpdates, B256)>,
) {
}
}

View File

@ -42,6 +42,7 @@ use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
fmt::Debug,
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
@ -57,9 +58,11 @@ use tokio::sync::{
use tracing::*;
mod config;
mod invalid_block_hook;
mod metrics;
use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics};
pub use config::TreeConfig;
pub use invalid_block_hook::{InvalidBlockHook, NoopInvalidBlockHook};
/// Keeps track of the state of the tree.
///
@ -481,7 +484,6 @@ pub enum TreeAction {
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
#[derive(Debug)]
pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
provider: P,
executor_provider: E,
@ -518,6 +520,29 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
config: TreeConfig,
/// Metrics for the engine api.
metrics: EngineApiMetrics,
/// A bad block hook.
invalid_block_hook: Box<dyn InvalidBlockHook>,
}
impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTreeHandler<P, E, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EngineApiTreeHandler")
.field("provider", &self.provider)
.field("executor_provider", &self.executor_provider)
.field("consensus", &self.consensus)
.field("payload_validator", &self.payload_validator)
.field("state", &self.state)
.field("incoming_tx", &self.incoming_tx)
.field("persistence", &self.persistence)
.field("persistence_state", &self.persistence_state)
.field("backfill_sync_state", &self.backfill_sync_state)
.field("canonical_in_memory_state", &self.canonical_in_memory_state)
.field("payload_builder", &self.payload_builder)
.field("config", &self.config)
.field("metrics", &self.metrics)
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
.finish()
}
}
impl<P, E, T> EngineApiTreeHandler<P, E, T>
@ -558,9 +583,15 @@ where
config,
metrics: Default::default(),
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
}
}
/// Sets the bad block hook.
fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook>) {
self.invalid_block_hook = invalid_block_hook;
}
/// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
/// own thread.
///
@ -576,6 +607,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
@ -593,7 +625,7 @@ where
header.num_hash(),
);
let task = Self::new(
let mut task = Self::new(
provider,
executor_provider,
consensus,
@ -606,6 +638,7 @@ where
payload_builder,
config,
);
task.set_invalid_block_hook(invalid_block_hook);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
(incoming, outgoing)
@ -1871,10 +1904,14 @@ where
let output = executor.execute((&block, U256::MAX).into())?;
debug!(target: "engine", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
self.consensus.validate_block_post_execution(
if let Err(err) = self.consensus.validate_block_post_execution(
&block,
PostExecutionInput::new(&output.receipts, &output.requests),
)?;
) {
// call post-block hook
self.invalid_block_hook.on_invalid_block(block.seal_slow(), parent_block, output, None);
return Err(err.into())
}
let hashed_state = HashedPostState::from_bundle_state(&output.state.state);
@ -1882,6 +1919,13 @@ where
let (state_root, trie_output) =
state_provider.state_root_with_updates(hashed_state.clone())?;
if state_root != block.state_root {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
block.clone().seal_slow(),
parent_block,
output,
Some((trie_output, state_root)),
);
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
)

View File

@ -10,7 +10,7 @@ use reth_chainspec::ChainSpec;
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
tree::{NoopInvalidBlockHook, TreeConfig},
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
@ -30,7 +30,7 @@ use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::{engine::ClientVersionV1, WithOtherFields};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use reth_tracing::tracing::{debug, error, info, warn};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -202,6 +202,13 @@ where
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
// TODO: implement methods which convert this value into an actual function
if let Some(ref hook_type) = ctx.node_config().debug.invalid_block_hook {
warn!(target: "reth::cli", ?hook_type, "Invalid block hooks are not implemented yet! The `debug.invalid-block-hook` flag will do nothing for now.");
}
let invalid_block_hook = Box::new(NoopInvalidBlockHook::default());
// Configure the consensus engine
let mut eth_service = EngineService::new(
ctx.consensus(),
@ -216,6 +223,7 @@ where
pruner,
ctx.components().payload_builder().clone(),
TreeConfig::default(),
invalid_block_hook,
);
let event_sender = EventSender::default();

View File

@ -34,7 +34,7 @@ use reth_provider::providers::BlockchainProvider;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::{engine::ClientVersionV1, WithOtherFields};
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use reth_tracing::tracing::{debug, info, warn};
use reth_transaction_pool::TransactionPool;
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -210,6 +210,10 @@ where
let max_block = ctx.max_block(network_client.clone()).await?;
let mut hooks = EngineHooks::new();
if let Some(ref hook_type) = ctx.node_config().debug.invalid_block_hook {
warn!(target: "reth::cli", ?hook_type, "Bad block hooks are not implemented yet! The `debug.bad-block-hook` flag will do nothing for now.");
}
let static_file_producer = ctx.static_file_producer();
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(

View File

@ -55,6 +55,7 @@ rand.workspace = true
derive_more.workspace = true
toml.workspace = true
serde.workspace = true
strum = { workspace = true, features = ["derive"] }
# io
dirs-next = "2.0.0"

View File

@ -1,8 +1,12 @@
//! clap [Args](clap::Args) for debugging purposes
use clap::Args;
use clap::{
builder::{PossibleValue, TypedValueParser},
Arg, Args, Command,
};
use reth_primitives::B256;
use std::path::PathBuf;
use std::{collections::HashSet, ffi::OsStr, fmt, path::PathBuf, str::FromStr};
use strum::{AsRefStr, EnumIter, IntoStaticStr, ParseError, VariantArray, VariantNames};
/// Parameters for debugging purposes
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
@ -63,6 +67,208 @@ pub struct DebugArgs {
/// will be written to specified location.
#[arg(long = "debug.engine-api-store", help_heading = "Debug", value_name = "PATH")]
pub engine_api_store: Option<PathBuf>,
/// Determines which type of bad block hook to install
///
/// Example: `witness,prestate`
#[arg(long = "debug.invalid-block-hook", help_heading = "Debug", value_parser = InvalidBlockSelectionValueParser::default())]
pub invalid_block_hook: Option<InvalidBlockSelection>,
}
/// Describes the invalid block hooks that should be installed.
///
/// # Example
///
/// Create a [`InvalidBlockSelection`] from a selection.
///
/// ```
/// use reth_node_core::args::{InvalidBlockHook, InvalidBlockSelection};
/// let config: InvalidBlockSelection = vec![InvalidBlockHook::Witness].into();
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvalidBlockSelection(HashSet<InvalidBlockHook>);
impl InvalidBlockSelection {
/// Creates a new _unique_ [`InvalidBlockSelection`] from the given items.
///
/// # Note
///
/// This will dedupe the selection and remove duplicates while preserving the order.
///
/// # Example
///
/// Create a selection from the [`InvalidBlockHook`] string identifiers
///
/// ```
/// use reth_node_core::args::{InvalidBlockHook, InvalidBlockSelection};
/// let selection = vec!["witness", "prestate", "opcode"];
/// let config = InvalidBlockSelection::try_from_selection(selection).unwrap();
/// assert_eq!(
/// config,
/// InvalidBlockSelection::from([
/// InvalidBlockHook::Witness,
/// InvalidBlockHook::PreState,
/// InvalidBlockHook::Opcode
/// ])
/// );
/// ```
///
/// Create a unique selection from the [`InvalidBlockHook`] string identifiers
///
/// ```
/// use reth_node_core::args::{InvalidBlockHook, InvalidBlockSelection};
/// let selection = vec!["witness", "prestate", "opcode", "witness", "prestate"];
/// let config = InvalidBlockSelection::try_from_selection(selection).unwrap();
/// assert_eq!(
/// config,
/// InvalidBlockSelection::from([
/// InvalidBlockHook::Witness,
/// InvalidBlockHook::PreState,
/// InvalidBlockHook::Opcode
/// ])
/// );
/// ```
pub fn try_from_selection<I, T>(selection: I) -> Result<Self, T::Error>
where
I: IntoIterator<Item = T>,
T: TryInto<InvalidBlockHook>,
{
selection.into_iter().map(TryInto::try_into).collect()
}
}
impl From<&[InvalidBlockHook]> for InvalidBlockSelection {
fn from(s: &[InvalidBlockHook]) -> Self {
Self(s.iter().copied().collect())
}
}
impl From<Vec<InvalidBlockHook>> for InvalidBlockSelection {
fn from(s: Vec<InvalidBlockHook>) -> Self {
Self(s.into_iter().collect())
}
}
impl<const N: usize> From<[InvalidBlockHook; N]> for InvalidBlockSelection {
fn from(s: [InvalidBlockHook; N]) -> Self {
Self(s.iter().copied().collect())
}
}
impl FromIterator<InvalidBlockHook> for InvalidBlockSelection {
fn from_iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = InvalidBlockHook>,
{
Self(iter.into_iter().collect())
}
}
impl FromStr for InvalidBlockSelection {
type Err = ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Ok(Self(Default::default()))
}
let hooks = s.split(',').map(str::trim).peekable();
Self::try_from_selection(hooks)
}
}
impl fmt::Display for InvalidBlockSelection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[{}]", self.0.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(", "))
}
}
/// clap value parser for [`InvalidBlockSelection`].
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
struct InvalidBlockSelectionValueParser;
impl TypedValueParser for InvalidBlockSelectionValueParser {
type Value = InvalidBlockSelection;
fn parse_ref(
&self,
_cmd: &Command,
arg: Option<&Arg>,
value: &OsStr,
) -> Result<Self::Value, clap::Error> {
let val =
value.to_str().ok_or_else(|| clap::Error::new(clap::error::ErrorKind::InvalidUtf8))?;
val.parse::<InvalidBlockSelection>().map_err(|err| {
let arg = arg.map(|a| a.to_string()).unwrap_or_else(|| "...".to_owned());
let possible_values = InvalidBlockHook::all_variant_names().to_vec().join(",");
let msg = format!(
"Invalid value '{val}' for {arg}: {err}.\n [possible values: {possible_values}]"
);
clap::Error::raw(clap::error::ErrorKind::InvalidValue, msg)
})
}
fn possible_values(&self) -> Option<Box<dyn Iterator<Item = PossibleValue> + '_>> {
let values = InvalidBlockHook::all_variant_names().iter().map(PossibleValue::new);
Some(Box::new(values))
}
}
/// The type of bad block hook to install
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Hash,
AsRefStr,
IntoStaticStr,
VariantNames,
VariantArray,
EnumIter,
)]
#[strum(serialize_all = "kebab-case")]
pub enum InvalidBlockHook {
/// A witness value enum
Witness,
/// A prestate trace value enum
PreState,
/// An opcode trace value enum
Opcode,
}
impl FromStr for InvalidBlockHook {
type Err = ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"witness" => Self::Witness,
"prestate" => Self::PreState,
"opcode" => Self::Opcode,
_ => return Err(ParseError::VariantNotFound),
})
}
}
impl TryFrom<&str> for InvalidBlockHook {
type Error = ParseError;
fn try_from(s: &str) -> Result<Self, <Self as TryFrom<&str>>::Error> {
FromStr::from_str(s)
}
}
impl fmt::Display for InvalidBlockHook {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad(self.as_ref())
}
}
impl InvalidBlockHook {
/// Returns all variant names of the enum
pub const fn all_variant_names() -> &'static [&'static str] {
<Self as VariantNames>::VARIANTS
}
}
#[cfg(test)]
@ -78,9 +284,63 @@ mod tests {
}
#[test]
fn test_parse_database_args() {
fn test_parse_default_debug_args() {
let default_args = DebugArgs::default();
let args = CommandParser::<DebugArgs>::parse_from(["reth"]).args;
assert_eq!(args, default_args);
}
#[test]
fn test_parse_invalid_block_args() {
let expected_args = DebugArgs {
invalid_block_hook: Some(InvalidBlockSelection::from([InvalidBlockHook::Witness])),
..Default::default()
};
let args = CommandParser::<DebugArgs>::parse_from([
"reth",
"--debug.invalid-block-hook",
"witness",
])
.args;
assert_eq!(args, expected_args);
let expected_args = DebugArgs {
invalid_block_hook: Some(InvalidBlockSelection::from([
InvalidBlockHook::Witness,
InvalidBlockHook::PreState,
])),
..Default::default()
};
let args = CommandParser::<DebugArgs>::parse_from([
"reth",
"--debug.invalid-block-hook",
"witness,prestate",
])
.args;
assert_eq!(args, expected_args);
let args = CommandParser::<DebugArgs>::parse_from([
"reth",
"--debug.invalid-block-hook",
"witness,prestate,prestate",
])
.args;
assert_eq!(args, expected_args);
let args = CommandParser::<DebugArgs>::parse_from([
"reth",
"--debug.invalid-block-hook",
"witness,witness,prestate",
])
.args;
assert_eq!(args, expected_args);
let args = CommandParser::<DebugArgs>::parse_from([
"reth",
"--debug.invalid-block-hook",
"prestate,witness,prestate",
])
.args;
assert_eq!(args, expected_args);
}
}

View File

@ -14,7 +14,7 @@ pub use rpc_state_cache::RpcStateCacheArgs;
/// DebugArgs struct for debugging purposes
mod debug;
pub use debug::DebugArgs;
pub use debug::{DebugArgs, InvalidBlockHook, InvalidBlockSelection};
/// DatabaseArgs struct for configuring the database
mod database;