diff --git a/crates/ethereum-forks/src/forkid.rs b/crates/ethereum-forks/src/forkid.rs index 40fdd4964..862aecfa6 100644 --- a/crates/ethereum-forks/src/forkid.rs +++ b/crates/ethereum-forks/src/forkid.rs @@ -365,7 +365,7 @@ impl Cache { // Create ForkId using the last past fork's hash and the next epoch start. let fork_id = ForkId { - hash: past.last().expect("there is always at least one - genesis - fork hash; qed").1, + hash: past.last().expect("there is always at least one - genesis - fork hash").1, next: epoch_end.unwrap_or(ForkFilterKey::Block(0)).into(), }; diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index a15f983e8..7170f60bd 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -545,8 +545,7 @@ impl Discv4Service { for (key, val) in config.additional_eip868_rlp_pairs.iter() { builder.add_value_rlp(key, val.clone()); } - - builder.build(&secret_key).expect("v4 is set; qed") + builder.build(&secret_key).expect("v4 is set") }; let (to_service, commands_rx) = mpsc::unbounded_channel(); diff --git a/crates/net/discv4/src/proto.rs b/crates/net/discv4/src/proto.rs index 9f00749a7..682376413 100644 --- a/crates/net/discv4/src/proto.rs +++ b/crates/net/discv4/src/proto.rs @@ -113,7 +113,7 @@ impl Message { // Sign the payload with the secret key using recoverable ECDSA let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable( &secp256k1::Message::from_slice(keccak256(&payload).as_ref()) - .expect("is correct MESSAGE_SIZE; qed"), + .expect("B256.len() == MESSAGE_SIZE"), secret_key, ); diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 5f799b2ac..25e6bf0e2 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -359,32 +359,32 @@ pub struct SharedCapabilities(Vec); impl SharedCapabilities { /// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance. + #[inline] pub fn try_new( local_protocols: Vec, peer_capabilities: Vec, ) -> Result { - Ok(Self(shared_capability_offsets(local_protocols, peer_capabilities)?)) + shared_capability_offsets(local_protocols, peer_capabilities).map(Self) } /// Iterates over the shared capabilities. + #[inline] pub fn iter_caps(&self) -> impl Iterator { self.0.iter() } /// Returns the eth capability if it is shared. + #[inline] pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> { - for cap in self.iter_caps() { - if cap.is_eth() { - return Ok(cap) - } - } - Err(P2PStreamError::CapabilityNotShared) + self.iter_caps().find(|c| c.is_eth()).ok_or(P2PStreamError::CapabilityNotShared) } /// Returns the negotiated eth version if it is shared. #[inline] pub fn eth_version(&self) -> Result { - self.eth().map(|cap| cap.eth_version().expect("is eth; qed")) + self.iter_caps() + .find_map(SharedCapability::eth_version) + .ok_or(P2PStreamError::CapabilityNotShared) } /// Returns true if the shared capabilities contain the given capability. @@ -526,15 +526,13 @@ pub fn shared_capability_offsets( // alphabetic order. let mut offset = MAX_RESERVED_MESSAGE_ID + 1; for name in shared_capability_names { - let proto_version = shared_capabilities.get(&name).expect("shared; qed"); - + let proto_version = &shared_capabilities[&name]; let shared_capability = SharedCapability::new( &name, proto_version.version as u8, offset, proto_version.messages, )?; - offset += shared_capability.num_messages(); shared_with_offsets.push(shared_capability); } diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index e244e70aa..19c605fb9 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -138,7 +138,7 @@ impl StateFetcher { let Some(peer_id) = self.next_peer() else { return PollAction::NoPeersAvailable }; - let request = self.queued_requests.pop_front().expect("not empty; qed"); + let request = self.queued_requests.pop_front().expect("not empty"); let request = self.prepare_block_request(peer_id, request); PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index a56124257..489f53ce8 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -857,36 +857,39 @@ where NetworkEvent::SessionEstablished { peer_id, client_version, messages, version, .. } => { - // insert a new peer into the peerset - self.peers.insert(peer_id, Peer::new(messages, version, client_version)); + // Insert a new peer into the peerset. + let peer = Peer::new(messages, version, client_version); + let peer = match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + entry.insert(peer); + entry.into_mut() + } + Entry::Vacant(entry) => entry.insert(peer), + }; // Send a `NewPooledTransactionHashes` to the peer with up to - // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the - // pool - if !self.network.is_initially_syncing() { - if self.network.tx_gossip_disabled() { - return - } - let peer = self.peers.get_mut(&peer_id).expect("is present; qed"); - - let mut msg_builder = PooledTransactionsHashesBuilder::new(version); - - let pooled_txs = self.pool.pooled_transactions_max( - SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, - ); - if pooled_txs.is_empty() { - // do not send a message if there are no transactions in the pool - return - } - - for pooled_tx in pooled_txs.into_iter() { - peer.seen_transactions.insert(*pooled_tx.hash()); - msg_builder.push_pooled(pooled_tx); - } - - let msg = msg_builder.build(); - self.network.send_transactions_hashes(peer_id, msg); + // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` + // transactions in the pool. + if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { + return } + + let pooled_txs = self.pool.pooled_transactions_max( + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + ); + if pooled_txs.is_empty() { + // do not send a message if there are no transactions in the pool + return + } + + let mut msg_builder = PooledTransactionsHashesBuilder::new(version); + for pooled_tx in pooled_txs { + peer.seen_transactions.insert(*pooled_tx.hash()); + msg_builder.push_pooled(pooled_tx); + } + + let msg = msg_builder.build(); + self.network.send_transactions_hashes(peer_id, msg); } _ => {} } diff --git a/crates/primitives/src/receipt.rs b/crates/primitives/src/receipt.rs index e56d76564..ae3f59a64 100644 --- a/crates/primitives/src/receipt.rs +++ b/crates/primitives/src/receipt.rs @@ -500,6 +500,8 @@ impl<'a> ReceiptWithBloomEncoder<'a> { } match self.receipt.tx_type { + TxType::Legacy => unreachable!("legacy already handled"), + TxType::EIP2930 => { out.put_u8(0x01); } @@ -513,7 +515,6 @@ impl<'a> ReceiptWithBloomEncoder<'a> { TxType::DEPOSIT => { out.put_u8(0x7E); } - _ => unreachable!("legacy handled; qed."), } out.put_slice(payload.as_ref()); } diff --git a/crates/primitives/src/transaction/signature.rs b/crates/primitives/src/transaction/signature.rs index de1f09c7c..3cbfa1c74 100644 --- a/crates/primitives/src/transaction/signature.rs +++ b/crates/primitives/src/transaction/signature.rs @@ -42,18 +42,16 @@ impl Compact for Signature { where B: bytes::BufMut + AsMut<[u8]>, { - buf.put_slice(self.r.as_le_bytes().as_ref()); - buf.put_slice(self.s.as_le_bytes().as_ref()); + buf.put_slice(&self.r.as_le_bytes()); + buf.put_slice(&self.s.as_le_bytes()); self.odd_y_parity as usize } fn from_compact(mut buf: &[u8], identifier: usize) -> (Self, &[u8]) { - let r = U256::try_from_le_slice(&buf[..32]).expect("qed"); - buf.advance(32); - - let s = U256::try_from_le_slice(&buf[..32]).expect("qed"); - buf.advance(32); - + assert!(buf.len() >= 64); + let r = U256::from_le_slice(&buf[0..32]); + let s = U256::from_le_slice(&buf[32..64]); + buf.advance(64); (Signature { r, s, odd_y_parity: identifier != 0 }, buf) } } diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index e05c87533..984072b30 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -158,11 +158,12 @@ impl Segment for ReceiptsByLogs { // For accurate checkpoints we need to know that we have checked every transaction. // Example: we reached the end of the range, and the last receipt is supposed to skip // its deletion. - last_pruned_transaction = - Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); + let last_pruned_transaction = *last_pruned_transaction + .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); + last_pruned_block = Some( provider - .transaction_block(last_pruned_transaction.expect("qed"))? + .transaction_block(last_pruned_transaction)? .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? // If there's more receipts to prune, set the checkpoint block number to // previous, so we could finish pruning its receipts on the @@ -175,7 +176,7 @@ impl Segment for ReceiptsByLogs { break } - from_tx_number = last_pruned_transaction.expect("qed") + 1; + from_tx_number = last_pruned_transaction + 1; } // If there are contracts using `PruneMode::Distance(_)` there will be receipts before diff --git a/crates/rpc/ipc/src/server/future.rs b/crates/rpc/ipc/src/server/future.rs index eabf1da48..84df306a5 100644 --- a/crates/rpc/ipc/src/server/future.rs +++ b/crates/rpc/ipc/src/server/future.rs @@ -195,7 +195,7 @@ impl ConnectionGuard { match self.0.clone().try_acquire_owned() { Ok(guard) => Some(guard), Err(TryAcquireError::Closed) => { - unreachable!("Semaphore::Close is never called and can't be closed; qed") + unreachable!("Semaphore::Close is never called and can't be closed") } Err(TryAcquireError::NoPermits) => None, } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 1f9db9b67..10262b54d 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1272,76 +1272,77 @@ where where F: FnOnce(&EthHandlers) -> R, { - if self.eth.is_none() { - let cache = EthStateCache::spawn_with( - self.provider.clone(), - self.config.eth.cache.clone(), - self.executor.clone(), - self.evm_config.clone(), - ); - let gas_oracle = GasPriceOracle::new( - self.provider.clone(), - self.config.eth.gas_oracle.clone(), - cache.clone(), - ); - let new_canonical_blocks = self.events.canonical_state_stream(); - let c = cache.clone(); + f(match &self.eth { + Some(eth) => eth, + None => self.eth.insert(self.init_eth()), + }) + } - self.executor.spawn_critical( - "cache canonical blocks task", - Box::pin(async move { - cache_new_blocks_task(c, new_canonical_blocks).await; - }), - ); + fn init_eth(&self) -> EthHandlers { + let cache = EthStateCache::spawn_with( + self.provider.clone(), + self.config.eth.cache.clone(), + self.executor.clone(), + self.evm_config.clone(), + ); + let gas_oracle = GasPriceOracle::new( + self.provider.clone(), + self.config.eth.gas_oracle.clone(), + cache.clone(), + ); + let new_canonical_blocks = self.events.canonical_state_stream(); + let c = cache.clone(); - let fee_history_cache = - FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone()); - let new_canonical_blocks = self.events.canonical_state_stream(); - let fhc = fee_history_cache.clone(); - let provider_clone = self.provider.clone(); - self.executor.spawn_critical( - "cache canonical blocks for fee history task", - Box::pin(async move { - fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone) - .await; - }), - ); + self.executor.spawn_critical( + "cache canonical blocks task", + Box::pin(async move { + cache_new_blocks_task(c, new_canonical_blocks).await; + }), + ); - let executor = Box::new(self.executor.clone()); - let blocking_task_pool = - BlockingTaskPool::build().expect("failed to build tracing pool"); - let api = EthApi::with_spawner( - self.provider.clone(), - self.pool.clone(), - self.network.clone(), - cache.clone(), - gas_oracle, - self.config.eth.rpc_gas_cap, - executor.clone(), - blocking_task_pool.clone(), - fee_history_cache, - self.evm_config.clone(), - ); - let filter = EthFilter::new( - self.provider.clone(), - self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), - executor.clone(), - ); + let fee_history_cache = + FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone()); + let new_canonical_blocks = self.events.canonical_state_stream(); + let fhc = fee_history_cache.clone(); + let provider_clone = self.provider.clone(); + self.executor.spawn_critical( + "cache canonical blocks for fee history task", + Box::pin(async move { + fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone).await; + }), + ); - let pubsub = EthPubSub::with_spawner( - self.provider.clone(), - self.pool.clone(), - self.events.clone(), - self.network.clone(), - executor, - ); + let executor = Box::new(self.executor.clone()); + let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool"); + let api = EthApi::with_spawner( + self.provider.clone(), + self.pool.clone(), + self.network.clone(), + cache.clone(), + gas_oracle, + self.config.eth.rpc_gas_cap, + executor.clone(), + blocking_task_pool.clone(), + fee_history_cache, + self.evm_config.clone(), + ); + let filter = EthFilter::new( + self.provider.clone(), + self.pool.clone(), + cache.clone(), + self.config.eth.filter_config(), + executor.clone(), + ); - let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool }; - self.eth = Some(eth); - } - f(self.eth.as_ref().expect("exists; qed")) + let pubsub = EthPubSub::with_spawner( + self.provider.clone(), + self.pool.clone(), + self.events.clone(), + self.network.clone(), + executor, + ); + + EthHandlers { api, cache, filter, pubsub, blocking_task_pool } } /// Returns the configured [EthHandlers] or creates it if it does not exist yet @@ -1643,9 +1644,7 @@ impl RpcServerConfig { } Some(ws_cors) } - (None, cors @ Some(_)) => cors, - (cors @ Some(_), None) => cors, - _ => None, + (a, b) => a.or(b), } .cloned(); @@ -1656,7 +1655,7 @@ impl RpcServerConfig { modules.config.ensure_ws_http_identical()?; - let builder = self.http_server_config.take().expect("is set; qed"); + let builder = self.http_server_config.take().expect("http_server_config is Some"); let (server, addr) = WsHttpServerKind::build( builder, http_socket_addr, diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index e88e6adca..30f477bc4 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -48,25 +48,11 @@ where block_id: Option, ) -> EthResult { if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_id { - // lookup transactions in pool let address_txs = self.pool().get_transactions_by_sender(address); - - if !address_txs.is_empty() { - // get max transaction with the highest nonce - let highest_nonce_tx = address_txs - .into_iter() - .reduce(|accum, item| { - if item.transaction.nonce() > accum.transaction.nonce() { - item - } else { - accum - } - }) - .expect("Not empty; qed"); - - let tx_count = highest_nonce_tx - .transaction - .nonce() + if let Some(highest_nonce) = + address_txs.iter().map(|item| item.transaction.nonce()).max() + { + let tx_count = highest_nonce .checked_add(1) .ok_or(RpcInvalidTransactionError::NonceMaxValue)?; return Ok(U256::from(tx_count)) diff --git a/crates/rpc/rpc/src/eth/gas_oracle.rs b/crates/rpc/rpc/src/eth/gas_oracle.rs index 5b645e6b8..4c1827ecc 100644 --- a/crates/rpc/rpc/src/eth/gas_oracle.rs +++ b/crates/rpc/rpc/src/eth/gas_oracle.rs @@ -191,7 +191,7 @@ where results.sort_unstable(); price = *results .get((results.len() - 1) * self.oracle_config.percentile as usize / 100) - .expect("gas price index is a percent of nonzero array length, so a value always exists; qed"); + .expect("gas price index is a percent of nonzero array length, so a value always exists"); } // constrain to the max price diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index bf114688b..2d3cb1474 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -305,7 +305,7 @@ where fn log_stream(&self, filter: FilteredParams) -> impl Stream { BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) .map(move |canon_state| { - canon_state.expect("new block subscription never ends; qed").block_receipts() + canon_state.expect("new block subscription never ends").block_receipts() }) .flat_map(futures::stream::iter) .flat_map(move |(block_receipts, removed)| { diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 7688c6031..3661c64d3 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -8,7 +8,7 @@ use crate::{ transaction::{DbTx, DbTxMut}, DatabaseError, }; -use parking_lot::RwLock; +use parking_lot::Mutex; use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation}; use reth_libmdbx::{ffi::DBI, CommitLatency, Transaction, TransactionKind, WriteFlags, RW}; use reth_tracing::tracing::{debug, trace, warn}; @@ -30,36 +30,43 @@ const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60); pub struct Tx { /// Libmdbx-sys transaction. pub inner: Transaction, - /// Database table handle cache. - pub(crate) db_handles: Arc; Tables::COUNT]>>, + /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't /// closed by [Tx::commit] or [Tx::abort], but we still need to report it in the metrics. /// /// If [Some], then metrics are reported. metrics_handler: Option>, + + /// Database table handle cache. + db_handles: Mutex<[Option; Tables::COUNT]>, } impl Tx { /// Creates new `Tx` object with a `RO` or `RW` transaction. + #[inline] pub fn new(inner: Transaction) -> Self { - Self { inner, db_handles: Default::default(), metrics_handler: None } + Self::new_inner(inner, None) } /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics. + #[inline] #[track_caller] pub fn new_with_metrics( inner: Transaction, - metrics: Option>, + env_metrics: Option>, ) -> Self { - let metrics_handler = if let Some(metrics) = metrics { - let handler = MetricsHandler::::new(inner.id(), metrics); + let metrics_handler = env_metrics.map(|env_metrics| { + let handler = MetricsHandler::::new(inner.id(), env_metrics); handler.env_metrics.record_opened_transaction(handler.transaction_mode()); handler.log_transaction_opened(); - Some(handler) - } else { - None - }; - Self { inner, db_handles: Default::default(), metrics_handler } + handler + }); + Self::new_inner(inner, metrics_handler) + } + + #[inline] + fn new_inner(inner: Transaction, metrics_handler: Option>) -> Self { + Self { inner, db_handles: Mutex::new([None; Tables::COUNT]), metrics_handler } } /// Gets this transaction ID. @@ -69,18 +76,14 @@ impl Tx { /// Gets a table database handle if it exists, otherwise creates it. pub fn get_dbi(&self) -> Result { - let mut handles = self.db_handles.write(); - - let table = T::TABLE; - - let dbi_handle = handles.get_mut(table as usize).expect("should exist"); - if dbi_handle.is_none() { - *dbi_handle = Some( - self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?.dbi(), - ); + match self.db_handles.lock()[T::TABLE as usize] { + Some(handle) => Ok(handle), + ref mut handle @ None => { + let db = + self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?; + Ok(*handle.insert(db.dbi())) + } } - - Ok(dbi_handle.expect("is some; qed")) } /// Create db Cursor diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index ad112c9d0..46b7dc26b 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -239,15 +239,14 @@ impl NippyJar { Ok(obj) } - /// Loads filters into memory - pub fn load_filters(mut self) -> Result { + /// Loads filters into memory. + pub fn load_filters(&mut self) -> Result<(), NippyJarError> { // Read the offsets lists located at the index file. let mut offsets_file = File::open(self.index_path())?; - self.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_file)?; self.phf = bincode::deserialize_from(&mut offsets_file)?; self.filter = bincode::deserialize_from(&mut offsets_file)?; - Ok(self) + Ok(()) } /// Returns the path for the data file @@ -588,8 +587,8 @@ mod tests { nippy .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows) .unwrap(); - let loaded_nippy = - NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert_eq!(indexes, collect_indexes(&loaded_nippy)); }; @@ -633,8 +632,8 @@ mod tests { assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok()); nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); - let loaded_nippy = - NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert_eq!(nippy, loaded_nippy); @@ -687,8 +686,8 @@ mod tests { nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); - let loaded_nippy = - NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert_eq!(nippy.version, loaded_nippy.version); assert_eq!(nippy.columns, loaded_nippy.columns); assert_eq!(nippy.filter, loaded_nippy.filter); @@ -730,8 +729,8 @@ mod tests { nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); - let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); - let loaded_nippy = loaded_nippy.load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert_eq!(nippy, loaded_nippy); if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() { @@ -767,8 +766,8 @@ mod tests { nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap(); - let loaded_nippy = - NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert_eq!(nippy, loaded_nippy); if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() { @@ -823,8 +822,8 @@ mod tests { // Read file { - let loaded_nippy = - NippyJar::::load(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::::load(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); assert!(loaded_nippy.compressor().is_some()); assert!(loaded_nippy.filter.is_some()); @@ -894,8 +893,8 @@ mod tests { // Read file { - let loaded_nippy = - NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap(); + let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap(); + loaded_nippy.load_filters().unwrap(); if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() { let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap(); diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 6bf1b2424..c46cab325 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -156,20 +156,19 @@ impl SnapshotProvider { tx_range: &RangeInclusive, ) -> ProviderResult> { let key = (*block_range.end(), segment); - if let Some(jar) = self.map.get(&key) { - Ok(jar.into()) - } else { - let jar = NippyJar::load(&self.path.join(segment.filename(block_range, tx_range))) - .map(|jar| { + let entry = match self.map.entry(key) { + dashmap::mapref::entry::Entry::Occupied(entry) => entry.into_ref(), + dashmap::mapref::entry::Entry::Vacant(entry) => { + let path = self.path.join(segment.filename(block_range, tx_range)); + let mut jar = NippyJar::load(&path)?; if self.load_filters { - return jar.load_filters() + jar.load_filters()?; } - Ok(jar) - })??; - - self.map.insert(key, LoadedJar::new(jar)?); - Ok(self.map.get(&key).expect("qed").into()) - } + let loaded_jar = LoadedJar::new(jar)?; + entry.insert(loaded_jar) + } + }; + Ok(entry.downgrade().into()) } /// Gets a snapshot segment's block range and transaction range from the provider inner block diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 17d18d9eb..839af9339 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -350,7 +350,8 @@ where transaction: Self::Transaction, ) -> PoolResult { let (_, tx) = self.validate(origin, transaction).await; - self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed") + let mut results = self.pool.add_transactions(origin, std::iter::once(tx)); + results.pop().expect("result length is the same as the input") } async fn add_transactions( diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index b547fc0e4..0a4ac3c3a 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -476,7 +476,8 @@ where let mut listener = self.event_listener.write(); listener.subscribe(tx.tx_hash()) }; - self.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")?; + let mut results = self.add_transactions(origin, std::iter::once(tx)); + results.pop().expect("result length is the same as the input")?; Ok(listener) } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 2c0ee3e02..ff7fe0796 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -1583,7 +1583,7 @@ impl AllTransactions { } else { // The transaction was added above so the _inclusive_ descendants iterator // returns at least 1 tx. - let (id, tx) = descendants.peek().expect("Includes >= 1; qed."); + let (id, tx) = descendants.peek().expect("includes >= 1"); if id.nonce < inserted_tx_id.nonce { !tx.state.is_pending() } else { diff --git a/docs/crates/network.md b/docs/crates/network.md index 7d43589f4..cc47f35d6 100644 --- a/docs/crates/network.md +++ b/docs/crates/network.md @@ -440,7 +440,7 @@ fn poll_action(&mut self) -> PollAction { return PollAction::NoPeersAvailable }; - let request = self.queued_requests.pop_front().expect("not empty; qed"); + let request = self.queued_requests.pop_front().expect("not empty"); let request = self.prepare_block_request(peer_id, request); PollAction::Ready(FetchAction::BlockRequest { peer_id, request })