feat: Add autoseal consensus and downloaders (#1880)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2023-03-31 20:51:52 +02:00
committed by GitHub
parent 00712d642e
commit 7576ee33f0
11 changed files with 837 additions and 21 deletions

View File

@ -0,0 +1,27 @@
[package]
name = "reth-auto-seal-consensus"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth"
readme = "README.md"
description = "A consensus impl for local testing purposes"
[dependencies]
# reth
reth-beacon-consensus = { path = "../beacon" }
reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-provider = { path = "../../storage/provider" }
reth-revm = { path = "../../revm" }
reth-executor = { path = "../../executor" }
reth-transaction-pool = { path = "../../transaction-pool" }
# async
futures-util = "0.3"
tokio = { version = "1", features = ["sync", "time"] }
tokio-stream = "0.1"
tracing = "0.1"
[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }

View File

@ -0,0 +1,130 @@
//! This includes download client implementations for auto sealing miners.
use crate::Storage;
use reth_interfaces::p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
headers::client::{HeadersClient, HeadersFut, HeadersRequest},
priority::Priority,
};
use reth_primitives::{
BlockBody, BlockHashOrNumber, Header, HeadersDirection, PeerId, WithPeerId, H256,
};
use std::fmt::Debug;
use tracing::{trace, warn};
/// A download client that polls the miner for transactions and assembles blocks to be returned in
/// the download process.
///
/// When polled, the miner will assemble blocks when miners produce ready transactions and store the
/// blocks in memory.
#[derive(Debug, Clone)]
pub struct AutoSealClient {
storage: Storage,
}
impl AutoSealClient {
pub(crate) fn new(storage: Storage) -> Self {
Self { storage }
}
async fn fetch_headers(&self, request: HeadersRequest) -> Vec<Header> {
trace!(target: "consensus::auto", ?request, "received headers request");
let storage = self.storage.read().await;
let HeadersRequest { start, limit, direction } = request;
let mut headers = Vec::new();
let mut block: BlockHashOrNumber = match start {
BlockHashOrNumber::Hash(start) => start.into(),
BlockHashOrNumber::Number(num) => {
if let Some(hash) = storage.block_hash(num) {
hash.into()
} else {
warn!(target: "consensus::auto", num, "no matching block found");
return headers
}
}
};
for _ in 0..limit {
// fetch from storage
if let Some(header) = storage.header_by_hash_or_number(block) {
match direction {
HeadersDirection::Falling => block = header.parent_hash.into(),
HeadersDirection::Rising => {
let next = header.number + 1;
block = next.into()
}
}
headers.push(header);
} else {
break
}
}
trace!(target: "consensus::auto", ?headers, "returning headers");
headers
}
async fn fetch_bodies(&self, hashes: Vec<H256>) -> Vec<BlockBody> {
trace!(target: "consensus::auto", ?hashes, "received bodies request");
let storage = self.storage.read().await;
let mut bodies = Vec::new();
for hash in hashes {
if let Some(body) = storage.bodies.get(&hash).cloned() {
bodies.push(body);
} else {
break
}
}
trace!(target: "consensus::auto", ?bodies, "returning bodies");
bodies
}
}
impl HeadersClient for AutoSealClient {
type Output = HeadersFut;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> Self::Output {
let this = self.clone();
Box::pin(async move {
let headers = this.fetch_headers(request).await;
Ok(WithPeerId::new(PeerId::random(), headers))
})
}
}
impl BodiesClient for AutoSealClient {
type Output = BodiesFut;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> Self::Output {
let this = self.clone();
Box::pin(async move {
let bodies = this.fetch_bodies(hashes).await;
Ok(WithPeerId::new(PeerId::random(), bodies))
})
}
}
impl DownloadClient for AutoSealClient {
fn report_bad_message(&self, _peer_id: PeerId) {
warn!("Reported a bad message on a miner, we should never produce bad blocks");
// noop
}
fn num_connected_peers(&self) -> usize {
// no such thing as connected peers when we are mining ourselves
1
}
}

View File

