feat(rpc): implement newHeads subscription (#1446)

This commit is contained in:
Matthias Seitz
2023-02-19 19:41:37 +01:00
committed by GitHub
parent e434e49f3f
commit eb1299194f
3 changed files with 114 additions and 63 deletions

View File

@ -1,8 +1,8 @@
//! Contains types that represent ethereum types in [reth_primitives] when used in RPC
use crate::Transaction;
use reth_primitives::{
Address, Block as PrimitiveBlock, Bloom, Bytes, Header as RethHeader, Withdrawal, H256, H64,
U256,
Address, Block as PrimitiveBlock, Bloom, Bytes, Header as PrimitiveHeader, Withdrawal, H256,
H64, U256,
};
use reth_rlp::Encodable;
use serde::{ser::Error, Deserialize, Serialize, Serializer};
@ -56,51 +56,12 @@ impl Block {
total_difficulty: U256,
) -> Result<Self, BlockError> {
let block_hash = block.header.hash_slow();
let header_length = block.header.length();
let block_length = block.length();
let block_number = block.number;
let uncles = block.ommers.into_iter().map(|h| h.hash_slow()).collect();
let base_fee_per_gas = block.header.base_fee_per_gas;
let RethHeader {
parent_hash,
ommers_hash,
beneficiary,
state_root,
transactions_root,
receipts_root,
logs_bloom,
difficulty,
number,
gas_limit,
gas_used,
timestamp,
mix_hash,
nonce,
base_fee_per_gas,
extra_data,
withdrawals_root,
} = block.header;
let header = Header {
hash: Some(block_hash),
parent_hash,
uncles_hash: ommers_hash,
author: beneficiary,
miner: beneficiary,
state_root,
transactions_root,
receipts_root,
withdrawals_root,
number: Some(U256::from(number)),
gas_used: U256::from(gas_used),
gas_limit: U256::from(gas_limit),
extra_data,
logs_bloom,
timestamp: U256::from(timestamp),
difficulty,
mix_hash,
nonce: Some(nonce.to_be_bytes().into()),
size: Some(U256::from(header_length)),
};
let header = Header::from_primitive_with_hash(block.header, block_hash);
let mut transactions = Vec::with_capacity(block.body.len());
for (idx, tx) in block.body.iter().enumerate() {
@ -108,7 +69,7 @@ impl Block {
transactions.push(Transaction::from_recovered_with_block_context(
signed_tx,
block_hash,
number,
block_number,
U256::from(idx),
))
}
@ -170,12 +131,77 @@ pub struct Header {
pub size: Option<U256>,
}
// === impl Header ===
impl Header {
/// Converts the primitive header type to this RPC type
///
/// CAUTION: this takes the header's hash as is and does _not_ calculate the hash.
pub fn from_primitive_with_hash(primitive_header: PrimitiveHeader, block_hash: H256) -> Self {
let header_length = primitive_header.length();
let PrimitiveHeader {
parent_hash,
ommers_hash,
beneficiary,
state_root,
transactions_root,
receipts_root,
logs_bloom,
difficulty,
number,
gas_limit,
gas_used,
timestamp,
mix_hash,
nonce,
base_fee_per_gas: _,
extra_data,
withdrawals_root,
} = primitive_header;
Header {
hash: Some(block_hash),
parent_hash,
uncles_hash: ommers_hash,
author: beneficiary,
miner: beneficiary,
state_root,
transactions_root,
receipts_root,
withdrawals_root,
number: Some(U256::from(number)),
gas_used: U256::from(gas_used),
gas_limit: U256::from(gas_limit),
extra_data,
logs_bloom,
timestamp: U256::from(timestamp),
difficulty,
mix_hash,
nonce: Some(nonce.to_be_bytes().into()),
size: Some(U256::from(header_length)),
}
}
}
/// A Block representation that allows to include additional fields
pub type RichBlock = Rich<Block>;
impl From<Block> for RichBlock {
fn from(block: Block) -> Self {
Rich { inner: block, extra_info: Default::default() }
}
}
/// Header representation with additional info.
pub type RichHeader = Rich<Header>;
impl From<Header> for RichHeader {
fn from(header: Header) -> Self {
Rich { inner: header, extra_info: Default::default() }
}
}
/// Value representation with additional info
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct Rich<T> {

View File

@ -1,51 +1,58 @@
//! `eth_` PubSub RPC handler implementation
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_primitives::{rpc::FilteredParams, TxHash};
use reth_provider::BlockProvider;
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::pubsub::{
Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
use reth_rpc_types::{
pubsub::{Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult},
Header,
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
Stream, StreamExt,
};
/// `Eth` pubsub RPC implementation.
///
/// This handles
pub struct EthPubSub<Client, Pool> {
pub struct EthPubSub<Client, Pool, Events> {
/// All nested fields bundled together.
inner: EthPubSubInner<Client, Pool>,
inner: EthPubSubInner<Client, Pool, Events>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
}
// === impl EthPubSub ===
impl<Client, Pool> EthPubSub<Client, Pool> {
impl<Client, Pool, Events> EthPubSub<Client, Pool, Events> {
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [tokio::task::spawn]
pub fn new(client: Client, pool: Pool) -> Self {
Self::with_spawner(client, pool, Box::<TokioTaskExecutor>::default())
pub fn new(client: Client, pool: Pool, chain_events: Events) -> Self {
Self::with_spawner(client, pool, chain_events, Box::<TokioTaskExecutor>::default())
}
/// Creates a new, shareable instance.
pub fn with_spawner(
client: Client,
pool: Pool,
chain_events: Events,
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { client, pool };
let inner = EthPubSubInner { client, pool, chain_events };
Self { inner, subscription_task_spawner }
}
}
impl<Client, Pool> EthPubSubApiServer for EthPubSub<Client, Pool>
impl<Client, Pool, Events> EthPubSubApiServer for EthPubSub<Client, Pool, Events>
where
Client: BlockProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
{
fn subscribe(
&self,
@ -65,14 +72,15 @@ where
}
/// The actual handler for and accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Client, Pool>(
pubsub: EthPubSubInner<Client, Pool>,
async fn handle_accepted<Client, Pool, Events>(
pubsub: EthPubSubInner<Client, Pool, Events>,
mut accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + 'static,
{
// if no params are provided, used default filter params
let _params = match params {
@ -82,7 +90,10 @@ async fn handle_accepted<Client, Pool>(
match kind {
SubscriptionKind::NewHeads => {
// TODO subscribe new blocks -> convert
let stream = pubsub
.into_new_headers_stream()
.map(|block| EthSubscriptionResult::Header(Box::new(block.into())));
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::Logs => {
// TODO subscribe new blocks -> fetch logs via bloom
@ -99,7 +110,7 @@ async fn handle_accepted<Client, Pool>(
}
}
impl<Client, Pool> std::fmt::Debug for EthPubSub<Client, Pool> {
impl<Client, Pool, Events> std::fmt::Debug for EthPubSub<Client, Pool, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
@ -107,18 +118,19 @@ impl<Client, Pool> std::fmt::Debug for EthPubSub<Client, Pool> {
/// Container type `EthPubSub`
#[derive(Clone)]
struct EthPubSubInner<Client, Pool> {
struct EthPubSubInner<Client, Pool, Events> {
/// The transaction pool.
pool: Pool,
/// The client that can interact with the chain.
client: Client,
/// A type that allows to create new event subscriptions,
chain_events: Events,
}
// == impl EthPubSubInner ===
impl<Client, Pool> EthPubSubInner<Client, Pool>
impl<Client, Pool, Events> EthPubSubInner<Client, Pool, Events>
where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
{
/// Returns a stream that yields all transactions emitted by the txpool.
@ -126,3 +138,16 @@ where
ReceiverStream::new(self.pool.pending_transactions_listener())
}
}
impl<Client, Pool, Events> EthPubSubInner<Client, Pool, Events>
where
Client: BlockProvider + 'static,
Events: ChainEventSubscriptions + 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {
UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks()).map(|new_block| {
Header::from_primitive_with_hash(new_block.header.as_ref().clone(), new_block.hash)
})
}
}