diff --git a/Cargo.lock b/Cargo.lock index f6230673c..65b05a07e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5576,6 +5576,7 @@ dependencies = [ "reth-snapshot", "reth-stages", "reth-tasks", + "reth-tokio-util", "reth-tracing", "schnellru", "thiserror", @@ -5985,6 +5986,7 @@ dependencies = [ "reth-provider", "reth-rpc-types", "reth-tasks", + "reth-tokio-util", "reth-tracing", "reth-transaction-pool", "secp256k1 0.27.0", @@ -6099,8 +6101,6 @@ dependencies = [ "tempfile", "test-fuzz", "thiserror", - "tokio", - "tokio-stream", "toml 0.8.2", "tracing", "triehash", @@ -6147,6 +6147,7 @@ dependencies = [ "reth-provider", "reth-snapshot", "reth-stages", + "reth-tokio-util", "thiserror", "tokio", "tokio-stream", @@ -6386,6 +6387,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", + "reth-tokio-util", "reth-trie", "revm", "serde", @@ -6410,6 +6412,14 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "reth-tokio-util" +version = "0.1.0-alpha.10" +dependencies = [ + "tokio", + "tokio-stream", +] + [[package]] name = "reth-tracing" version = "0.1.0-alpha.10" diff --git a/Cargo.toml b/Cargo.toml index 0fc83fb98..f177c8147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ members = [ "crates/trie", "testing/ef-tests", "crates/rpc/rpc-types-compat", + "crates/tokio-util", "examples", "examples/additional-rpc-namespace-in-cli", "examples/cli-extension-event-hooks", @@ -107,6 +108,7 @@ reth-discv4 = { path = "./crates/net/discv4" } reth-eth-wire = { path = "./crates/net/eth-wire" } reth-ecies = { path = "./crates/net/ecies" } reth-tracing = { path = "./crates/tracing" } +reth-tokio-util = { path = "crates/tokio-util" } # revm revm = { git = "https://github.com/bluealloy/revm", rev = "df44297bc3949dc9e0cec06594c62dd946708b2a" } revm-primitives = { git = "https://github.com/bluealloy/revm", rev = "df44297bc3949dc9e0cec06594c62dd946708b2a" } diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 276850126..f26995235 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -21,6 +21,7 @@ reth-payload-builder.workspace = true reth-prune = { path = "../../prune" } reth-snapshot = { path = "../../snapshot" } reth-rpc-types-compat.workspace = true +reth-tokio-util.workspace = true # async tokio = { workspace = true, features = ["sync"] } tokio-stream.workspace = true diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index b972259bb..496b47d1d 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -22,8 +22,8 @@ use reth_interfaces::{ }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{ - constants::EPOCH_SLOTS, listener::EventListeners, stage::StageId, BlockNumHash, BlockNumber, - ChainSpec, Head, Header, SealedBlock, SealedHeader, B256, U256, + constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, ChainSpec, Head, Header, + SealedBlock, SealedHeader, B256, U256, }; use reth_provider::{ BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError, @@ -36,6 +36,7 @@ use reth_rpc_types::engine::{ use reth_rpc_types_compat::engine::payload::{try_into_block, validate_block_hash}; use reth_stages::{ControlFlow, Pipeline, PipelineError}; use reth_tasks::TaskSpawner; +use reth_tokio_util::EventListeners; use std::{ pin::Pin, sync::Arc, diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index d9524fac4..2ed6c01a8 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -30,6 +30,7 @@ reth-tasks.workspace = true reth-transaction-pool.workspace = true reth-provider.workspace = true reth-rpc-types.workspace = true +reth-tokio-util.workspace = true alloy-rlp.workspace = true diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 22454d851..c8387623b 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -41,9 +41,10 @@ use reth_eth_wire::{ use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::ReputationChangeKind; -use reth_primitives::{listener::EventListeners, ForkId, NodeRecord, PeerId, B256}; +use reth_primitives::{ForkId, NodeRecord, PeerId, B256}; use reth_provider::{BlockNumReader, BlockReader}; use reth_rpc_types::{EthProtocolInfo, NetworkStatus}; +use reth_tokio_util::EventListeners; use std::{ net::SocketAddr, pin::Pin, diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 005929ecb..2f290cefd 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -31,10 +31,6 @@ crc = "3" # tracing tracing.workspace = true -# tokio -tokio = { workspace = true, default-features = false, features = ["sync"] } -tokio-stream.workspace = true - # misc bytes.workspace = true byteorder = "1" diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index f2f3fee47..187bc7e00 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -30,7 +30,6 @@ mod genesis; mod hardfork; mod header; mod integer_list; -pub mod listener; mod log; mod net; mod peer; diff --git a/crates/primitives/src/listener/mod.rs b/crates/primitives/src/listener/mod.rs deleted file mode 100644 index 553d18d67..000000000 --- a/crates/primitives/src/listener/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Event listeners. - -mod event_listeners; -pub use event_listeners::EventListeners; diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml index 6259777eb..27ed8cbd7 100644 --- a/crates/prune/Cargo.toml +++ b/crates/prune/Cargo.toml @@ -17,6 +17,7 @@ reth-db.workspace = true reth-provider.workspace = true reth-interfaces.workspace = true reth-snapshot = { path = "../snapshot" } +reth-tokio-util.workspace = true # async tokio = { workspace = true, features = ["sync"] } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 0025fd769..12214d6d3 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -6,11 +6,10 @@ use crate::{ Metrics, PrunerError, PrunerEvent, }; use reth_db::database::Database; -use reth_primitives::{ - listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneProgress, PruneSegment, -}; +use reth_primitives::{BlockNumber, ChainSpec, PruneMode, PruneProgress, PruneSegment}; use reth_provider::{ProviderFactory, PruneCheckpointReader}; use reth_snapshot::HighestSnapshotsTracker; +use reth_tokio_util::EventListeners; use std::{collections::BTreeMap, sync::Arc, time::Instant}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, trace}; diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 79a2e989b..25018ff8e 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -22,6 +22,7 @@ reth-db.workspace = true reth-codecs = { path = "../storage/codecs" } reth-provider.workspace = true reth-trie = { path = "../trie" } +reth-tokio-util.workspace = true # revm revm.workspace = true diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 4fdfe4e6d..64663d4af 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -6,10 +6,10 @@ use futures_util::Future; use reth_db::database::Database; use reth_interfaces::executor::BlockExecutionError; use reth_primitives::{ - constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId, - BlockNumber, ChainSpec, B256, + constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, stage::StageId, BlockNumber, ChainSpec, B256, }; use reth_provider::{ProviderFactory, StageCheckpointReader, StageCheckpointWriter}; +use reth_tokio_util::EventListeners; use std::{pin::Pin, sync::Arc}; use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; diff --git a/crates/tokio-util/Cargo.toml b/crates/tokio-util/Cargo.toml new file mode 100644 index 000000000..5b8d24337 --- /dev/null +++ b/crates/tokio-util/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "reth-tokio-util" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = """ +Additional utilities for working with Tokio in reth. +""" + +[dependencies] + +# async +tokio = { workspace = true, features = ["sync"] } +tokio-stream = { workspace = true, features = ["sync"] } diff --git a/crates/primitives/src/listener/event_listeners.rs b/crates/tokio-util/src/event_listeners.rs similarity index 100% rename from crates/primitives/src/listener/event_listeners.rs rename to crates/tokio-util/src/event_listeners.rs diff --git a/crates/tokio-util/src/lib.rs b/crates/tokio-util/src/lib.rs new file mode 100644 index 000000000..244b44346 --- /dev/null +++ b/crates/tokio-util/src/lib.rs @@ -0,0 +1,13 @@ +//! Event listeners + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![warn(missing_debug_implementations, missing_docs, unreachable_pub, rustdoc::all)] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod event_listeners; +pub use event_listeners::EventListeners;