feat: emit new block events in --auto-mine (#2079)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Matthias Seitz
2023-04-05 14:16:03 +02:00
committed by GitHub
parent 633d84ded0
commit 77a41e5edf
10 changed files with 175 additions and 76 deletions

Cargo.lock

@@ -4496,6 +4496,7 @@ dependencies = [
"reth-primitives",
"reth-provider",
"reth-revm",
"reth-stages",
"reth-transaction-pool",
"tokio",
"tokio-stream",
@@ -4756,6 +4757,7 @@ dependencies = [
"hex-literal 0.3.4",
"modular-bitfield",
"parity-scale-codec",
"parking_lot 0.12.1",
"rand 0.8.5",
"reth-codecs",
"reth-db",


@@ -9,10 +9,9 @@ use crate::{
utils::get_single_header,
};
use clap::{crate_version, Parser};
use events::NodeEvent;
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, FutureExt, Stream, StreamExt};
use futures::{pin_mut, stream::select as stream_select, FutureExt, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage};
use reth_db::{
@@ -27,10 +26,11 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_executor::blockchain_tree::{
externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
events::NewBlockNotificationSink,
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
headers::{client::StatusUpdater, downloader::HeaderDownloader},
@@ -208,19 +208,36 @@ impl Command {
}
};
let pipeline = if self.auto_mine {
let (_, client, task) = AutoSealBuilder::new(
// configure blockchain tree
let tree_externals = TreeExternals::new(
db.clone(),
Arc::clone(&consensus),
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let tree_config = BlockchainTreeConfig::default();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let new_block_notification_sender =
NewBlockNotificationSink::new(tree_config.max_reorg_depth() as usize * 2);
let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new(
tree_externals,
new_block_notification_sender.clone(),
tree_config,
)?);
// Configure the pipeline
let mut pipeline = if self.auto_mine {
let (_, client, mut task) = AutoSealBuilder::new(
Arc::clone(&self.chain),
shareable_db.clone(),
transaction_pool.clone(),
consensus_engine_tx.clone(),
new_block_notification_sender.clone(),
)
.build();
debug!(target: "reth::cli", "Spawning auto mine task");
ctx.task_executor.spawn(Box::pin(task));
let (pipeline, events) = self
let mut pipeline = self
.build_networked_pipeline(
&mut config,
network.clone(),
@@ -231,42 +248,31 @@
)
.await?;
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), events),
);
let pipeline_events = pipeline.events();
task.set_pipeline_events(pipeline_events);
debug!(target: "reth::cli", "Spawning auto mine task");
ctx.task_executor.spawn(Box::pin(task));
pipeline
} else {
let client = network.fetch_client().await?;
let (pipeline, events) = self
.build_networked_pipeline(
&mut config,
network.clone(),
client,
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
)
.await?;
ctx.task_executor.spawn_critical(
"events task",
events::handle_events(Some(network.clone()), events),
);
pipeline
self.build_networked_pipeline(
&mut config,
network.clone(),
client,
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
)
.await?
};
// configure blockchain tree
let tree_externals = TreeExternals::new(
db.clone(),
consensus,
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
let events = stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
);
let blockchain_tree =
ShareableBlockchainTree::new(BlockchainTree::new(tree_externals, Default::default())?);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let beacon_consensus_engine = BeaconConsensusEngine::new(
Arc::clone(&db),
@@ -276,7 +282,6 @@ impl Command {
consensus_engine_rx,
self.debug.max_block,
);
info!(target: "reth::cli", "Consensus engine initialized");
let engine_api_handle =
@@ -344,7 +349,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
db: Arc<Env<WriteMap>>,
task_executor: &TaskExecutor,
) -> eyre::Result<(Pipeline<Env<WriteMap>, NetworkHandle>, impl Stream<Item = NodeEvent>)>
) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>>
where
Client: HeadersClient + BodiesClient + Clone + 'static,
{
@@ -365,7 +370,7 @@ impl Command {
.build(client, Arc::clone(&consensus), db.clone())
.into_task_with(task_executor);
let mut pipeline = self
let pipeline = self
.build_pipeline(
config,
header_downloader,
@@ -377,11 +382,7 @@
)
.await?;
let events = stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
);
Ok((pipeline, events))
Ok(pipeline)
}
fn load_config(&self) -> eyre::Result<Config> {
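With build_networked_pipeline now returning only the Pipeline, the caller merges network and pipeline events itself via stream_select before handing them to the events task. A minimal, self-contained sketch of that merge pattern, using local stand-in streams and a local NodeEvent enum rather than the node's real types (assumes the futures and tokio crates as dependencies):

use futures::{stream, stream::select as stream_select, StreamExt};

// Local stand-in for the node's NodeEvent type.
#[derive(Debug)]
enum NodeEvent {
    Network(&'static str),
    Pipeline(&'static str),
}

#[tokio::main]
async fn main() {
    // Stand-ins for network.event_listener() and pipeline.events().
    let network_events = stream::iter(["peer connected", "peer dropped"]).map(NodeEvent::Network);
    let pipeline_events = stream::iter(["Headers running", "Finish done"]).map(NodeEvent::Pipeline);

    // Merge both sources into the single stream consumed by the events task.
    let mut events = stream_select(network_events, pipeline_events);
    while let Some(event) = events.next().await {
        println!("{event:?}");
    }
}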


@@ -13,6 +13,7 @@ reth-beacon-consensus = { path = "../beacon" }
reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-provider = { path = "../../storage/provider" }
reth-stages = { path = "../../stages" }
reth-revm = { path = "../../revm" }
reth-transaction-pool = { path = "../../transaction-pool" }


@@ -14,11 +14,13 @@
//! These downloaders poll the miner, assemble the block, and return transactions that are ready to
//! be mined.
use reth_beacon_consensus::BeaconEngineMessage;
use reth_interfaces::consensus::{Consensus, ConsensusError};
use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock,
SealedHeader, H256, U256,
};
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::trace;
@@ -29,8 +31,7 @@ mod task;
pub use crate::client::AutoSealClient;
pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_transaction_pool::TransactionPool;
use reth_interfaces::events::NewBlockNotificationSink;
pub use task::MiningTask;
/// A consensus implementation intended for local development and testing purposes.
@@ -82,6 +83,7 @@ pub struct AutoSealBuilder<Client, Pool> {
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
}
// === impl AutoSealBuilder ===
@@ -93,6 +95,7 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
) -> Self {
let mode = MiningMode::interval(std::time::Duration::from_secs(1));
Self {
@@ -102,6 +105,7 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
pool,
mode,
to_engine,
new_block_notification_sender,
}
}
@@ -113,12 +117,21 @@ impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
/// Consumes the type and returns all components
pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask<Client, Pool>) {
let Self { client, consensus, pool, mode, storage, to_engine } = self;
let Self {
client,
consensus,
pool,
mode,
storage,
to_engine,
new_block_notification_sender,
} = self;
let auto_client = AutoSealClient::new(storage.clone());
let task = MiningTask::new(
Arc::clone(&consensus.chain_spec),
mode,
to_engine,
new_block_notification_sender,
storage,
client,
pool,


@@ -1,7 +1,7 @@
use crate::{mode::MiningMode, Storage};
use futures_util::{future::BoxFuture, FutureExt};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_interfaces::consensus::ForkchoiceState;
use reth_interfaces::{consensus::ForkchoiceState, events::NewBlockNotificationSink};
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS},
proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
@@ -12,6 +12,7 @@ use reth_revm::{
database::{State, SubState},
executor::Executor,
};
use reth_stages::{stages::FINISH, PipelineEvent};
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
collections::VecDeque,
@@ -22,7 +23,8 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{trace, warn};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace, warn};
/// A Future that listens for new ready transactions and puts new blocks into storage
pub struct MiningTask<Client, Pool: TransactionPool> {
@@ -33,7 +35,7 @@ pub struct MiningTask<Client, Pool: TransactionPool> {
/// The active miner
miner: MiningMode,
/// Single active future that inserts a new block into `storage`
insert_task: Option<BoxFuture<'static, ()>>,
insert_task: Option<BoxFuture<'static, Option<UnboundedReceiverStream<PipelineEvent>>>>,
/// Shared storage to insert new blocks
storage: Storage,
/// Pool where transactions are stored
@@ -42,6 +44,10 @@ pub struct MiningTask<Client, Pool: TransactionPool> {
queued: VecDeque<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>,
/// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage>,
/// Used to notify consumers of new blocks
new_block_notification_sender: NewBlockNotificationSink,
/// The pipeline events to listen on
pipe_line_events: Option<UnboundedReceiverStream<PipelineEvent>>,
}
// === impl MiningTask ===
@@ -52,6 +58,7 @@ impl<Client, Pool: TransactionPool> MiningTask<Client, Pool> {
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage>,
new_block_notification_sender: NewBlockNotificationSink,
storage: Storage,
client: Client,
pool: Pool,
@@ -64,9 +71,16 @@ impl<Client, Pool: TransactionPool> MiningTask<Client, Pool> {
storage,
pool,
to_engine,
new_block_notification_sender,
queued: Default::default(),
pipe_line_events: None,
}
}
/// Sets the pipeline events to listen on.
pub fn set_pipeline_events(&mut self, events: UnboundedReceiverStream<PipelineEvent>) {
self.pipe_line_events = Some(events);
}
}
impl<Client, Pool> Future for MiningTask<Client, Pool>
@@ -101,6 +115,11 @@ where
let client = this.client.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
let mut events = this.pipe_line_events.take();
let new_block_notification_sender = this.new_block_notification_sender.clone();
// Create the mining future that builds a block and notifies the engine, which
// drives the pipeline
this.insert_task = Some(Box::pin(async move {
let mut storage = storage.write().await;
let mut header = Header {
@@ -151,7 +170,6 @@ where
let Block { mut header, body, .. } = block;
// clear all transactions from pool
// TODO this should happen automatically via events
pool.remove_transactions(body.iter().map(|tx| tx.hash));
header.receipts_root = if res.receipts().is_empty() {
@@ -169,7 +187,7 @@ where
BlockBody { transactions: body, ommers: vec![], withdrawals: None };
header.gas_used = gas_used;
storage.insert_new_block(header, body);
storage.insert_new_block(header.clone(), body);
let new_hash = storage.best_hash;
let state = ForkchoiceState {
@@ -177,25 +195,54 @@ where
finalized_block_hash: new_hash,
safe_block_hash: new_hash,
};
drop(storage);
trace!(target: "consensus::auto", ?state, "sending fork choice update");
// send the new update to the engine, this will trigger the pipeline to
// download the block, execute it and store it in the database.
let (tx, _rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
debug!(target: "consensus::auto", ?state, "sent fork choice update");
// wait for the pipeline to finish
if let Some(events) = events.as_mut() {
debug!(target: "consensus::auto", "waiting for finish stage event...");
// wait for the finish stage to start running
loop {
if let Some(PipelineEvent::Running { stage_id, .. }) =
events.next().await
{
if stage_id == FINISH {
debug!(target: "consensus::auto", "received finish stage event");
break
}
}
}
}
let header = header.seal_slow();
debug!(target: "consensus::auto", header=?header.hash(), "sending block notification");
// send block notification
let _ = new_block_notification_sender.send(Arc::new(header));
}
Err(err) => {
warn!(target: "consensus::auto", ?err, "failed to execute block")
}
}
events
}));
}
if let Some(mut fut) = this.insert_task.take() {
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Ready(events) => {
this.pipe_line_events = events;
}
Poll::Pending => {
this.insert_task = Some(fut);
break
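The notable part of the task change is how the future borrows the pipeline event stream: poll takes the stream out of self, the insert_task future waits for the FINISH stage's Running event before broadcasting the sealed header, and the future hands the stream back so poll can restore it for the next block. A hedged, self-contained sketch of that wait-then-notify pattern; PipelineEvent, FINISH, and the String payload here are simplified stand-ins for the reth_stages and reth_primitives types (assumes tokio and tokio-stream as dependencies):

use tokio::sync::{broadcast, mpsc};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

// Simplified stand-in for reth_stages::PipelineEvent; only the shape used here.
enum PipelineEvent {
    Running { stage_id: &'static str },
    Other,
}

// Stand-in for the FINISH stage id.
const FINISH: &str = "Finish";

// Consume pipeline events until the FINISH stage starts running, then broadcast the
// payload and hand the stream back so the caller can reuse it for the next block.
async fn notify_after_finish(
    mut events: UnboundedReceiverStream<PipelineEvent>,
    notifications: broadcast::Sender<String>,
    payload: String,
) -> UnboundedReceiverStream<PipelineEvent> {
    while let Some(event) = events.next().await {
        if matches!(event, PipelineEvent::Running { stage_id } if stage_id == FINISH) {
            break
        }
    }
    // Ignore the error if no subscriber is listening.
    let _ = notifications.send(payload);
    events
}

#[tokio::main]
async fn main() {
    let (event_tx, event_rx) = mpsc::unbounded_channel();
    let (notify_tx, mut notify_rx) = broadcast::channel(8);

    event_tx.send(PipelineEvent::Other).unwrap();
    event_tx.send(PipelineEvent::Running { stage_id: FINISH }).unwrap();

    let _events = notify_after_finish(
        UnboundedReceiverStream::new(event_rx),
        notify_tx,
        "sealed header for block 1".to_string(),
    )
    .await;

    assert_eq!(notify_rx.recv().await.unwrap(), "sealed header for block 1");
}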


@@ -469,7 +469,9 @@ mod tests {
post_state::PostState,
test_utils::TestExecutorFactory,
};
use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus};
use reth_interfaces::{
events::NewBlockNotificationSink, sync::NoopSyncStateUpdate, test_utils::TestConsensus,
};
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::Transaction;
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
@@ -550,7 +552,8 @@ mod tests {
let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec);
let config = BlockchainTreeConfig::new(1, 2, 3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, config).expect("failed to create tree"),
BlockchainTree::new(externals, NewBlockNotificationSink::new(2), config)
.expect("failed to create tree"),
);
let (sync_tx, sync_rx) = unbounded_channel();


@@ -2,11 +2,8 @@
use chain::{BlockChainId, Chain, ForkBlock};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::{
blockchain_tree::BlockStatus,
consensus::Consensus,
events::{NewBlockNotifications, NewBlockNotificationsSender},
executor::Error as ExecError,
Error,
blockchain_tree::BlockStatus, consensus::Consensus, events::NewBlockNotifications,
executor::Error as ExecError, Error,
};
use reth_primitives::{
BlockHash, BlockNumber, Hardfork, SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
@@ -34,6 +31,7 @@ pub use shareable::ShareableBlockchainTree;
pub mod post_state_data;
pub use post_state_data::{PostStateData, PostStateDataRef};
use reth_interfaces::events::NewBlockNotificationSink;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Tree of chains and its identifications.
@@ -88,7 +86,7 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
/// Tree configuration
config: BlockchainTreeConfig,
/// Unbounded channel for sending new block notifications.
new_block_notication_sender: NewBlockNotificationsSender,
new_block_notification_sender: NewBlockNotificationSink,
}
/// A container that wraps chains and block indices to allow searching for block hashes across all
@@ -104,6 +102,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, C, EF>,
new_block_notification_sender: NewBlockNotificationSink,
config: BlockchainTreeConfig,
) -> Result<Self, Error> {
let max_reorg_depth = config.max_reorg_depth();
@@ -127,11 +126,6 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
last_canonical_hashes.last().cloned().unwrap_or_default()
};
// size of the broadcast is double of max reorg depth because at max reorg depth we can have
// send at least N block at the time.
let (new_block_notication_sender, _) =
tokio::sync::broadcast::channel(2 * max_reorg_depth as usize);
Ok(Self {
externals,
block_chain_id_generator: 0,
@@ -141,7 +135,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
BTreeMap::from_iter(last_canonical_hashes.into_iter()),
),
config,
new_block_notication_sender,
new_block_notification_sender,
})
}
@@ -590,7 +584,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
// Broadcast new canonical blocks.
headers.into_iter().for_each(|header| {
// ignore if receiver is dropped.
let _ = self.new_block_notication_sender.send(header);
let _ = self.new_block_notification_sender.send(header);
});
Ok(())
@@ -600,7 +594,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
///
/// Note: Only canonical blocks are sent.
pub fn subscribe_new_blocks(&self) -> NewBlockNotifications {
self.new_block_notication_sender.subscribe()
self.new_block_notification_sender.subscribe()
}
/// Canonicalize the given chain and commit it to the database.
@@ -783,7 +777,8 @@ mod tests {
// make tree
let config = BlockchainTreeConfig::new(1, 2, 3);
let mut tree = BlockchainTree::new(externals, config).expect("failed to create tree");
let mut tree = BlockchainTree::new(externals, NewBlockNotificationSink::new(10), config)
.expect("failed to create tree");
let mut new_block_notification = tree.subscribe_new_blocks();
// genesis block 10 is already canonical
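Notifications leave the tree over a bounded tokio broadcast channel (sized at twice the max reorg depth in the node command above), so a slow subscriber lags and drops messages instead of blocking block insertion. A sketch of how a consumer of subscribe_new_blocks-style notifications might tolerate that, with a u64 block number standing in for Arc<SealedHeader> and capacity 4 standing in for the real sizing:

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Capacity 4 stands in for 2 * max_reorg_depth.
    let (tx, mut new_blocks) = broadcast::channel::<u64>(4);

    // Producer side: the tree would send sealed headers here.
    for block_number in 0..10u64 {
        // Ignore the error if every receiver is gone.
        let _ = tx.send(block_number);
    }
    drop(tx);

    // Consumer side: tolerate missed notifications instead of erroring out.
    loop {
        match new_blocks.recv().await {
            Ok(block_number) => println!("new canonical block {block_number}"),
            // The channel overflowed; `skipped` notifications were dropped.
            Err(RecvError::Lagged(skipped)) => {
                eprintln!("lagged behind, missed {skipped} notifications")
            }
            // All senders are gone, nothing more will arrive.
            Err(RecvError::Closed) => break,
        }
    }
}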


@@ -32,6 +32,7 @@ secp256k1 = { version = "0.26.0", default-features = false, features = [
"rand",
], optional = true }
modular-bitfield = "0.11.2"
parking_lot = "0.12.1"
[dev-dependencies]
reth-db = { path = "../storage/db", features = ["test-utils"] }


@@ -1,6 +1,7 @@
use parking_lot::Mutex;
use reth_primitives::SealedHeader;
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
/// New block notification that is Arc around [SealedHeader].
pub type NewBlockNotification = Arc<SealedHeader>;
@@ -16,3 +17,38 @@ pub trait ChainEventSubscriptions: Send + Sync {
/// Get notified when a new block was imported.
fn subscribe_new_blocks(&self) -> NewBlockNotifications;
}
/// A shareable Sender that allows sending a [NewBlockNotification] to all receivers.
#[derive(Debug, Clone)]
pub struct NewBlockNotificationSink {
inner: Arc<Mutex<Sender<NewBlockNotification>>>,
}
// === impl NewBlockNotificationSink ===
impl NewBlockNotificationSink {
/// Creates a new NewBlockNotificationSink with the given capacity.
// The broadcast capacity is double the max reorg depth because at max reorg depth
// at least N blocks may be sent at once.
pub fn new(capacity: usize) -> Self {
let inner = tokio::sync::broadcast::channel(capacity);
Self { inner: Arc::new(Mutex::new(inner.0)) }
}
/// Attempts to send a value to all active Receiver handles, returning it back if it could not
/// be sent.
pub fn send(
&self,
header: NewBlockNotification,
) -> Result<usize, SendError<NewBlockNotification>> {
let sender = self.inner.lock();
sender.send(header)
}
/// Creates a new Receiver handle that will receive notifications sent after this call to
/// subscribe.
pub fn subscribe(&self) -> Receiver<NewBlockNotification> {
let sender = self.inner.lock();
sender.subscribe()
}
}
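All clones of the sink share one underlying broadcast Sender, so the blockchain tree and the auto-seal task feed the same set of subscribers, and a receiver only sees notifications sent after it subscribed. A small usage sketch that re-declares the wrapper locally with a String payload in place of Arc<SealedHeader>; NotificationSink and the block strings are illustrative stand-ins, and parking_lot plus tokio are assumed as dependencies:

use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};

// Local re-declaration of the shareable-sink pattern with a String payload.
#[derive(Clone)]
struct NotificationSink {
    inner: Arc<Mutex<Sender<String>>>,
}

impl NotificationSink {
    fn new(capacity: usize) -> Self {
        let (tx, _) = tokio::sync::broadcast::channel(capacity);
        Self { inner: Arc::new(Mutex::new(tx)) }
    }

    fn send(&self, msg: String) -> Result<usize, SendError<String>> {
        self.inner.lock().send(msg)
    }

    fn subscribe(&self) -> Receiver<String> {
        self.inner.lock().subscribe()
    }
}

#[tokio::main]
async fn main() {
    let sink = NotificationSink::new(8);
    // E.g. one clone for the blockchain tree and one for the auto-seal task.
    let tree_side = sink.clone();
    let miner_side = sink.clone();

    // Subscribers only see notifications sent after this call.
    let mut sub = sink.subscribe();

    tree_side.send("canonical block A".to_string()).unwrap();
    miner_side.send("mined block B".to_string()).unwrap();

    assert_eq!(sub.recv().await.unwrap(), "canonical block A");
    assert_eq!(sub.recv().await.unwrap(), "mined block B");
}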


@@ -6,7 +6,7 @@ use reth_rpc_types::pubsub::{Params, SubscriptionKind};
pub trait EthPubSubApi {
/// Create an ethereum subscription for the given params
#[subscription(
name = "eth_subscribe",
name = "eth_subscribe" => "eth_subscription",
unsubscribe = "eth_unsubscribe",
item = reth_rpc_types::pubsub::SubscriptionResult
)]