From 57ee0c31bebca57d5b964beb466328e0ccab7ca1 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 3 Apr 2024 21:27:32 +0200 Subject: [PATCH] fix: race condition in test setup (#7448) --- Cargo.lock | 1 + crates/node-e2e-tests/Cargo.toml | 3 ++- crates/node-e2e-tests/tests/it/eth.rs | 30 +++++++++++++++++---------- crates/payload/builder/src/service.rs | 20 +++++------------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c142385b9..a9c3c04ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4610,6 +4610,7 @@ dependencies = [ "reth-node-core", "reth-node-ethereum", "reth-primitives", + "reth-tracing", "secp256k1 0.27.0", "serde_json", "tokio", diff --git a/crates/node-e2e-tests/Cargo.toml b/crates/node-e2e-tests/Cargo.toml index 9072d4ce1..e60734d97 100644 --- a/crates/node-e2e-tests/Cargo.toml +++ b/crates/node-e2e-tests/Cargo.toml @@ -11,8 +11,9 @@ reth.workspace = true reth-node-core.workspace = true reth-primitives.workspace = true reth-node-ethereum.workspace = true -futures-util.workspace = true +reth-tracing.workspace = true +futures-util.workspace = true eyre.workspace = true tokio.workspace = true serde_json.workspace = true diff --git a/crates/node-e2e-tests/tests/it/eth.rs b/crates/node-e2e-tests/tests/it/eth.rs index 763b6af1f..c1608e612 100644 --- a/crates/node-e2e-tests/tests/it/eth.rs +++ b/crates/node-e2e-tests/tests/it/eth.rs @@ -18,6 +18,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; #[tokio::test] async fn can_run_eth_node() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); let tasks = TaskManager::current(); let test_suite = TestSuite::new(); @@ -35,6 +36,7 @@ async fn can_run_eth_node() -> eyre::Result<()> { // setup engine api events and payload service events let mut notifications = node.provider.canonical_state_stream(); let payload_events = node.payload_builder.subscribe().await?; + let mut payload_event_stream = payload_events.into_stream(); // push tx into pool via RPC server let eth_api = node.rpc_registry.eth_api(); @@ -45,14 +47,6 @@ async fn can_run_eth_node() -> eyre::Result<()> { let eth_attr = eth_payload_attributes(); let payload_id = node.payload_builder.new_payload(eth_attr.clone()).await?; - // resolve best payload via engine api - let client = node.engine_http_client(); - - // ensure we can get the payload over the engine api - let _payload = client.get_payload_v3(payload_id).await?; - - let mut payload_event_stream = payload_events.into_stream(); - // first event is the payload attributes let first_event = payload_event_stream.next().await.unwrap()?; if let reth::payload::Events::Attributes(attr) = first_event { @@ -61,7 +55,21 @@ async fn can_run_eth_node() -> eyre::Result<()> { panic!("Expect first event as payload attributes.") } - // second event is built payload + // wait until an actual payload is built before we resolve it via engine api + loop { + let payload = node.payload_builder.best_payload(payload_id).await.unwrap().unwrap(); + if payload.block().body.is_empty() { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + break; + } + + let client = node.engine_http_client(); + // trigger resolve payload via engine api + let _ = client.get_payload_v3(payload_id).await?; + + // ensure we're also receiving the built payload as event let second_event = payload_event_stream.next().await.unwrap()?; if let reth::payload::Events::BuiltPayload(payload) = second_event { // setup payload for submission @@ -93,8 +101,8 @@ async fn can_run_eth_node() -> eyre::Result<()> { // get head block from notifications stream and verify the tx has been pushed to the pool // is actually present in the canonical block let head = notifications.next().await.unwrap(); - let tx = head.tip().transactions().next().unwrap(); - assert_eq!(tx.hash(), transfer_tx.hash); + let tx = head.tip().transactions().next(); + assert_eq!(tx.unwrap().hash(), transfer_tx.hash); // make sure the block hash we submitted via FCU engine api is the new latest block using an // RPC call diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 85c4aad49..10534b48d 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -131,7 +131,9 @@ where } /// Returns the best payload for the given identifier. - async fn best_payload( + /// + /// Note: this does not resolve the job if it's still in progress. + pub async fn best_payload( &self, id: PayloadId, ) -> Option> { @@ -261,18 +263,6 @@ where (service, handle) } - /// Notifies the service on new attribute event. - pub fn on_new_attributes( - &self, - attributes: &Option< - Result<::PayloadBuilderAttributes, PayloadBuilderError>, - >, - ) { - if let Some(Ok(ref attributes)) = attributes { - self.payload_events.send(Events::Attributes(attributes.clone())).ok(); - } - } - /// Returns a handle to the service. pub fn handle(&self) -> PayloadBuilderHandle { PayloadBuilderHandle::new(self.service_tx.clone()) @@ -417,12 +407,13 @@ where } else { // no job for this payload yet, create one let parent = attr.parent(); - match this.generator.new_payload_job(attr) { + match this.generator.new_payload_job(attr.clone()) { Ok(job) => { info!(%id, %parent, "New payload job created"); this.metrics.inc_initiated_jobs(); new_job = true; this.payload_jobs.push((job, id)); + this.payload_events.send(Events::Attributes(attr.clone())).ok(); } Err(err) => { this.metrics.inc_failed_jobs(); @@ -440,7 +431,6 @@ where } PayloadServiceCommand::PayloadAttributes(id, tx) => { let attributes = this.payload_attributes(id); - this.on_new_attributes(&attributes); let _ = tx.send(attributes); } PayloadServiceCommand::Resolve(id, tx) => {