feat: better blocksize heuristics (#3748)

This commit is contained in:
Dan Cline
2023-07-15 15:36:39 -04:00
committed by GitHub
parent d001313f99
commit 6934428be9
13 changed files with 374 additions and 23 deletions

View File

@ -89,7 +89,7 @@ downloader_request_limit = 200
#
# A lower value means more frequent disk I/O (writes), but also
# lowers memory usage.
downloader_stream_batch_size = 10000
downloader_stream_batch_size = 1000
# The size of the internal block buffer in bytes.
#
# A bigger buffer means that bandwidth can be saturated for longer periods,
@ -98,8 +98,8 @@ downloader_stream_batch_size = 10000
# If the buffer is full, no more requests will be made to peers until
# space is made for new blocks in the buffer.
#
# Defaults to around 4GB.
downloader_max_buffered_blocks_size_bytes = 4294967296
# Defaults to around 2GB.
downloader_max_buffered_blocks_size_bytes = 2147483648
# The minimum and maximum number of concurrent requests to have in flight at a time.
#
# The downloader uses these as best effort targets, which means that the number

View File

@ -146,11 +146,11 @@ pub struct BodiesConfig {
pub downloader_request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
///
/// Default: 10_000
/// Default: 1_000
pub downloader_stream_batch_size: usize,
/// The size of the internal block buffer in bytes.
///
/// Default: 4GB
/// Default: 2GB
pub downloader_max_buffered_blocks_size_bytes: usize,
/// The minimum number of requests to send concurrently.
///
@ -167,8 +167,8 @@ impl Default for BodiesConfig {
fn default() -> Self {
Self {
downloader_request_limit: 200,
downloader_stream_batch_size: 10_000,
downloader_max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB
downloader_stream_batch_size: 1_000,
downloader_max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB
downloader_min_concurrent_requests: 5,
downloader_max_concurrent_requests: 100,
}

View File

@ -18,13 +18,12 @@ impl BlockResponse {
}
}
/// Returns the total number of bytes of all transactions input data in the block
/// Calculates a heuristic for the in-memory size of the [BlockResponse].
#[inline]
pub fn size(&self) -> usize {
match self {
BlockResponse::Full(block) => {
block.body.iter().map(|tx| tx.transaction.input().len()).sum()
}
BlockResponse::Empty(_) => 0,
BlockResponse::Full(block) => SealedBlock::size(block),
BlockResponse::Empty(header) => SealedHeader::size(header),
}
}

View File

@ -19,6 +19,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
mem,
ops::RangeInclusive,
pin::Pin,
sync::Arc,
@ -225,13 +226,16 @@ where
self.metrics.buffered_responses.decrement(1.);
self.buffered_blocks_size_bytes -= resp.size();
self.metrics.buffered_blocks.decrement(resp.len() as f64);
self.metrics.buffered_blocks_size_bytes.set(resp.size() as f64);
self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
Some(resp)
}
/// Adds a new response to the internal buffer
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse>) {
let size = response.iter().map(|b| b.size()).sum::<usize>();
// take into account capacity
let size = response.iter().map(BlockResponse::size).sum::<usize>() +
response.capacity() * mem::size_of::<BlockResponse>();
let response = OrderedBodiesResponse { resp: response, size };
let response_len = response.len();
@ -516,7 +520,7 @@ impl Default for BodiesDownloaderBuilder {
Self {
request_limit: 200,
stream_batch_size: 10_000,
max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB
max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB
concurrent_requests_range: 5..=100,
}
}

View File

@ -1,4 +1,4 @@
use crate::metrics::BodyDownloaderMetrics;
use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
use futures::{Future, FutureExt};
use reth_interfaces::{
consensus::{Consensus as ConsensusTrait, Consensus},
@ -11,6 +11,7 @@ use reth_interfaces::{
use reth_primitives::{BlockBody, PeerId, SealedBlock, SealedHeader, WithPeerId, H256};
use std::{
collections::VecDeque,
mem,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
@ -39,6 +40,9 @@ pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: BodyDownloaderMetrics,
/// Metrics for individual responses. This can be used to observe how the size (in bytes) of
/// responses change while bodies are being downloaded.
response_metrics: ResponseMetrics,
// Headers to download. The collection is shrunk as responses are buffered.
pending_headers: VecDeque<SealedHeader>,
/// Internal buffer for all blocks
@ -62,6 +66,7 @@ where
client,
consensus,
metrics,
response_metrics: Default::default(),
pending_headers: Default::default(),
buffer: Default::default(),
last_request_len: None,
@ -153,8 +158,11 @@ where
/// This method removes headers from the internal collection.
/// If the response fails validation, then the header will be put back.
fn try_buffer_blocks(&mut self, bodies: Vec<BlockBody>) -> DownloadResult<()> {
let bodies_capacity = bodies.capacity();
let bodies_len = bodies.len();
let mut bodies = bodies.into_iter().peekable();
let mut total_size = bodies_capacity * mem::size_of::<BlockBody>();
while bodies.peek().is_some() {
let next_header = match self.pending_headers.pop_front() {
Some(header) => header,
@ -162,15 +170,16 @@ where
};
if next_header.is_empty() {
// increment empty block body metric
total_size += mem::size_of::<BlockBody>();
self.buffer.push(BlockResponse::Empty(next_header));
} else {
let next_body = bodies.next().unwrap();
let block = SealedBlock {
header: next_header,
body: next_body.transactions,
ommers: next_body.ommers,
withdrawals: next_body.withdrawals,
};
// increment full block body metric
total_size += next_body.size();
let block = SealedBlock::new(next_header, next_body);
if let Err(error) = self.consensus.validate_block(&block) {
// Body is invalid, put the header back and return an error
@ -183,6 +192,10 @@ where
}
}
// Increment per-response metric
self.response_metrics.response_size_bytes.set(total_size as f64);
self.response_metrics.response_length.set(bodies_len as f64);
Ok(())
}
}

View File

@ -62,6 +62,19 @@ impl BodyDownloaderMetrics {
}
}
/// Metrics for an individual response, i.e. the size in bytes, and length (number of bodies) in the
/// response.
///
/// These metrics will be initialized with the `downloaders.bodies.response` scope.
#[derive(Clone, Metrics)]
#[metrics(scope = "downloaders.bodies.response")]
pub struct ResponseMetrics {
/// The size (in bytes) of an individual bodies response received by the downloader.
pub response_size_bytes: Gauge,
/// The number of bodies in an individual bodies response received by the downloader.
pub response_length: Gauge,
}
/// Common header downloader metrics.
///
/// These metrics will be initialized with the `downloaders.headers` scope.

View File

@ -59,6 +59,16 @@ impl Block {
BlockWithSenders { block: self, senders }
}
/// Calculates a heuristic for the in-memory size of the [Block].
#[inline]
pub fn size(&self) -> usize {
self.header.size() +
// take into account capacity
self.body.iter().map(TransactionSigned::size).sum::<usize>() + self.body.capacity() * std::mem::size_of::<TransactionSigned>() +
self.ommers.iter().map(Header::size).sum::<usize>() + self.ommers.capacity() * std::mem::size_of::<Header>() +
self.withdrawals.as_ref().map(|w| w.iter().map(Withdrawal::size).sum::<usize>() + w.capacity() * std::mem::size_of::<Withdrawal>()).unwrap_or(std::mem::size_of::<Option<Vec<Withdrawal>>>())
}
}
impl Deref for Block {
@ -178,6 +188,16 @@ impl SealedBlock {
withdrawals: self.withdrawals,
}
}
/// Calculates a heuristic for the in-memory size of the [SealedBlock].
#[inline]
pub fn size(&self) -> usize {
self.header.size() +
// take into account capacity
self.body.iter().map(TransactionSigned::size).sum::<usize>() + self.body.capacity() * std::mem::size_of::<TransactionSigned>() +
self.ommers.iter().map(Header::size).sum::<usize>() + self.ommers.capacity() * std::mem::size_of::<Header>() +
self.withdrawals.as_ref().map(|w| w.iter().map(Withdrawal::size).sum::<usize>() + w.capacity() * std::mem::size_of::<Withdrawal>()).unwrap_or(std::mem::size_of::<Option<Vec<Withdrawal>>>())
}
}
impl From<SealedBlock> for Block {
@ -819,6 +839,22 @@ impl BlockBody {
withdrawals_root: self.calculate_withdrawals_root(),
}
}
/// Calculates a heuristic for the in-memory size of the [BlockBody].
#[inline]
pub fn size(&self) -> usize {
self.transactions.iter().map(TransactionSigned::size).sum::<usize>() +
self.transactions.capacity() * std::mem::size_of::<TransactionSigned>() +
self.ommers.iter().map(Header::size).sum::<usize>() +
self.ommers.capacity() * std::mem::size_of::<Header>() +
self.withdrawals
.as_ref()
.map(|w| {
w.iter().map(Withdrawal::size).sum::<usize>() +
w.capacity() * std::mem::size_of::<Withdrawal>()
})
.unwrap_or(std::mem::size_of::<Option<Vec<Withdrawal>>>())
}
}
/// A struct that represents roots associated with a block body. This can be used to correlate

View File

@ -9,7 +9,10 @@ use bytes::{Buf, BufMut, BytesMut};
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, main_codec, Compact};
use reth_rlp::{length_of_length, Decodable, Encodable, EMPTY_STRING_CODE};
use serde::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut};
use std::{
mem,
ops::{Deref, DerefMut},
};
/// Describes the current head block.
///
@ -180,6 +183,28 @@ impl Header {
self.seal(hash)
}
/// Calculate a heuristic for the in-memory size of the [Header].
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<H256>() + // parent hash
mem::size_of::<H256>() + // ommers hash
mem::size_of::<H160>() + // beneficiary
mem::size_of::<H256>() + // state root
mem::size_of::<H256>() + // transactions root
mem::size_of::<H256>() + // receipts root
mem::size_of::<Option<H256>>() + // withdrawals root
mem::size_of::<Bloom>() + // logs bloom
mem::size_of::<U256>() + // difficulty
mem::size_of::<BlockNumber>() + // number
mem::size_of::<u64>() + // gas limit
mem::size_of::<u64>() + // gas used
mem::size_of::<u64>() + // timestamp
mem::size_of::<H256>() + // mix hash
mem::size_of::<u64>() + // nonce
mem::size_of::<Option<u64>>() + // base fee per gas
self.extra_data.len() // extra data
}
fn header_payload_length(&self) -> usize {
let mut length = 0;
length += self.parent_hash.length();
@ -331,6 +356,12 @@ impl SealedHeader {
pub fn num_hash(&self) -> BlockNumHash {
BlockNumHash::new(self.number, self.hash)
}
/// Calculates a heuristic for the in-memory size of the [SealedHeader].
#[inline]
pub fn size(&self) -> usize {
self.header.size() + mem::size_of::<BlockHash>()
}
}
#[cfg(any(test, feature = "arbitrary"))]

View File

@ -1,3 +1,5 @@
use std::mem;
use crate::{Address, H256};
use reth_codecs::{main_codec, Compact};
use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
@ -22,6 +24,14 @@ pub struct AccessListItem {
pub storage_keys: Vec<H256>,
}
impl AccessListItem {
/// Calculates a heuristic for the in-memory size of the [AccessListItem].
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<Address>() + self.storage_keys.capacity() * mem::size_of::<H256>()
}
}
/// AccessList as defined in EIP-2930
#[main_codec(rlp)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Default, RlpDecodableWrapper, RlpEncodableWrapper)]
@ -48,6 +58,14 @@ impl AccessList {
})
.collect()
}
/// Calculates a heuristic for the in-memory size of the [AccessList].
#[inline]
pub fn size(&self) -> usize {
// take into account capacity
self.0.iter().map(AccessListItem::size).sum::<usize>() +
self.0.capacity() * mem::size_of::<AccessListItem>()
}
}
/// Access list with gas used appended.