@ -0,0 +1,203 @@
#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! A [Consensus] implementation for local testing purposes
//! that automatically seals blocks.
//!
//! The Mining task polls a [MiningMode], and will return a list of transactions that are ready to
//! be mined.
//!
//! These downloaders poll the miner, assemble the block, and return transactions that are ready to
//! be mined.
use reth_interfaces::consensus::{Consensus, ConsensusError};
use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, ChainSpec, Header, SealedBlock,
SealedHeader, H256, U256,
};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::trace;
mod client;
mod mode;
mod task;
pub use crate::client::AutoSealClient;
pub use mode::{FixedBlockTimeMiner, MiningMode, ReadyTransactionMiner};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_transaction_pool::TransactionPool;
pub use task::MiningTask;
/// A consensus implementation intended for local development and testing purposes.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct AutoSealConsensus {
/// Configuration
chain_spec: Arc<ChainSpec>,
}
impl AutoSealConsensus {
/// Create a new instance of [AutoSealConsensus]
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self { chain_spec }
}
}
impl Consensus for AutoSealConsensus {
fn pre_validate_header(
&self,
_header: &SealedHeader,
_parent: &SealedHeader,
) -> Result<(), ConsensusError> {
Ok(())
}
fn validate_header(
&self,
_header: &SealedHeader,
_total_difficulty: U256,
) -> Result<(), ConsensusError> {
Ok(())
}
fn pre_validate_block(&self, _block: &SealedBlock) -> Result<(), ConsensusError> {
Ok(())
}
fn has_block_reward(&self, _total_difficulty: U256, _difficulty: U256) -> bool {
false
}
}
/// Builder type for configuring the setup
pub struct AutoSealBuilder<Client, Pool> {
client: Client,
consensus: AutoSealConsensus,
pool: Pool,
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage>,
}
// === impl AutoSealBuilder ===
impl<Client, Pool: TransactionPool> AutoSealBuilder<Client, Pool> {
/// Creates a new builder instance to configure all parts.
pub fn new(
chain_spec: Arc<ChainSpec>,
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage>,
) -> Self {
let mode = MiningMode::interval(std::time::Duration::from_secs(1));
Self {
storage: Storage::new(&chain_spec),
client,
consensus: AutoSealConsensus::new(chain_spec),
pool,
mode,
to_engine,
}
}
/// Sets the [MiningMode] it operates in, default is [MiningMode::Auto]
pub fn mode(mut self, mode: MiningMode) -> Self {
self.mode = mode;
self
}
/// Consumes the type and returns all components
pub fn build(self) -> (AutoSealConsensus, AutoSealClient, MiningTask<Client, Pool>) {
let Self { client, consensus, pool, mode, storage, to_engine } = self;
let auto_client = AutoSealClient::new(storage.clone());
let task = MiningTask::new(
Arc::clone(&consensus.chain_spec),
mode,
to_engine,
storage,
client,
pool,
);
(consensus, auto_client, task)
}
}
/// In memory storage
#[derive(Debug, Clone, Default)]
pub(crate) struct Storage {
inner: Arc<RwLock<StorageInner>>,
}
// == impl Storage ===
impl Storage {
fn new(chain_spec: &ChainSpec) -> Self {
let header = chain_spec.genesis_header();
let best_hash = header.hash_slow();
Self { inner: Arc::new(RwLock::new(StorageInner { best_hash, ..Default::default() })) }
}
/// Returns the write lock of the storage
pub(crate) async fn write(&self) -> RwLockWriteGuard<'_, StorageInner> {
self.inner.write().await
}
/// Returns the read lock of the storage
pub(crate) async fn read(&self) -> RwLockReadGuard<'_, StorageInner> {
self.inner.read().await
}
}
#[derive(Default, Debug)]
pub(crate) struct StorageInner {
/// Headers buffered for download.
pub(crate) headers: HashMap<BlockNumber, Header>,
/// A mapping between block hash and number.
pub(crate) hash_to_number: HashMap<BlockHash, BlockNumber>,
/// Bodies buffered for download.
pub(crate) bodies: HashMap<BlockHash, BlockBody>,
/// Tracks best block
pub(crate) best_block: u64,
/// Tracks hash of best block
pub(crate) best_hash: H256,
}
// === impl StorageInner ===
impl StorageInner {
/// Returns the block hash for the given block number if it exists.
pub(crate) fn block_hash(&self, num: u64) -> Option<BlockHash> {
self.hash_to_number.iter().find_map(|(k, v)| num.eq(v).then_some(*k))
}
/// Returns the matching header if it exists.
pub(crate) fn header_by_hash_or_number(
&self,
hash_or_num: BlockHashOrNumber,
) -> Option<Header> {
let num = match hash_or_num {
BlockHashOrNumber::Hash(hash) => self.hash_to_number.get(&hash).copied()?,
BlockHashOrNumber::Number(num) => num,
};
self.headers.get(&num).cloned()
}
/// Inserts a new header+body pair
pub(crate) fn insert_new_block(&mut self, mut header: Header, body: BlockBody) {
header.number = self.best_block + 1;
header.parent_hash = self.best_hash;
self.best_hash = header.hash_slow();
self.best_block = header.number;
trace!(target: "consensus::auto", num=self.best_block, hash=?self.best_hash, "inserting new block");
self.headers.insert(header.number, header);
self.bodies.insert(self.best_hash, body);
self.hash_to_number.insert(self.best_hash, self.best_block);
}
}

