diff --git a/book/run/config.md b/book/run/config.md index 43319d9d2..3d5c6ce5d 100644 --- a/book/run/config.md +++ b/book/run/config.md @@ -89,7 +89,7 @@ downloader_request_limit = 200 # # A lower value means more frequent disk I/O (writes), but also # lowers memory usage. -downloader_stream_batch_size = 10000 +downloader_stream_batch_size = 1000 # The size of the internal block buffer in bytes. # # A bigger buffer means that bandwidth can be saturated for longer periods, @@ -98,8 +98,8 @@ downloader_stream_batch_size = 10000 # If the buffer is full, no more requests will be made to peers until # space is made for new blocks in the buffer. # -# Defaults to around 4GB. -downloader_max_buffered_blocks_size_bytes = 4294967296 +# Defaults to around 2GB. +downloader_max_buffered_blocks_size_bytes = 2147483648 # The minimum and maximum number of concurrent requests to have in flight at a time. # # The downloader uses these as best effort targets, which means that the number diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 659df006b..d46e66543 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -146,11 +146,11 @@ pub struct BodiesConfig { pub downloader_request_limit: u64, /// The maximum number of block bodies returned at once from the stream /// - /// Default: 10_000 + /// Default: 1_000 pub downloader_stream_batch_size: usize, /// The size of the internal block buffer in bytes. /// - /// Default: 4GB + /// Default: 2GB pub downloader_max_buffered_blocks_size_bytes: usize, /// The minimum number of requests to send concurrently. /// @@ -167,8 +167,8 @@ impl Default for BodiesConfig { fn default() -> Self { Self { downloader_request_limit: 200, - downloader_stream_batch_size: 10_000, - downloader_max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB + downloader_stream_batch_size: 1_000, + downloader_max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB downloader_min_concurrent_requests: 5, downloader_max_concurrent_requests: 100, } diff --git a/crates/interfaces/src/p2p/bodies/response.rs b/crates/interfaces/src/p2p/bodies/response.rs index a3033bd4d..2b32b7009 100644 --- a/crates/interfaces/src/p2p/bodies/response.rs +++ b/crates/interfaces/src/p2p/bodies/response.rs @@ -18,13 +18,12 @@ impl BlockResponse { } } - /// Returns the total number of bytes of all transactions input data in the block + /// Calculates a heuristic for the in-memory size of the [BlockResponse]. + #[inline] pub fn size(&self) -> usize { match self { - BlockResponse::Full(block) => { - block.body.iter().map(|tx| tx.transaction.input().len()).sum() - } - BlockResponse::Empty(_) => 0, + BlockResponse::Full(block) => SealedBlock::size(block), + BlockResponse::Empty(header) => SealedHeader::size(header), } } diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 7a0c3aa4b..e615520e3 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -19,6 +19,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ cmp::Ordering, collections::BinaryHeap, + mem, ops::RangeInclusive, pin::Pin, sync::Arc, @@ -225,13 +226,16 @@ where self.metrics.buffered_responses.decrement(1.); self.buffered_blocks_size_bytes -= resp.size(); self.metrics.buffered_blocks.decrement(resp.len() as f64); - self.metrics.buffered_blocks_size_bytes.set(resp.size() as f64); + self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64); Some(resp) } /// Adds a new response to the internal buffer fn buffer_bodies_response(&mut self, response: Vec) { - let size = response.iter().map(|b| b.size()).sum::(); + // take into account capacity + let size = response.iter().map(BlockResponse::size).sum::() + + response.capacity() * mem::size_of::(); + let response = OrderedBodiesResponse { resp: response, size }; let response_len = response.len(); @@ -516,7 +520,7 @@ impl Default for BodiesDownloaderBuilder { Self { request_limit: 200, stream_batch_size: 10_000, - max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB + max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB concurrent_requests_range: 5..=100, } } diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index a71b81f33..8698e881e 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -1,4 +1,4 @@ -use crate::metrics::BodyDownloaderMetrics; +use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics}; use futures::{Future, FutureExt}; use reth_interfaces::{ consensus::{Consensus as ConsensusTrait, Consensus}, @@ -11,6 +11,7 @@ use reth_interfaces::{ use reth_primitives::{BlockBody, PeerId, SealedBlock, SealedHeader, WithPeerId, H256}; use std::{ collections::VecDeque, + mem, pin::Pin, sync::Arc, task::{ready, Context, Poll}, @@ -39,6 +40,9 @@ pub(crate) struct BodiesRequestFuture { client: Arc, consensus: Arc, metrics: BodyDownloaderMetrics, + /// Metrics for individual responses. This can be used to observe how the size (in bytes) of + /// responses change while bodies are being downloaded. + response_metrics: ResponseMetrics, // Headers to download. The collection is shrunk as responses are buffered. pending_headers: VecDeque, /// Internal buffer for all blocks @@ -62,6 +66,7 @@ where client, consensus, metrics, + response_metrics: Default::default(), pending_headers: Default::default(), buffer: Default::default(), last_request_len: None, @@ -153,8 +158,11 @@ where /// This method removes headers from the internal collection. /// If the response fails validation, then the header will be put back. fn try_buffer_blocks(&mut self, bodies: Vec) -> DownloadResult<()> { + let bodies_capacity = bodies.capacity(); + let bodies_len = bodies.len(); let mut bodies = bodies.into_iter().peekable(); + let mut total_size = bodies_capacity * mem::size_of::(); while bodies.peek().is_some() { let next_header = match self.pending_headers.pop_front() { Some(header) => header, @@ -162,15 +170,16 @@ where }; if next_header.is_empty() { + // increment empty block body metric + total_size += mem::size_of::(); self.buffer.push(BlockResponse::Empty(next_header)); } else { let next_body = bodies.next().unwrap(); - let block = SealedBlock { - header: next_header, - body: next_body.transactions, - ommers: next_body.ommers, - withdrawals: next_body.withdrawals, - }; + + // increment full block body metric + total_size += next_body.size(); + + let block = SealedBlock::new(next_header, next_body); if let Err(error) = self.consensus.validate_block(&block) { // Body is invalid, put the header back and return an error @@ -183,6 +192,10 @@ where } } + // Increment per-response metric + self.response_metrics.response_size_bytes.set(total_size as f64); + self.response_metrics.response_length.set(bodies_len as f64); + Ok(()) } } diff --git a/crates/net/downloaders/src/metrics.rs b/crates/net/downloaders/src/metrics.rs index 38fc642eb..a227f38f8 100644 --- a/crates/net/downloaders/src/metrics.rs +++ b/crates/net/downloaders/src/metrics.rs @@ -62,6 +62,19 @@ impl BodyDownloaderMetrics { } } +/// Metrics for an individual response, i.e. the size in bytes, and length (number of bodies) in the +/// response. +/// +/// These metrics will be initialized with the `downloaders.bodies.response` scope. +#[derive(Clone, Metrics)] +#[metrics(scope = "downloaders.bodies.response")] +pub struct ResponseMetrics { + /// The size (in bytes) of an individual bodies response received by the downloader. + pub response_size_bytes: Gauge, + /// The number of bodies in an individual bodies response received by the downloader. + pub response_length: Gauge, +} + /// Common header downloader metrics. /// /// These metrics will be initialized with the `downloaders.headers` scope. diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 6d652f92e..0c9866ba2 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -59,6 +59,16 @@ impl Block { BlockWithSenders { block: self, senders } } + + /// Calculates a heuristic for the in-memory size of the [Block]. + #[inline] + pub fn size(&self) -> usize { + self.header.size() + + // take into account capacity + self.body.iter().map(TransactionSigned::size).sum::() + self.body.capacity() * std::mem::size_of::() + + self.ommers.iter().map(Header::size).sum::() + self.ommers.capacity() * std::mem::size_of::
() + + self.withdrawals.as_ref().map(|w| w.iter().map(Withdrawal::size).sum::() + w.capacity() * std::mem::size_of::()).unwrap_or(std::mem::size_of::>>()) + } } impl Deref for Block { @@ -178,6 +188,16 @@ impl SealedBlock { withdrawals: self.withdrawals, } } + + /// Calculates a heuristic for the in-memory size of the [SealedBlock]. + #[inline] + pub fn size(&self) -> usize { + self.header.size() + + // take into account capacity + self.body.iter().map(TransactionSigned::size).sum::() + self.body.capacity() * std::mem::size_of::() + + self.ommers.iter().map(Header::size).sum::() + self.ommers.capacity() * std::mem::size_of::
() + + self.withdrawals.as_ref().map(|w| w.iter().map(Withdrawal::size).sum::() + w.capacity() * std::mem::size_of::()).unwrap_or(std::mem::size_of::>>()) + } } impl From for Block { @@ -819,6 +839,22 @@ impl BlockBody { withdrawals_root: self.calculate_withdrawals_root(), } } + + /// Calculates a heuristic for the in-memory size of the [BlockBody]. + #[inline] + pub fn size(&self) -> usize { + self.transactions.iter().map(TransactionSigned::size).sum::() + + self.transactions.capacity() * std::mem::size_of::() + + self.ommers.iter().map(Header::size).sum::() + + self.ommers.capacity() * std::mem::size_of::
() + + self.withdrawals + .as_ref() + .map(|w| { + w.iter().map(Withdrawal::size).sum::() + + w.capacity() * std::mem::size_of::() + }) + .unwrap_or(std::mem::size_of::>>()) + } } /// A struct that represents roots associated with a block body. This can be used to correlate diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 54b81649b..ba8e8f31d 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -9,7 +9,10 @@ use bytes::{Buf, BufMut, BytesMut}; use reth_codecs::{add_arbitrary_tests, derive_arbitrary, main_codec, Compact}; use reth_rlp::{length_of_length, Decodable, Encodable, EMPTY_STRING_CODE}; use serde::{Deserialize, Serialize}; -use std::ops::{Deref, DerefMut}; +use std::{ + mem, + ops::{Deref, DerefMut}, +}; /// Describes the current head block. /// @@ -180,6 +183,28 @@ impl Header { self.seal(hash) } + /// Calculate a heuristic for the in-memory size of the [Header]. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::() + // parent hash + mem::size_of::() + // ommers hash + mem::size_of::() + // beneficiary + mem::size_of::() + // state root + mem::size_of::() + // transactions root + mem::size_of::() + // receipts root + mem::size_of::>() + // withdrawals root + mem::size_of::() + // logs bloom + mem::size_of::() + // difficulty + mem::size_of::() + // number + mem::size_of::() + // gas limit + mem::size_of::() + // gas used + mem::size_of::() + // timestamp + mem::size_of::() + // mix hash + mem::size_of::() + // nonce + mem::size_of::>() + // base fee per gas + self.extra_data.len() // extra data + } + fn header_payload_length(&self) -> usize { let mut length = 0; length += self.parent_hash.length(); @@ -331,6 +356,12 @@ impl SealedHeader { pub fn num_hash(&self) -> BlockNumHash { BlockNumHash::new(self.number, self.hash) } + + /// Calculates a heuristic for the in-memory size of the [SealedHeader]. + #[inline] + pub fn size(&self) -> usize { + self.header.size() + mem::size_of::() + } } #[cfg(any(test, feature = "arbitrary"))] diff --git a/crates/primitives/src/transaction/access_list.rs b/crates/primitives/src/transaction/access_list.rs index eaa60b260..a86b33b69 100644 --- a/crates/primitives/src/transaction/access_list.rs +++ b/crates/primitives/src/transaction/access_list.rs @@ -1,3 +1,5 @@ +use std::mem; + use crate::{Address, H256}; use reth_codecs::{main_codec, Compact}; use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper}; @@ -22,6 +24,14 @@ pub struct AccessListItem { pub storage_keys: Vec, } +impl AccessListItem { + /// Calculates a heuristic for the in-memory size of the [AccessListItem]. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::
() + self.storage_keys.capacity() * mem::size_of::() + } +} + /// AccessList as defined in EIP-2930 #[main_codec(rlp)] #[derive(Clone, Debug, PartialEq, Eq, Hash, Default, RlpDecodableWrapper, RlpEncodableWrapper)] @@ -48,6 +58,14 @@ impl AccessList { }) .collect() } + + /// Calculates a heuristic for the in-memory size of the [AccessList]. + #[inline] + pub fn size(&self) -> usize { + // take into account capacity + self.0.iter().map(AccessListItem::size).sum::() + + self.0.capacity() * mem::size_of::() + } } /// Access list with gas used appended. diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 796baf298..c0371aa2a 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -1,3 +1,5 @@ +use std::mem; + use crate::{ compression::{TRANSACTION_COMPRESSOR, TRANSACTION_DECOMPRESSOR}, keccak256, Address, Bytes, ChainId, TxHash, H256, @@ -64,6 +66,20 @@ pub struct TxLegacy { pub input: Bytes, } +impl TxLegacy { + /// Calculates a heuristic for the in-memory size of the [TxLegacy] transaction. + #[inline] + fn size(&self) -> usize { + mem::size_of::>() + // chain_id + mem::size_of::() + // nonce + mem::size_of::() + // gas_price + mem::size_of::() + // gas_limit + self.to.size() + // to + mem::size_of::() + // value + self.input.len() // input + } +} + /// Transaction with an [`AccessList`] ([EIP-2930](https://eips.ethereum.org/EIPS/eip-2930)). #[main_codec] #[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] @@ -112,6 +128,21 @@ pub struct TxEip2930 { pub input: Bytes, } +impl TxEip2930 { + /// Calculates a heuristic for the in-memory size of the [TxEip2930] transaction. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::() + // chain_id + mem::size_of::() + // nonce + mem::size_of::() + // gas_price + mem::size_of::() + // gas_limit + self.to.size() + // to + mem::size_of::() + // value + self.access_list.size() + // access_list + self.input.len() // input + } +} + /// A transaction with a priority fee ([EIP-1559](https://eips.ethereum.org/EIPS/eip-1559)). #[main_codec] #[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] @@ -168,6 +199,22 @@ pub struct TxEip1559 { pub input: Bytes, } +impl TxEip1559 { + /// Calculates a heuristic for the in-memory size of the [TxEip1559] transaction. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::() + // chain_id + mem::size_of::() + // nonce + mem::size_of::() + // gas_limit + mem::size_of::() + // max_fee_per_gas + mem::size_of::() + // max_priority_fee_per_gas + self.to.size() + // to + mem::size_of::() + // value + self.access_list.size() + // access_list + self.input.len() // input + } +} + /// A raw transaction. /// /// Transaction types were introduced in [EIP-2718](https://eips.ethereum.org/EIPS/eip-2718). @@ -251,6 +298,16 @@ impl Transaction { Transaction::Eip1559(tx) => tx.input = input, } } + + /// Calculates a heuristic for the in-memory size of the [Transaction]. + #[inline] + fn size(&self) -> usize { + match self { + Transaction::Legacy(tx) => tx.size(), + Transaction::Eip2930(tx) => tx.size(), + Transaction::Eip1559(tx) => tx.size(), + } + } } impl Compact for Transaction { @@ -720,6 +777,12 @@ impl TransactionKind { TransactionKind::Call(to) => Some(to), } } + + /// Calculates a heuristic for the in-memory size of the [TransactionKind]. + #[inline] + fn size(self) -> usize { + mem::size_of::() + } } impl Compact for TransactionKind { @@ -1033,6 +1096,12 @@ impl TransactionSigned { initial_tx } + /// Calculate a heuristic for the in-memory size of the [TransactionSigned]. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::() + self.transaction.size() + self.signature.size() + } + /// Decodes legacy transaction from the data buffer. /// /// This expects `rlp(legacy_tx)` diff --git a/crates/primitives/src/transaction/signature.rs b/crates/primitives/src/transaction/signature.rs index faf44c2b9..6977d8c26 100644 --- a/crates/primitives/src/transaction/signature.rs +++ b/crates/primitives/src/transaction/signature.rs @@ -136,6 +136,12 @@ impl Signature { sig[64] = v; sig } + + /// Calculates a heuristic for the in-memory size of the [Signature]. + #[inline] + pub fn size(&self) -> usize { + std::mem::size_of::() + } } #[cfg(test)] @@ -220,4 +226,21 @@ mod tests { let expected = Address::from_str("0x9d8a62f656a8d1615c1294fd71e9cfb3e4855a4f").unwrap(); assert_eq!(expected, signer); } + + #[test] + fn ensure_size_equals_sum_of_fields() { + let signature = Signature { + r: U256::from_str( + "18515461264373351373200002665853028612451056578545711640558177340181847433846", + ) + .unwrap(), + s: U256::from_str( + "46948507304638947509940763649030358759909902576025900602547168820602576006531", + ) + .unwrap(), + odd_y_parity: false, + }; + + assert!(signature.size() >= 65); + } } diff --git a/crates/primitives/src/withdrawal.rs b/crates/primitives/src/withdrawal.rs index d8d0145b4..098110803 100644 --- a/crates/primitives/src/withdrawal.rs +++ b/crates/primitives/src/withdrawal.rs @@ -1,3 +1,5 @@ +use std::mem; + use crate::{constants::GWEI_TO_WEI, serde_helper::u64_hex, Address, U256}; use reth_codecs::{main_codec, Compact}; use reth_rlp::{RlpDecodable, RlpEncodable}; @@ -24,6 +26,12 @@ impl Withdrawal { pub fn amount_wei(&self) -> U256 { U256::from(self.amount) * U256::from(GWEI_TO_WEI) } + + /// Calculate a heuristic for the in-memory size of the [Withdrawal]. + #[inline] + pub fn size(&self) -> usize { + mem::size_of::() + } } #[cfg(test)] diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json index 4d27f32f7..4e11ec203 100644 --- a/etc/grafana/dashboards/overview.json +++ b/etc/grafana/dashboards/overview.json @@ -2643,6 +2643,143 @@ "title": "Downloader buffer", "type": "timeseries" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The number of blocks in a request and size in bytes of those block responses", + "fieldConfig": { + "defaults": { + "custom": { + "drawStyle": "line", + "lineInterpolation": "linear", + "barAlignment": 0, + "lineWidth": 1, + "fillOpacity": 0, + "gradientMode": "none", + "spanNulls": false, + "showPoints": "auto", + "pointSize": 5, + "stacking": { + "mode": "none", + "group": "A" + }, + "axisPlacement": "auto", + "axisLabel": "", + "axisColorMode": "text", + "scaleDistribution": { + "type": "linear" + }, + "axisCenteredZero": false, + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [ + { + "matcher": { + "id": "byFrameRefID", + "options": "B" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "right" + }, + { + "id": "unit", + "value": "blocks" + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 5, + "y": 110 + }, + "id": 102, + "options": { + "tooltip": { + "mode": "multi", + "sort": "none" + }, + "legend": { + "showLegend": true, + "displayMode": "list", + "placement": "bottom", + "calcs": [] + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "builder", + "expr": "reth_downloaders_bodies_response_response_size_bytes{instance=~\"$instance\"}", + "hide": false, + "legendFormat": "Response size", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "builder", + "expr": "reth_downloaders_bodies_response_response_length{instance=~\"$instance\"}", + "hide": false, + "legendFormat": "Individual response length", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "builder", + "expr": "reth_downloaders_bodies_response_response_size_bytes / reth_downloaders_bodies_response_response_length", + "hide": false, + "instant": false, + "legendFormat": "Mean body size in response", + "range": true, + "refId": "C" + } + ], + "title": "Block body response sizes", + "type": "timeseries" + }, { "collapsed": false, "gridPos": {