feat(db): add zstd and CompactZstd to Transactions and Receipts (#2483)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
joshieDo
2023-05-12 18:30:15 +01:00
committed by GitHub
parent 4056b15882
commit 047f1e513c
12 changed files with 8729 additions and 46 deletions

View File

@ -0,0 +1,28 @@
mod receipt_dictionary;
mod transaction_dictionary;
pub use receipt_dictionary::RECEIPT_DICTIONARY;
pub use transaction_dictionary::TRANSACTION_DICTIONARY;
use std::{cell::RefCell, thread_local};
use zstd::bulk::{Compressor, Decompressor};
// Reason for using static compressors is that dictionaries can be quite big, and zstd-rs
// recommends to use one context/compressor per thread. Thus the usage of `thread_local`.
thread_local! {
/// Thread Transaction compressor.
pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(Compressor::with_dictionary(0, &TRANSACTION_DICTIONARY)
.expect("Failed to initialize compressor."));
/// Thread Transaction decompressor.
pub static TRANSACTION_DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new(Decompressor::with_dictionary(&TRANSACTION_DICTIONARY)
.expect("Failed to initialize decompressor."));
/// Thread receipt compressor.
pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(Compressor::with_dictionary(0, &RECEIPT_DICTIONARY)
.expect("Failed to initialize compressor."));
/// Thread receipt decompressor.
pub static RECEIPT_DECOMPRESSOR: RefCell<Decompressor<'static>> = RefCell::new(Decompressor::with_dictionary(&RECEIPT_DICTIONARY)
.expect("Failed to initialize decompressor."));
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ mod block;
pub mod bloom;
mod chain;
mod checkpoints;
mod compression;
pub mod constants;
pub mod contract;
mod forkid;
@ -50,6 +51,7 @@ pub use chain::{
MAINNET, SEPOLIA,
};
pub use checkpoints::{AccountHashingCheckpoint, MerkleCheckpoint, StorageHashingCheckpoint};
pub use compression::*;
pub use constants::{
EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,
};

View File

@ -1,11 +1,15 @@
use crate::{bloom::logs_bloom, Bloom, Log, TxType};
use crate::{
bloom::logs_bloom,
compression::{RECEIPT_COMPRESSOR, RECEIPT_DECOMPRESSOR},
Bloom, Log, TxType,
};
use bytes::{Buf, BufMut, BytesMut};
use reth_codecs::{main_codec, Compact};
use reth_codecs::{main_codec, Compact, CompactZstd};
use reth_rlp::{length_of_length, Decodable, Encodable};
use std::cmp::Ordering;
/// Receipt containing result of transaction execution.
#[main_codec]
#[main_codec(zstd)]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct Receipt {
/// Receipt type.

View File

@ -1,4 +1,7 @@
use crate::{keccak256, Address, Bytes, ChainId, TxHash, H256};
use crate::{
compression::{TRANSACTION_COMPRESSOR, TRANSACTION_DECOMPRESSOR},
keccak256, Address, Bytes, ChainId, TxHash, H256,
};
pub use access_list::{AccessList, AccessListItem, AccessListWithGasUsed};
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
@ -782,25 +785,70 @@ impl Compact for TransactionSignedNoHash {
where
B: bytes::BufMut + AsMut<[u8]>,
{
let before = buf.as_mut().len();
let start = buf.as_mut().len();
// placeholder for bitflags
// Placeholder for bitflags.
// The first byte uses 4 bits as flags: IsCompressed[1bit], TxType[2bits], Signature[1bit]
buf.put_u8(0);
let sig_bit = self.signature.to_compact(buf) as u8;
let tx_bit = self.transaction.to_compact(buf) as u8;
let zstd_bit = self.transaction.input().len() >= 32;
// replace with actual flags
buf.as_mut()[before] = sig_bit | (tx_bit << 1);
let tx_bits = if zstd_bit {
TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
let mut tmp = bytes::BytesMut::with_capacity(200);
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.as_mut().len() - before
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
} else {
self.transaction.to_compact(buf) as u8
};
// Replace bitflags with the actual values
buf.as_mut()[start] = sig_bit | (tx_bits << 1) | ((zstd_bit as u8) << 3);
buf.as_mut().len() - start
}
fn from_compact(mut buf: &[u8], _: usize) -> (Self, &[u8]) {
let prefix = buf.get_u8() as usize;
fn from_compact(mut buf: &[u8], _len: usize) -> (Self, &[u8]) {
// The first byte uses 4 bits as flags: IsCompressed[1], TxType[2], Signature[1]
let bitflags = buf.get_u8() as usize;
let (signature, buf) = Signature::from_compact(buf, prefix & 1);
let (transaction, buf) = Transaction::from_compact(buf, prefix >> 1);
let sig_bit = bitflags & 1;
let (signature, buf) = Signature::from_compact(buf, sig_bit);
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
let mut tmp: Vec<u8> = Vec::with_capacity(200);
// `decompress_to_buffer` will return an error if the output buffer doesn't have
// enough capacity. However we don't actually have information on the required
// length. So we hope for the best, and keep trying again with a fairly bigger size
// if it fails.
while let Err(err) = decompressor.decompress_to_buffer(buf, &mut tmp) {
let err = err.to_string();
if !err.contains("Destination buffer is too small") {
panic!("Failed to decompress: {}", err);
}
tmp.reserve(tmp.capacity() + 24_000);
}
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) = Transaction::from_compact(tmp.as_slice(), transaction_type);
(transaction, buf)
})
} else {
let transaction_type = bitflags >> 1;
Transaction::from_compact(buf, transaction_type)
};
(TransactionSignedNoHash { signature, transaction }, buf)
}