View File

@ -1,3 +1,5 @@
use std::mem;
use crate::{
compression::{TRANSACTION_COMPRESSOR, TRANSACTION_DECOMPRESSOR},
keccak256, Address, Bytes, ChainId, TxHash, H256,
@ -64,6 +66,20 @@ pub struct TxLegacy {
pub input: Bytes,
}
impl TxLegacy {
/// Calculates a heuristic for the in-memory size of the [TxLegacy] transaction.
#[inline]
fn size(&self) -> usize {
mem::size_of::<Option<ChainId>>() + // chain_id
mem::size_of::<u64>() + // nonce
mem::size_of::<u128>() + // gas_price
mem::size_of::<u64>() + // gas_limit
self.to.size() + // to
mem::size_of::<u128>() + // value
self.input.len() // input
}
}
/// Transaction with an [`AccessList`] ([EIP-2930](https://eips.ethereum.org/EIPS/eip-2930)).
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
@ -112,6 +128,21 @@ pub struct TxEip2930 {
pub input: Bytes,
}
impl TxEip2930 {
/// Calculates a heuristic for the in-memory size of the [TxEip2930] transaction.
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<ChainId>() + // chain_id
mem::size_of::<u64>() + // nonce
mem::size_of::<u128>() + // gas_price
mem::size_of::<u64>() + // gas_limit
self.to.size() + // to
mem::size_of::<u128>() + // value
self.access_list.size() + // access_list
self.input.len() // input
}
}
/// A transaction with a priority fee ([EIP-1559](https://eips.ethereum.org/EIPS/eip-1559)).
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
@ -168,6 +199,22 @@ pub struct TxEip1559 {
pub input: Bytes,
}
impl TxEip1559 {
/// Calculates a heuristic for the in-memory size of the [TxEip1559] transaction.
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<ChainId>() + // chain_id
mem::size_of::<u64>() + // nonce
mem::size_of::<u64>() + // gas_limit
mem::size_of::<u128>() + // max_fee_per_gas
mem::size_of::<u128>() + // max_priority_fee_per_gas
self.to.size() + // to
mem::size_of::<u128>() + // value
self.access_list.size() + // access_list
self.input.len() // input
}
}
/// A raw transaction.
///
/// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718).
@ -251,6 +298,16 @@ impl Transaction {
Transaction::Eip1559(tx) => tx.input = input,
}
}
/// Calculates a heuristic for the in-memory size of the [Transaction].
#[inline]
fn size(&self) -> usize {
match self {
Transaction::Legacy(tx) => tx.size(),
Transaction::Eip2930(tx) => tx.size(),
Transaction::Eip1559(tx) => tx.size(),
}
}
}
impl Compact for Transaction {
@ -720,6 +777,12 @@ impl TransactionKind {
TransactionKind::Call(to) => Some(to),
}
}
/// Calculates a heuristic for the in-memory size of the [TransactionKind].
#[inline]
fn size(self) -> usize {
mem::size_of::<Self>()
}
}
impl Compact for TransactionKind {
@ -1033,6 +1096,12 @@ impl TransactionSigned {
initial_tx
}
/// Calculate a heuristic for the in-memory size of the [TransactionSigned].
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<TxHash>() + self.transaction.size() + self.signature.size()
}
/// Decodes legacy transaction from the data buffer.
///
/// This expects `rlp(legacy_tx)`

