From 952ec83aed27ae490f3c402ce69f5732333633b0 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 6 Feb 2023 10:56:43 +0200 Subject: [PATCH] chore(rpc): update forkchoice state in `fork_choice_updated` handler (#1177) --- Cargo.lock | 1 + crates/rpc/rpc-engine-api/Cargo.toml | 3 + crates/rpc/rpc-engine-api/src/engine_api.rs | 303 ++++++++++++++---- .../storage/provider/src/test_utils/mock.rs | 12 +- 4 files changed, 261 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c30903ea4..76f28eecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4629,6 +4629,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 71ce67faf..0c3c5e1a5 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -20,6 +20,9 @@ futures = "0.3" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" +# tracing +tracing = "0.1" + # misc thiserror = "1.0.37" diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 7a0057444..b4c4c468a 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; /// The Engine API response sender @@ -36,7 +36,8 @@ pub struct EngineApi { client: Arc, /// Consensus configuration chain_spec: ChainSpec, - rx: UnboundedReceiverStream, + message_rx: UnboundedReceiverStream, + forkchoice_state_tx: watch::Sender, // TODO: Placeholder for storing future blocks. Make cache bounded. // Use [lru](https://crates.io/crates/lru) crate local_store: HashMap, @@ -47,8 +48,6 @@ impl EngineApi { fn on_message(&mut self, msg: EngineApiMessage) { match msg { EngineApiMessage::GetPayload(payload_id, tx) => { - // NOTE: Will always result in `PayloadUnknown` since we don't support block - // building for now. let _ = tx.send(self.get_payload(payload_id).ok_or(EngineApiError::PayloadUnknown)); } EngineApiMessage::NewPayload(payload, tx) => { @@ -119,6 +118,9 @@ impl EngineApi { /// Called to retrieve the latest state of the network, validate new blocks, and maintain /// consistency between the Consensus and Execution layers. + /// + /// NOTE: Will always result in `PayloadUnknown` since we don't support block + /// building for now. pub fn get_payload(&self, payload_id: H64) -> Option { self.local_store.get(&payload_id).cloned() } @@ -189,7 +191,7 @@ impl EngineApi { pub fn fork_choice_updated( &self, fork_choice_state: ForkchoiceState, - _payload_attributes: Option, + payload_attributes: Option, ) -> EngineApiResult { let ForkchoiceState { head_block_hash, finalized_block_hash, .. } = fork_choice_state; @@ -209,7 +211,13 @@ impl EngineApi { return Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing)) } - // TODO: update tip + if let Err(error) = self.forkchoice_state_tx.send(fork_choice_state) { + tracing::error!(target: "rpc::engine_api", ?error, "Failed to update forkchoice state"); + } + + if let Some(_attr) = payload_attributes { + // TODO: optionally build the block + } let chain_info = self.client.chain_info()?; Ok(ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid) @@ -277,7 +285,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); loop { - match ready!(this.rx.poll_next_unpin(cx)) { + match ready!(this.message_rx.poll_next_unpin(cx)) { Some(msg) => this.on_message(msg), None => { // channel closed @@ -293,7 +301,7 @@ mod tests { use super::*; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::random_block; - use reth_primitives::H256; + use reth_primitives::{H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; use tokio::sync::mpsc::unbounded_channel; @@ -301,7 +309,7 @@ mod tests { use super::*; use bytes::{Bytes, BytesMut}; use reth_interfaces::test_utils::generators::random_header; - use reth_primitives::{Block, MAINNET}; + use reth_primitives::Block; use reth_rlp::DecodeError; fn transform_block Block>(src: SealedBlock, f: F) -> SealedBlock { @@ -321,12 +329,14 @@ mod tests { #[tokio::test] async fn payload_validation() { - let (_tx, rx) = unbounded_channel(); + let (_msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let engine = EngineApi { client: Arc::new(MockEthProvider::default()), chain_spec: MAINNET.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; let block = random_block(100, Some(H256::random()), Some(3), Some(0)); @@ -409,13 +419,15 @@ mod tests { #[tokio::test] async fn payload_known() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let client = Arc::new(MockEthProvider::default()); let engine = EngineApi { client: client.clone(), chain_spec: MAINNET.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -427,7 +439,8 @@ mod tests { client.add_header(block_hash, block.header.unseal()); let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::NewPayload(execution_payload, result_tx)) + msg_tx + .send(EngineApiMessage::NewPayload(execution_payload, result_tx)) .expect("failed to send engine msg"); let result = result_rx.await; @@ -438,19 +451,22 @@ mod tests { #[tokio::test] async fn payload_parent_unknown() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let engine = EngineApi { client: Arc::new(MockEthProvider::default()), chain_spec: MAINNET.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); let (result_tx, result_rx) = oneshot::channel(); let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers - tx.send(EngineApiMessage::NewPayload(block.into(), result_tx)) + msg_tx + .send(EngineApiMessage::NewPayload(block.into(), result_tx)) .expect("failed to send engine msg"); let result = result_rx.await; @@ -461,14 +477,16 @@ mod tests { #[tokio::test] async fn payload_pre_merge() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let chain_spec = MAINNET.clone(); let client = Arc::new(MockEthProvider::default()); let engine = EngineApi { client: client.clone(), chain_spec: chain_spec.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -483,7 +501,8 @@ mod tests { client.add_block(parent.hash(), parent.clone().unseal()); - tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) + msg_tx + .send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) .expect("failed to send engine msg"); let result = result_rx.await; @@ -496,14 +515,16 @@ mod tests { #[tokio::test] async fn invalid_payload_timestamp() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let chain_spec = MAINNET.clone(); let client = Arc::new(MockEthProvider::default()); let engine = EngineApi { client: client.clone(), chain_spec: chain_spec.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -525,7 +546,8 @@ mod tests { client.add_block(parent.hash(), parent.clone().unseal()); - tx.send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) + msg_tx + .send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) .expect("failed to send engine msg"); let result = result_rx.await; @@ -546,18 +568,18 @@ mod tests { // non exhaustive tests for engine_getPayload // TODO: amend when block building is implemented mod get_payload { - use reth_primitives::MAINNET; - use super::*; #[tokio::test] async fn payload_unknown() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let engine = EngineApi { client: Arc::new(MockEthProvider::default()), chain_spec: MAINNET.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -565,28 +587,187 @@ mod tests { let payload_id = H64::random(); let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::GetPayload(payload_id, result_tx)) + msg_tx + .send(EngineApiMessage::GetPayload(payload_id, result_tx)) .expect("failed to send engine msg"); assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadUnknown))); } } - // https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-3 - mod exchange_transition_configuration { - use reth_primitives::MAINNET; + mod fork_choice_updated { + use reth_interfaces::test_utils::generators::random_header; use super::*; + #[tokio::test] + async fn empty_head() { + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); + let engine = EngineApi { + client: Arc::new(MockEthProvider::default()), + chain_spec: MAINNET.clone(), + local_store: Default::default(), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, + }; + + tokio::spawn(engine); + + let (result_tx, result_rx) = oneshot::channel(); + msg_tx + .send(EngineApiMessage::ForkchoiceUpdated( + ForkchoiceState::default(), + None, + result_tx, + )) + .expect("failed to send engine msg"); + + let result = result_rx.await; + assert_matches!(result, Ok(Ok(_))); + assert_eq!( + result.unwrap().unwrap(), + ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { + validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(), + }) + ); + assert!(!tip_rx.has_changed().unwrap()); + } + + #[tokio::test] + async fn unknown_head_hash() { + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); + let engine = EngineApi { + client: Arc::new(MockEthProvider::default()), + chain_spec: MAINNET.clone(), + local_store: Default::default(), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, + }; + + tokio::spawn(engine); + + let state = ForkchoiceState { head_block_hash: H256::random(), ..Default::default() }; + + let (result_tx, result_rx) = oneshot::channel(); + msg_tx + .send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx)) + .expect("failed to send engine msg"); + + let result = result_rx.await; + assert_matches!(result, Ok(Ok(_))); + assert_eq!( + result.unwrap().unwrap(), + ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing) + ); + assert!(!tip_rx.has_changed().unwrap()); + } + + #[tokio::test] + async fn unknown_finalized_hash() { + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); + let client = Arc::new(MockEthProvider::default()); + let engine = EngineApi { + client: client.clone(), + chain_spec: MAINNET.clone(), + local_store: Default::default(), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, + }; + + tokio::spawn(engine); + + let head = random_header(100, None); + client.add_header(head.hash(), head.clone().unseal()); + + let state = ForkchoiceState { + head_block_hash: head.hash(), + finalized_block_hash: H256::random(), + ..Default::default() + }; + + let (result_tx, result_rx) = oneshot::channel(); + msg_tx + .send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx)) + .expect("failed to send engine msg"); + + let result = result_rx.await; + assert_matches!(result, Ok(Ok(_))); + assert_eq!( + result.unwrap().unwrap(), + ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing) + ); + assert!(!tip_rx.has_changed().unwrap()); + } + + #[tokio::test] + async fn forkchoice_state_is_updated() { + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); + let client = Arc::new(MockEthProvider::default()); + let engine = EngineApi { + client: client.clone(), + chain_spec: MAINNET.clone(), + local_store: Default::default(), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, + }; + + tokio::spawn(engine); + + let finalized = random_header(90, None); + let head = random_header(100, None); + client.extend_headers([ + (head.hash(), head.clone().unseal()), + (finalized.hash(), finalized.clone().unseal()), + ]); + + let state = ForkchoiceState { + head_block_hash: head.hash(), + finalized_block_hash: finalized.hash(), + ..Default::default() + }; + + let (result_tx, result_rx) = oneshot::channel(); + msg_tx + .send(EngineApiMessage::ForkchoiceUpdated(state.clone(), None, result_tx)) + .expect("failed to send engine msg"); + + let result = result_rx.await; + assert_matches!(result, Ok(Ok(_))); + assert_eq!( + result.unwrap().unwrap(), + ForkchoiceUpdated { + payload_id: None, + payload_status: PayloadStatus { + status: PayloadStatusEnum::Valid, + latest_valid_hash: Some(head.hash()) + } + } + ); + + assert!(tip_rx.has_changed().unwrap()); + assert_eq!(tip_rx.borrow().clone(), state); + } + } + + // https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-3 + mod exchange_transition_configuration { + use super::*; + #[tokio::test] async fn terminal_td_mismatch() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let chain_spec = MAINNET.clone(); let engine = EngineApi { client: Arc::new(MockEthProvider::default()), chain_spec: chain_spec.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -601,11 +782,12 @@ mod tests { }; let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); + msg_tx + .send(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )) + .expect("failed to send engine msg"); assert_matches!( result_rx.await, @@ -617,14 +799,16 @@ mod tests { #[tokio::test] async fn terminal_block_hash_mismatch() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let client = Arc::new(MockEthProvider::default()); let chain_spec = MAINNET.clone(); let engine = EngineApi { client: client.clone(), chain_spec: chain_spec.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -644,11 +828,12 @@ mod tests { // Unknown block number let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); + msg_tx + .send(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )) + .expect("failed to send engine msg"); assert_matches!( result_rx.await, @@ -664,11 +849,12 @@ mod tests { ); let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); + msg_tx + .send(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )) + .expect("failed to send engine msg"); assert_matches!( result_rx.await, @@ -680,14 +866,16 @@ mod tests { #[tokio::test] async fn configurations_match() { - let (tx, rx) = unbounded_channel(); + let (msg_tx, msg_rx) = unbounded_channel(); + let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); let client = Arc::new(MockEthProvider::default()); let chain_spec = MAINNET.clone(); let engine = EngineApi { client: client.clone(), chain_spec: chain_spec.clone(), local_store: Default::default(), - rx: UnboundedReceiverStream::new(rx), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx: tip_tx, }; tokio::spawn(engine); @@ -707,11 +895,12 @@ mod tests { client.add_block(terminal_block.hash(), terminal_block.clone().unseal()); let (result_tx, result_rx) = oneshot::channel(); - tx.send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); + msg_tx + .send(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )) + .expect("failed to send engine msg"); assert_matches!( result_rx.await, diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index b9c80fd24..2a1f703f2 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -128,7 +128,17 @@ impl BlockHashProvider for MockEthProvider { impl BlockProvider for MockEthProvider { fn chain_info(&self) -> Result { - todo!() + let lock = self.headers.lock(); + Ok(lock + .iter() + .max_by_key(|h| h.1.number) + .map(|(hash, header)| ChainInfo { + best_hash: *hash, + best_number: header.number, + last_finalized: None, + safe_finalized: None, + }) + .expect("provider is empty")) } fn block(&self, id: BlockId) -> Result> {