View File

@ -0,0 +1,155 @@
//! The mode the auto seal miner is operating in.
use futures_util::{stream::Fuse, StreamExt};
use reth_primitives::TxHash;
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{sync::mpsc::Receiver, time::Interval};
use tokio_stream::{wrappers::ReceiverStream, Stream};
/// Mode of operations for the `Miner`
#[derive(Debug)]
pub enum MiningMode {
/// A miner that does nothing
None,
/// A miner that listens for new transactions that are ready.
///
/// Either one transaction will be mined per block, or any number of transactions will be
/// allowed
Auto(ReadyTransactionMiner),
/// A miner that constructs a new block every `interval` tick
FixedBlockTime(FixedBlockTimeMiner),
}
// === impl MiningMode ===
impl MiningMode {
/// Creates a new instant mining mode that listens for new transactions and tries to build
/// non-empty blocks as soon as transactions arrive.
pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
MiningMode::Auto(ReadyTransactionMiner {
max_transactions,
has_pending_txs: None,
rx: ReceiverStream::new(listener).fuse(),
})
}
/// Creates a new interval miner that builds a block ever `duration`.
pub fn interval(duration: Duration) -> Self {
MiningMode::FixedBlockTime(FixedBlockTimeMiner::new(duration))
}
/// polls the Pool and returns those transactions that should be put in a block, if any.
pub(crate) fn poll<Pool>(
&mut self,
pool: &Pool,
cx: &mut Context<'_>,
) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
where
Pool: TransactionPool,
{
match self {
MiningMode::None => Poll::Pending,
MiningMode::Auto(miner) => miner.poll(pool, cx),
MiningMode::FixedBlockTime(miner) => miner.poll(pool, cx),
}
}
}
/// A miner that's supposed to create a new block every `interval`, mining all transactions that are
/// ready at that time.
///
/// The default blocktime is set to 6 seconds
#[derive(Debug)]
pub struct FixedBlockTimeMiner {
/// The interval this fixed block time miner operates with
interval: Interval,
}
// === impl FixedBlockTimeMiner ===
impl FixedBlockTimeMiner {
/// Creates a new instance with an interval of `duration`
pub(crate) fn new(duration: Duration) -> Self {
let start = tokio::time::Instant::now() + duration;
Self { interval: tokio::time::interval_at(start, duration) }
}
fn poll<Pool>(
&mut self,
pool: &Pool,
cx: &mut Context<'_>,
) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
where
Pool: TransactionPool,
{
if self.interval.poll_tick(cx).is_ready() {
// drain the pool
return Poll::Ready(pool.best_transactions().collect())
}
Poll::Pending
}
}
impl Default for FixedBlockTimeMiner {
fn default() -> Self {
Self::new(Duration::from_secs(6))
}
}
/// A miner that Listens for new ready transactions
pub struct ReadyTransactionMiner {
/// how many transactions to mine per block
max_transactions: usize,
/// stores whether there are pending transactions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready
rx: Fuse<ReceiverStream<TxHash>>,
}
// === impl ReadyTransactionMiner ===
impl ReadyTransactionMiner {
fn poll<Pool>(
&mut self,
pool: &Pool,
cx: &mut Context<'_>,
) -> Poll<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>
where
Pool: TransactionPool,
{
// drain the notification stream
while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}
if self.has_pending_txs == Some(false) {
return Poll::Pending
}
let transactions = pool.best_transactions().take(self.max_transactions).collect::<Vec<_>>();
// there are pending transactions if we didn't drain the pool
self.has_pending_txs = Some(transactions.len() >= self.max_transactions);
if transactions.is_empty() {
return Poll::Pending
}
Poll::Ready(transactions)
}
}
impl fmt::Debug for ReadyTransactionMiner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadyTransactionMiner")
.field("max_transactions", &self.max_transactions)
.finish_non_exhaustive()
}
}

View File