View File

@ -136,6 +136,12 @@ impl Signature {
sig[64] = v;
sig
}
/// Calculates a heuristic for the in-memory size of the [Signature].
#[inline]
pub fn size(&self) -> usize {
std::mem::size_of::<Self>()
}
}
#[cfg(test)]
@ -220,4 +226,21 @@ mod tests {
let expected = Address::from_str("0x9d8a62f656a8d1615c1294fd71e9cfb3e4855a4f").unwrap();
assert_eq!(expected, signer);
}
#[test]
fn ensure_size_equals_sum_of_fields() {
let signature = Signature {
r: U256::from_str(
"18515461264373351373200002665853028612451056578545711640558177340181847433846",
)
.unwrap(),
s: U256::from_str(
"46948507304638947509940763649030358759909902576025900602547168820602576006531",
)
.unwrap(),
odd_y_parity: false,
};
assert!(signature.size() >= 65);
}
}

View File

@ -1,3 +1,5 @@
use std::mem;
use crate::{constants::GWEI_TO_WEI, serde_helper::u64_hex, Address, U256};
use reth_codecs::{main_codec, Compact};
use reth_rlp::{RlpDecodable, RlpEncodable};
@ -24,6 +26,12 @@ impl Withdrawal {
pub fn amount_wei(&self) -> U256 {
U256::from(self.amount) * U256::from(GWEI_TO_WEI)
}
/// Calculate a heuristic for the in-memory size of the [Withdrawal].
#[inline]
pub fn size(&self) -> usize {
mem::size_of::<Self>()
}
}
#[cfg(test)]

