fix: race condition in test setup (#7448)

This commit is contained in:
Matthias Seitz
2024-04-03 21:27:32 +02:00
committed by GitHub
parent f892b5c511
commit 57ee0c31be
4 changed files with 27 additions and 27 deletions

1
Cargo.lock generated
View File

@ -4610,6 +4610,7 @@ dependencies = [
"reth-node-core",
"reth-node-ethereum",
"reth-primitives",
"reth-tracing",
"secp256k1 0.27.0",
"serde_json",
"tokio",

View File

@ -11,8 +11,9 @@ reth.workspace = true
reth-node-core.workspace = true
reth-primitives.workspace = true
reth-node-ethereum.workspace = true
futures-util.workspace = true
reth-tracing.workspace = true
futures-util.workspace = true
eyre.workspace = true
tokio.workspace = true
serde_json.workspace = true

View File

@ -18,6 +18,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
#[tokio::test]
async fn can_run_eth_node() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let test_suite = TestSuite::new();
@ -35,6 +36,7 @@ async fn can_run_eth_node() -> eyre::Result<()> {
// setup engine api events and payload service events
let mut notifications = node.provider.canonical_state_stream();
let payload_events = node.payload_builder.subscribe().await?;
let mut payload_event_stream = payload_events.into_stream();
// push tx into pool via RPC server
let eth_api = node.rpc_registry.eth_api();
@ -45,14 +47,6 @@ async fn can_run_eth_node() -> eyre::Result<()> {
let eth_attr = eth_payload_attributes();
let payload_id = node.payload_builder.new_payload(eth_attr.clone()).await?;
// resolve best payload via engine api
let client = node.engine_http_client();
// ensure we can get the payload over the engine api
let _payload = client.get_payload_v3(payload_id).await?;
let mut payload_event_stream = payload_events.into_stream();
// first event is the payload attributes
let first_event = payload_event_stream.next().await.unwrap()?;
if let reth::payload::Events::Attributes(attr) = first_event {
@ -61,7 +55,21 @@ async fn can_run_eth_node() -> eyre::Result<()> {
panic!("Expect first event as payload attributes.")
}
// second event is built payload
// wait until an actual payload is built before we resolve it via engine api
loop {
let payload = node.payload_builder.best_payload(payload_id).await.unwrap().unwrap();
if payload.block().body.is_empty() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
break;
}
let client = node.engine_http_client();
// trigger resolve payload via engine api
let _ = client.get_payload_v3(payload_id).await?;
// ensure we're also receiving the built payload as event
let second_event = payload_event_stream.next().await.unwrap()?;
if let reth::payload::Events::BuiltPayload(payload) = second_event {
// setup payload for submission
@ -93,8 +101,8 @@ async fn can_run_eth_node() -> eyre::Result<()> {
// get head block from notifications stream and verify the tx has been pushed to the pool
// is actually present in the canonical block
let head = notifications.next().await.unwrap();
let tx = head.tip().transactions().next().unwrap();
assert_eq!(tx.hash(), transfer_tx.hash);
let tx = head.tip().transactions().next();
assert_eq!(tx.unwrap().hash(), transfer_tx.hash);
// make sure the block hash we submitted via FCU engine api is the new latest block using an
// RPC call

View File

@ -131,7 +131,9 @@ where
}
/// Returns the best payload for the given identifier.
async fn best_payload(
///
/// Note: this does not resolve the job if it's still in progress.
pub async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<Engine::BuiltPayload, PayloadBuilderError>> {
@ -261,18 +263,6 @@ where
(service, handle)
}
/// Notifies the service on new attribute event.
pub fn on_new_attributes(
&self,
attributes: &Option<
Result<<Engine as EngineTypes>::PayloadBuilderAttributes, PayloadBuilderError>,
>,
) {
if let Some(Ok(ref attributes)) = attributes {
self.payload_events.send(Events::Attributes(attributes.clone())).ok();
}
}
/// Returns a handle to the service.
pub fn handle(&self) -> PayloadBuilderHandle<Engine> {
PayloadBuilderHandle::new(self.service_tx.clone())
@ -417,12 +407,13 @@ where
} else {
// no job for this payload yet, create one
let parent = attr.parent();
match this.generator.new_payload_job(attr) {
match this.generator.new_payload_job(attr.clone()) {
Ok(job) => {
info!(%id, %parent, "New payload job created");
this.metrics.inc_initiated_jobs();
new_job = true;
this.payload_jobs.push((job, id));
this.payload_events.send(Events::Attributes(attr.clone())).ok();
}
Err(err) => {
this.metrics.inc_failed_jobs();
@ -440,7 +431,6 @@ where
}
PayloadServiceCommand::PayloadAttributes(id, tx) => {
let attributes = this.payload_attributes(id);
this.on_new_attributes(&attributes);
let _ = tx.send(attributes);
}
PayloadServiceCommand::Resolve(id, tx) => {