@ -0,0 +1,213 @@
use crate::{mode::MiningMode, Storage};
use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_executor::executor::Executor;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS},
proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom,
EMPTY_OMMER_ROOT, U256,
};
use reth_provider::StateProviderFactory;
use reth_revm::database::{State, SubState};
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{trace, warn};
/// A Future that listens for new ready transactions and puts new blocks into storage
pub struct MiningTask<Client, Pool: TransactionPool> {
/// The configured chain spec
chain_spec: Arc<ChainSpec>,
/// The client used to interact with the state
client: Client,
/// The active miner
miner: MiningMode,
/// Single active future that inserts a new block into `storage`
insert_task: Option<BoxFuture<'static, ()>>,
/// Shared storage to insert new blocks
storage: Storage,
/// Pool where transactions are stored
pool: Pool,
/// backlog of sets of transactions ready to be mined
queued: VecDeque<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>,
/// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage>,
}
// === impl MiningTask ===
impl<Client, Pool: TransactionPool> MiningTask<Client, Pool> {
/// Creates a new instance of the task
pub(crate) fn new(
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage>,
storage: Storage,
client: Client,
pool: Pool,
) -> Self {
Self {
chain_spec,
client,
miner,
insert_task: None,
storage,
pool,
to_engine,
queued: Default::default(),
}
}
}
impl<Client, Pool> Future for MiningTask<Client, Pool>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// this drives block production and
loop {
if let Poll::Ready(transactions) = this.miner.poll(&this.pool, cx) {
// miner returned a set of transaction that we feed to the producer
this.queued.push_back(transactions);
}
if this.insert_task.is_none() {
if this.queued.is_empty() {
// nothing to insert
break
}
// ready to queue in new insert task
let storage = this.storage.clone();
let transactions = this.queued.pop_front().expect("not empty");
let to_engine = this.to_engine.clone();
let client = this.client.clone();
let chain_spec = Arc::clone(&this.chain_spec);
let pool = this.pool.clone();
this.insert_task = Some(Box::pin(async move {
let mut storage = storage.write().await;
let mut header = Header {
parent_hash: storage.best_hash,
ommers_hash: EMPTY_OMMER_ROOT,
beneficiary: Default::default(),
state_root: Default::default(),
transactions_root: Default::default(),
receipts_root: Default::default(),
withdrawals_root: None,
logs_bloom: Default::default(),
difficulty: Default::default(),
number: storage.best_block + 1,
gas_limit: 30_000_000,
gas_used: 0,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
mix_hash: Default::default(),
nonce: 0,
base_fee_per_gas: None,
extra_data: Default::default(),
};
let transactions = transactions
.into_iter()
.map(|tx| tx.to_recovered_transaction().into_signed())
.collect::<Vec<_>>();
header.transactions_root = if transactions.is_empty() {
EMPTY_TRANSACTIONS
} else {
proofs::calculate_transaction_root(transactions.iter())
};
let block =
Block { header, body: transactions, ommers: vec![], withdrawals: None };
// execute the new block
let substate = SubState::new(State::new(client.latest().unwrap()));
let mut executor = Executor::new(chain_spec, substate);
trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions");
match executor.execute_transactions(&block, U256::ZERO, None) {
Ok((res, gas_used)) => {
let Block { mut header, body, .. } = block;
// clear all transactions from pool
// TODO this should happen automatically via events
pool.remove_transactions(body.iter().map(|tx| tx.hash));
header.receipts_root = if res.receipts().is_empty() {
EMPTY_RECEIPTS
} else {
let receipts_with_bloom = res
.receipts()
.iter()
.map(|r| r.clone().into())
.collect::<Vec<ReceiptWithBloom>>();
proofs::calculate_receipt_root(receipts_with_bloom.iter())
};
let body =
BlockBody { transactions: body, ommers: vec![], withdrawals: None };
header.gas_used = gas_used;
storage.insert_new_block(header, body);
let new_hash = storage.best_hash;
let state = ForkchoiceState {
head_block_hash: new_hash,
finalized_block_hash: new_hash,
safe_block_hash: new_hash,
};
trace!(target: "consensus::auto", ?state, "sending fork choice update");
let (tx, _rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
}
Err(err) => {
warn!(target: "consensus::auto", ?err, "failed to execute block")
}
}
}));
}
if let Some(mut fut) = this.insert_task.take() {
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
this.insert_task = Some(fut);
break
}
}
}
}
Poll::Pending
}
}
impl<Client, Pool: TransactionPool> std::fmt::Debug for MiningTask<Client, Pool> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MiningTask").finish_non_exhaustive()
}
}