View File

@ -2643,6 +2643,143 @@
"title": "Downloader buffer",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The number of blocks in a request and size in bytes of those block responses",
"fieldConfig": {
"defaults": {
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"barAlignment": 0,
"lineWidth": 1,
"fillOpacity": 0,
"gradientMode": "none",
"spanNulls": false,
"showPoints": "auto",
"pointSize": 5,
"stacking": {
"mode": "none",
"group": "A"
},
"axisPlacement": "auto",
"axisLabel": "",
"axisColorMode": "text",
"scaleDistribution": {
"type": "linear"
},
"axisCenteredZero": false,
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
},
"thresholdsStyle": {
"mode": "off"
}
},
"color": {
"mode": "palette-classic"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": [
{
"matcher": {
"id": "byFrameRefID",
"options": "B"
},
"properties": [
{
"id": "custom.axisPlacement",
"value": "right"
},
{
"id": "unit",
"value": "blocks"
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 5,
"y": 110
},
"id": 102,
"options": {
"tooltip": {
"mode": "multi",
"sort": "none"
},
"legend": {
"showLegend": true,
"displayMode": "list",
"placement": "bottom",
"calcs": []
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"editorMode": "builder",
"expr": "reth_downloaders_bodies_response_response_size_bytes{instance=~\"$instance\"}",
"hide": false,
"legendFormat": "Response size",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"editorMode": "builder",
"expr": "reth_downloaders_bodies_response_response_length{instance=~\"$instance\"}",
"hide": false,
"legendFormat": "Individual response length",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "reth_downloaders_bodies_response_response_size_bytes / reth_downloaders_bodies_response_response_length",
"hide": false,
"instant": false,
"legendFormat": "Mean body size in response",
"range": true,
"refId": "C"
}
],
"title": "Block body response sizes",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {