feat: Make deserialization work

This commit is contained in:
sprites0
2025-06-22 14:46:37 -04:00
parent a0f1b14391
commit 9437949187
3 changed files with 180 additions and 18 deletions

View File

@ -1,7 +1,11 @@
use super::handle::ImportHandle; use super::handle::ImportHandle;
use crate::{ use crate::{
consensus::HlConsensus, consensus::HlConsensus,
node::{network::HlNewBlock, rpc::engine_api::payload::HlPayloadTypes}, node::{
network::HlNewBlock,
rpc::engine_api::payload::HlPayloadTypes,
types::{BlockAndReceipts, EvmBlock},
},
HlBlock, HlBlockBody, HlBlock, HlBlockBody,
}; };
use alloy_consensus::{BlockBody, Header}; use alloy_consensus::{BlockBody, Header};
@ -9,6 +13,7 @@ use alloy_primitives::U128;
use alloy_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum}; use alloy_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum};
use futures::{future::Either, stream::FuturesUnordered, StreamExt}; use futures::{future::Either, stream::FuturesUnordered, StreamExt};
use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes}; use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes};
use reth_eth_wire::NewBlock;
use reth_network::{ use reth_network::{
import::{BlockImportError, BlockImportEvent, BlockImportOutcome, BlockValidation}, import::{BlockImportError, BlockImportEvent, BlockImportOutcome, BlockValidation},
message::NewBlockMessage, message::NewBlockMessage,
@ -59,6 +64,7 @@ where
to_network: UnboundedSender<ImportEvent>, to_network: UnboundedSender<ImportEvent>,
/// Pending block imports. /// Pending block imports.
pending_imports: FuturesUnordered<ImportFut>, pending_imports: FuturesUnordered<ImportFut>,
height: u64,
} }
impl<Provider> ImportService<Provider> impl<Provider> ImportService<Provider>
@ -78,6 +84,7 @@ where
from_network, from_network,
to_network, to_network,
pending_imports: FuturesUnordered::new(), pending_imports: FuturesUnordered::new(),
height: 2000000,
} }
} }
@ -91,10 +98,11 @@ where
match engine.new_payload(payload).await { match engine.new_payload(payload).await {
Ok(payload_status) => match payload_status.status { Ok(payload_status) => match payload_status.status {
PayloadStatusEnum::Valid => { PayloadStatusEnum::Valid => Outcome {
Outcome { peer: peer_id, result: Ok(BlockValidation::ValidBlock { block }) } peer: peer_id,
.into() result: Ok(BlockValidation::ValidBlock { block }),
} }
.into(),
PayloadStatusEnum::Invalid { validation_error } => Outcome { PayloadStatusEnum::Invalid { validation_error } => Outcome {
peer: peer_id, peer: peer_id,
result: Err(BlockImportError::Other(validation_error.into())), result: Err(BlockImportError::Other(validation_error.into())),
@ -127,13 +135,16 @@ where
finalized_block_hash: head_block_hash, finalized_block_hash: head_block_hash,
}; };
match engine.fork_choice_updated(state, None, EngineApiMessageVersion::default()).await match engine
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await
{ {
Ok(response) => match response.payload_status.status { Ok(response) => match response.payload_status.status {
PayloadStatusEnum::Valid => { PayloadStatusEnum::Valid => Outcome {
Outcome { peer: peer_id, result: Ok(BlockValidation::ValidBlock { block }) } peer: peer_id,
.into() result: Ok(BlockValidation::ValidBlock { block }),
} }
.into(),
PayloadStatusEnum::Invalid { validation_error } => Outcome { PayloadStatusEnum::Invalid { validation_error } => Outcome {
peer: peer_id, peer: peer_id,
result: Err(BlockImportError::Other(validation_error.into())), result: Err(BlockImportError::Other(validation_error.into())),
@ -166,8 +177,23 @@ where
let this = self.get_mut(); let this = self.get_mut();
// Receive new blocks from network // Receive new blocks from network
while let Poll::Ready(Some((block, peer_id))) = this.from_network.poll_recv(cx) { while let Some(block) = collect_block(this.height) {
this.on_new_block(block, peer_id); let peer_id = PeerId::random();
let reth_block = block.to_reth_block();
let td = U128::from(reth_block.header().difficulty());
let msg = NewBlockMessage {
hash: reth_block.header().hash_slow(),
block: Arc::new(HlNewBlock(NewBlock {
block: reth_block,
td,
})),
};
this.on_new_block(msg, peer_id);
this.height += 1;
if this.height > 2000000 {
break;
}
} }
// Process completed imports and send events to network // Process completed imports and send events to network
@ -183,6 +209,22 @@ where
} }
} }
pub(crate) fn collect_block(height: u64) -> Option<BlockAndReceipts> {
let ingest_dir = "/home/user/personal/evm-blocks";
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", ingest_dir);
if std::path::Path::new(&path).exists() {
let file = std::fs::File::open(path).unwrap();
let file = std::io::BufReader::new(file);
let mut decoder = lz4_flex::frame::FrameDecoder::new(file);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
Some(blocks[0].clone())
} else {
None
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::chainspec::hl::hl_mainnet; use crate::chainspec::hl::hl_mainnet;
@ -292,12 +334,17 @@ mod tests {
impl EngineResponses { impl EngineResponses {
fn both_valid() -> Self { fn both_valid() -> Self {
Self { new_payload: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid } Self {
new_payload: PayloadStatusEnum::Valid,
fcu: PayloadStatusEnum::Valid,
}
} }
fn invalid_new_payload() -> Self { fn invalid_new_payload() -> Self {
Self { Self {
new_payload: PayloadStatusEnum::Invalid { validation_error: "test error".into() }, new_payload: PayloadStatusEnum::Invalid {
validation_error: "test error".into(),
},
fcu: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid,
} }
} }
@ -305,7 +352,9 @@ mod tests {
fn invalid_fcu() -> Self { fn invalid_fcu() -> Self {
Self { Self {
new_payload: PayloadStatusEnum::Valid, new_payload: PayloadStatusEnum::Valid,
fcu: PayloadStatusEnum::Invalid { validation_error: "fcu error".into() }, fcu: PayloadStatusEnum::Invalid {
validation_error: "fcu error".into(),
},
} }
} }
} }
@ -318,7 +367,9 @@ mod tests {
impl TestFixture { impl TestFixture {
/// Create a new test fixture with the given engine responses /// Create a new test fixture with the given engine responses
async fn new(responses: EngineResponses) -> Self { async fn new(responses: EngineResponses) -> Self {
let consensus = Arc::new(HlConsensus { provider: MockProvider }); let consensus = Arc::new(HlConsensus {
provider: MockProvider,
});
let (to_engine, from_engine) = mpsc::unbounded_channel(); let (to_engine, from_engine) = mpsc::unbounded_channel();
let engine_handle = BeaconConsensusEngineHandle::new(to_engine); let engine_handle = BeaconConsensusEngineHandle::new(to_engine);
@ -382,9 +433,15 @@ mod tests {
read_precompile_calls: None, read_precompile_calls: None,
}, },
}; };
let new_block = HlNewBlock(NewBlock { block, td: U128::from(1) }); let new_block = HlNewBlock(NewBlock {
block,
td: U128::from(1),
});
let hash = new_block.0.block.header.hash_slow(); let hash = new_block.0.block.header.hash_slow();
NewBlockMessage { hash, block: Arc::new(new_block) } NewBlockMessage {
hash,
block: Arc::new(new_block),
}
} }
/// Helper function to handle engine messages with specified payload statuses /// Helper function to handle engine messages with specified payload statuses

View File

@ -9,12 +9,16 @@ use reth_primitives::{SealedBlock, Transaction};
use revm::primitives::HashMap; use revm::primitives::HashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::HlBlock;
pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>); pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>);
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>); pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
pub type ReadPrecompileMap = HashMap<Address, HashMap<ReadPrecompileInput, ReadPrecompileResult>>; pub type ReadPrecompileMap = HashMap<Address, HashMap<ReadPrecompileInput, ReadPrecompileResult>>;
mod reth_compat;
impl From<ReadPrecompileCalls> for ReadPrecompileMap { impl From<ReadPrecompileCalls> for ReadPrecompileMap {
fn from(calls: ReadPrecompileCalls) -> Self { fn from(calls: ReadPrecompileCalls) -> Self {
calls calls
@ -59,9 +63,16 @@ pub struct BlockAndReceipts {
pub read_precompile_calls: ReadPrecompileCalls, pub read_precompile_calls: ReadPrecompileCalls,
} }
impl BlockAndReceipts {
pub fn to_reth_block(self) -> HlBlock {
let EvmBlock::Reth115(block) = self.block;
block.to_reth_block(self.read_precompile_calls.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EvmBlock { pub enum EvmBlock {
Reth115(SealedBlock), Reth115(reth_compat::SealedBlock),
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -83,7 +94,7 @@ enum LegacyTxType {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemTx { pub struct SystemTx {
pub tx: Transaction, pub tx: reth_compat::Transaction,
pub receipt: Option<LegacyReceipt>, pub receipt: Option<LegacyReceipt>,
} }

View File

@ -0,0 +1,94 @@
//! Copy of reth codebase to preserve serialization compatibility
use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
use serde::{Deserialize, Serialize};
use alloy_primitives::{BlockHash, Signature};
use crate::{node::types::ReadPrecompileCalls, HlBlock, HlBlockBody};
/// A raw transaction.
///
/// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718).
#[derive(Debug, Clone, PartialEq, Eq, Hash, derive_more::From, Serialize, Deserialize)]
pub enum Transaction {
Legacy(TxLegacy),
Eip2930(TxEip2930),
Eip1559(TxEip1559),
Eip4844(TxEip4844),
Eip7702(TxEip7702),
}
/// Signed Ethereum transaction.
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, derive_more::AsRef, derive_more::Deref,
)]
#[serde(rename_all = "camelCase")]
pub struct TransactionSigned {
/// The transaction signature values
signature: Signature,
/// Raw transaction info
#[deref]
#[as_ref]
transaction: Transaction,
}
impl TransactionSigned {
fn to_reth_transaction(&self) -> reth_primitives::TransactionSigned {
match self.transaction.clone() {
Transaction::Legacy(tx) => {
reth_primitives::TransactionSigned::Legacy(Signed::new_unhashed(tx, self.signature))
}
Transaction::Eip2930(tx) => reth_primitives::TransactionSigned::Eip2930(
Signed::new_unhashed(tx, self.signature),
),
Transaction::Eip1559(tx) => reth_primitives::TransactionSigned::Eip1559(
Signed::new_unhashed(tx, self.signature),
),
Transaction::Eip4844(tx) => reth_primitives::TransactionSigned::Eip4844(
Signed::new_unhashed(tx, self.signature),
),
Transaction::Eip7702(tx) => reth_primitives::TransactionSigned::Eip7702(
Signed::new_unhashed(tx, self.signature),
),
}
}
}
type BlockBody = alloy_consensus::BlockBody<TransactionSigned, Header>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SealedHeader {
hash: BlockHash,
header: Header,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SealedBlock {
/// Sealed Header.
header: SealedHeader,
/// the block's body.
body: BlockBody,
}
impl SealedBlock {
pub fn to_reth_block(&self, read_precompile_calls: ReadPrecompileCalls) -> HlBlock {
let block_body = HlBlockBody {
inner: reth_primitives::BlockBody {
transactions: self
.body
.transactions
.iter()
.map(|tx| tx.to_reth_transaction())
.collect(),
withdrawals: self.body.withdrawals.clone(),
ommers: self.body.ommers.clone(),
},
sidecars: None,
read_precompile_calls: Some(read_precompile_calls),
};
let reth_block = HlBlock {
header: self.header.header.clone(),
body: block_body,
};
reth_block.into()
}
}