chore(rpc): update forkchoice state in fork_choice_updated handler (#1177)

This commit is contained in:
Roman Krasiuk
2023-02-06 10:56:43 +02:00
committed by GitHub
parent 54744b3e6b
commit 952ec83aed
4 changed files with 261 additions and 58 deletions

1
Cargo.lock generated
View File

@ -4629,6 +4629,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]

View File

@ -20,6 +20,9 @@ futures = "0.3"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
# tracing
tracing = "0.1"
# misc
thiserror = "1.0.37"

View File

@ -23,7 +23,7 @@ use std::{
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The Engine API response sender
@ -36,7 +36,8 @@ pub struct EngineApi<Client> {
client: Arc<Client>,
/// Consensus configuration
chain_spec: ChainSpec,
rx: UnboundedReceiverStream<EngineApiMessage>,
message_rx: UnboundedReceiverStream<EngineApiMessage>,
forkchoice_state_tx: watch::Sender<ForkchoiceState>,
// TODO: Placeholder for storing future blocks. Make cache bounded.
// Use [lru](https://crates.io/crates/lru) crate
local_store: HashMap<H64, ExecutionPayload>,
@ -47,8 +48,6 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
fn on_message(&mut self, msg: EngineApiMessage) {
match msg {
EngineApiMessage::GetPayload(payload_id, tx) => {
// NOTE: Will always result in `PayloadUnknown` since we don't support block
// building for now.
let _ = tx.send(self.get_payload(payload_id).ok_or(EngineApiError::PayloadUnknown));
}
EngineApiMessage::NewPayload(payload, tx) => {
@ -119,6 +118,9 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
/// Called to retrieve the latest state of the network, validate new blocks, and maintain
/// consistency between the Consensus and Execution layers.
///
/// NOTE: Will always result in `PayloadUnknown` since we don't support block
/// building for now.
pub fn get_payload(&self, payload_id: H64) -> Option<ExecutionPayload> {
self.local_store.get(&payload_id).cloned()
}
@ -189,7 +191,7 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
pub fn fork_choice_updated(
&self,
fork_choice_state: ForkchoiceState,
_payload_attributes: Option<PayloadAttributes>,
payload_attributes: Option<PayloadAttributes>,
) -> EngineApiResult<ForkchoiceUpdated> {
let ForkchoiceState { head_block_hash, finalized_block_hash, .. } = fork_choice_state;
@ -209,7 +211,13 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing))
}
// TODO: update tip
if let Err(error) = self.forkchoice_state_tx.send(fork_choice_state) {
tracing::error!(target: "rpc::engine_api", ?error, "Failed to update forkchoice state");
}
if let Some(_attr) = payload_attributes {
// TODO: optionally build the block
}
let chain_info = self.client.chain_info()?;
Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
@ -277,7 +285,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match ready!(this.rx.poll_next_unpin(cx)) {
match ready!(this.message_rx.poll_next_unpin(cx)) {
Some(msg) => this.on_message(msg),
None => {
// channel closed
@ -293,7 +301,7 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::random_block;
use reth_primitives::H256;
use reth_primitives::{H256, MAINNET};
use reth_provider::test_utils::MockEthProvider;
use tokio::sync::mpsc::unbounded_channel;
@ -301,7 +309,7 @@ mod tests {
use super::*;
use bytes::{Bytes, BytesMut};
use reth_interfaces::test_utils::generators::random_header;
use reth_primitives::{Block, MAINNET};
use reth_primitives::Block;
use reth_rlp::DecodeError;
fn transform_block<F: FnOnce(Block) -> Block>(src: SealedBlock, f: F) -> SealedBlock {
@ -321,12 +329,14 @@ mod tests {
#[tokio::test]
async fn payload_validation() {
let (_tx, rx) = unbounded_channel();
let (_msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
let block = random_block(100, Some(H256::random()), Some(3), Some(0));
@ -409,13 +419,15 @@ mod tests {
#[tokio::test]
async fn payload_known() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let client = Arc::new(MockEthProvider::default());
let engine = EngineApi {
client: client.clone(),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -427,7 +439,8 @@ mod tests {
client.add_header(block_hash, block.header.unseal());
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::NewPayload(execution_payload, result_tx))
msg_tx
.send(EngineApiMessage::NewPayload(execution_payload, result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -438,19 +451,22 @@ mod tests {
#[tokio::test]
async fn payload_parent_unknown() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
let (result_tx, result_rx) = oneshot::channel();
let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers
tx.send(EngineApiMessage::NewPayload(block.into(), result_tx))
msg_tx
.send(EngineApiMessage::NewPayload(block.into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -461,14 +477,16 @@ mod tests {
#[tokio::test]
async fn payload_pre_merge() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let chain_spec = MAINNET.clone();
let client = Arc::new(MockEthProvider::default());
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -483,7 +501,8 @@ mod tests {
client.add_block(parent.hash(), parent.clone().unseal());
tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
msg_tx
.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -496,14 +515,16 @@ mod tests {
#[tokio::test]
async fn invalid_payload_timestamp() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let chain_spec = MAINNET.clone();
let client = Arc::new(MockEthProvider::default());
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -525,7 +546,8 @@ mod tests {
client.add_block(parent.hash(), parent.clone().unseal());
tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
msg_tx
.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
@ -546,18 +568,18 @@ mod tests {
// non exhaustive tests for engine_getPayload
// TODO: amend when block building is implemented
mod get_payload {
use reth_primitives::MAINNET;
use super::*;
#[tokio::test]
async fn payload_unknown() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -565,28 +587,187 @@ mod tests {
let payload_id = H64::random();
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::GetPayload(payload_id, result_tx))
msg_tx
.send(EngineApiMessage::GetPayload(payload_id, result_tx))
.expect("failed to send engine msg");
assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadUnknown)));
}
}
// https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-3
mod exchange_transition_configuration {
use reth_primitives::MAINNET;
mod fork_choice_updated {
use reth_interfaces::test_utils::generators::random_header;
use super::*;
#[tokio::test]
async fn empty_head() {
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default());
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
let (result_tx, result_rx) = oneshot::channel();
msg_tx
.send(EngineApiMessage::ForkchoiceUpdated(
ForkchoiceState::default(),
None,
result_tx,
))
.expect("failed to send engine msg");
let result = result_rx.await;
assert_matches!(result, Ok(Ok(_)));
assert_eq!(
result.unwrap().unwrap(),
ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(),
})
);
assert!(!tip_rx.has_changed().unwrap());
}
#[tokio::test]
async fn unknown_head_hash() {
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default());
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
let state = ForkchoiceState { head_block_hash: H256::random(), ..Default::default() };
let (result_tx, result_rx) = oneshot::channel();
msg_tx
.send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
assert_matches!(result, Ok(Ok(_)));
assert_eq!(
result.unwrap().unwrap(),
ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing)
);
assert!(!tip_rx.has_changed().unwrap());
}
#[tokio::test]
async fn unknown_finalized_hash() {
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default());
let client = Arc::new(MockEthProvider::default());
let engine = EngineApi {
client: client.clone(),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
let head = random_header(100, None);
client.add_header(head.hash(), head.clone().unseal());
let state = ForkchoiceState {
head_block_hash: head.hash(),
finalized_block_hash: H256::random(),
..Default::default()
};
let (result_tx, result_rx) = oneshot::channel();
msg_tx
.send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
assert_matches!(result, Ok(Ok(_)));
assert_eq!(
result.unwrap().unwrap(),
ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing)
);
assert!(!tip_rx.has_changed().unwrap());
}
#[tokio::test]
async fn forkchoice_state_is_updated() {
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default());
let client = Arc::new(MockEthProvider::default());
let engine = EngineApi {
client: client.clone(),
chain_spec: MAINNET.clone(),
local_store: Default::default(),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
let finalized = random_header(90, None);
let head = random_header(100, None);
client.extend_headers([
(head.hash(), head.clone().unseal()),
(finalized.hash(), finalized.clone().unseal()),
]);
let state = ForkchoiceState {
head_block_hash: head.hash(),
finalized_block_hash: finalized.hash(),
..Default::default()
};
let (result_tx, result_rx) = oneshot::channel();
msg_tx
.send(EngineApiMessage::ForkchoiceUpdated(state.clone(), None, result_tx))
.expect("failed to send engine msg");
let result = result_rx.await;
assert_matches!(result, Ok(Ok(_)));
assert_eq!(
result.unwrap().unwrap(),
ForkchoiceUpdated {
payload_id: None,
payload_status: PayloadStatus {
status: PayloadStatusEnum::Valid,
latest_valid_hash: Some(head.hash())
}
}
);
assert!(tip_rx.has_changed().unwrap());
assert_eq!(tip_rx.borrow().clone(), state);
}
}
// https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-3
mod exchange_transition_configuration {
use super::*;
#[tokio::test]
async fn terminal_td_mismatch() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let chain_spec = MAINNET.clone();
let engine = EngineApi {
client: Arc::new(MockEthProvider::default()),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -601,7 +782,8 @@ mod tests {
};
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
msg_tx
.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -617,14 +799,16 @@ mod tests {
#[tokio::test]
async fn terminal_block_hash_mismatch() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let client = Arc::new(MockEthProvider::default());
let chain_spec = MAINNET.clone();
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -644,7 +828,8 @@ mod tests {
// Unknown block number
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
msg_tx
.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -664,7 +849,8 @@ mod tests {
);
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
msg_tx
.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))
@ -680,14 +866,16 @@ mod tests {
#[tokio::test]
async fn configurations_match() {
let (tx, rx) = unbounded_channel();
let (msg_tx, msg_rx) = unbounded_channel();
let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default());
let client = Arc::new(MockEthProvider::default());
let chain_spec = MAINNET.clone();
let engine = EngineApi {
client: client.clone(),
chain_spec: chain_spec.clone(),
local_store: Default::default(),
rx: UnboundedReceiverStream::new(rx),
message_rx: UnboundedReceiverStream::new(msg_rx),
forkchoice_state_tx: tip_tx,
};
tokio::spawn(engine);
@ -707,7 +895,8 @@ mod tests {
client.add_block(terminal_block.hash(), terminal_block.clone().unseal());
let (result_tx, result_rx) = oneshot::channel();
tx.send(EngineApiMessage::ExchangeTransitionConfiguration(
msg_tx
.send(EngineApiMessage::ExchangeTransitionConfiguration(
transition_config.clone(),
result_tx,
))

View File

@ -128,7 +128,17 @@ impl BlockHashProvider for MockEthProvider {
impl BlockProvider for MockEthProvider {
fn chain_info(&self) -> Result<ChainInfo> {
todo!()
let lock = self.headers.lock();
Ok(lock
.iter()
.max_by_key(|h| h.1.number)
.map(|(hash, header)| ChainInfo {
best_hash: *hash,
best_number: header.number,
last_finalized: None,
safe_finalized: None,
})
.expect("provider is empty"))
}
fn block(&self, id: BlockId) -> Result<Option<Block>> {