mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore: remove some unnecessary calls to unwrap/expect (#6727)
This commit is contained in:
@ -365,7 +365,7 @@ impl Cache {
|
||||
|
||||
// Create ForkId using the last past fork's hash and the next epoch start.
|
||||
let fork_id = ForkId {
|
||||
hash: past.last().expect("there is always at least one - genesis - fork hash; qed").1,
|
||||
hash: past.last().expect("there is always at least one - genesis - fork hash").1,
|
||||
next: epoch_end.unwrap_or(ForkFilterKey::Block(0)).into(),
|
||||
};
|
||||
|
||||
|
||||
@ -545,8 +545,7 @@ impl Discv4Service {
|
||||
for (key, val) in config.additional_eip868_rlp_pairs.iter() {
|
||||
builder.add_value_rlp(key, val.clone());
|
||||
}
|
||||
|
||||
builder.build(&secret_key).expect("v4 is set; qed")
|
||||
builder.build(&secret_key).expect("v4 is set")
|
||||
};
|
||||
|
||||
let (to_service, commands_rx) = mpsc::unbounded_channel();
|
||||
|
||||
@ -113,7 +113,7 @@ impl Message {
|
||||
// Sign the payload with the secret key using recoverable ECDSA
|
||||
let signature: RecoverableSignature = SECP256K1.sign_ecdsa_recoverable(
|
||||
&secp256k1::Message::from_slice(keccak256(&payload).as_ref())
|
||||
.expect("is correct MESSAGE_SIZE; qed"),
|
||||
.expect("B256.len() == MESSAGE_SIZE"),
|
||||
secret_key,
|
||||
);
|
||||
|
||||
|
||||
@ -359,32 +359,32 @@ pub struct SharedCapabilities(Vec<SharedCapability>);
|
||||
|
||||
impl SharedCapabilities {
|
||||
/// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance.
|
||||
#[inline]
|
||||
pub fn try_new(
|
||||
local_protocols: Vec<Protocol>,
|
||||
peer_capabilities: Vec<Capability>,
|
||||
) -> Result<Self, P2PStreamError> {
|
||||
Ok(Self(shared_capability_offsets(local_protocols, peer_capabilities)?))
|
||||
shared_capability_offsets(local_protocols, peer_capabilities).map(Self)
|
||||
}
|
||||
|
||||
/// Iterates over the shared capabilities.
|
||||
#[inline]
|
||||
pub fn iter_caps(&self) -> impl Iterator<Item = &SharedCapability> {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
/// Returns the eth capability if it is shared.
|
||||
#[inline]
|
||||
pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> {
|
||||
for cap in self.iter_caps() {
|
||||
if cap.is_eth() {
|
||||
return Ok(cap)
|
||||
}
|
||||
}
|
||||
Err(P2PStreamError::CapabilityNotShared)
|
||||
self.iter_caps().find(|c| c.is_eth()).ok_or(P2PStreamError::CapabilityNotShared)
|
||||
}
|
||||
|
||||
/// Returns the negotiated eth version if it is shared.
|
||||
#[inline]
|
||||
pub fn eth_version(&self) -> Result<EthVersion, P2PStreamError> {
|
||||
self.eth().map(|cap| cap.eth_version().expect("is eth; qed"))
|
||||
self.iter_caps()
|
||||
.find_map(SharedCapability::eth_version)
|
||||
.ok_or(P2PStreamError::CapabilityNotShared)
|
||||
}
|
||||
|
||||
/// Returns true if the shared capabilities contain the given capability.
|
||||
@ -526,15 +526,13 @@ pub fn shared_capability_offsets(
|
||||
// alphabetic order.
|
||||
let mut offset = MAX_RESERVED_MESSAGE_ID + 1;
|
||||
for name in shared_capability_names {
|
||||
let proto_version = shared_capabilities.get(&name).expect("shared; qed");
|
||||
|
||||
let proto_version = &shared_capabilities[&name];
|
||||
let shared_capability = SharedCapability::new(
|
||||
&name,
|
||||
proto_version.version as u8,
|
||||
offset,
|
||||
proto_version.messages,
|
||||
)?;
|
||||
|
||||
offset += shared_capability.num_messages();
|
||||
shared_with_offsets.push(shared_capability);
|
||||
}
|
||||
|
||||
@ -138,7 +138,7 @@ impl StateFetcher {
|
||||
|
||||
let Some(peer_id) = self.next_peer() else { return PollAction::NoPeersAvailable };
|
||||
|
||||
let request = self.queued_requests.pop_front().expect("not empty; qed");
|
||||
let request = self.queued_requests.pop_front().expect("not empty");
|
||||
let request = self.prepare_block_request(peer_id, request);
|
||||
|
||||
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
|
||||
|
||||
@ -857,36 +857,39 @@ where
|
||||
NetworkEvent::SessionEstablished {
|
||||
peer_id, client_version, messages, version, ..
|
||||
} => {
|
||||
// insert a new peer into the peerset
|
||||
self.peers.insert(peer_id, Peer::new(messages, version, client_version));
|
||||
// Insert a new peer into the peerset.
|
||||
let peer = Peer::new(messages, version, client_version);
|
||||
let peer = match self.peers.entry(peer_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.insert(peer);
|
||||
entry.into_mut()
|
||||
}
|
||||
Entry::Vacant(entry) => entry.insert(peer),
|
||||
};
|
||||
|
||||
// Send a `NewPooledTransactionHashes` to the peer with up to
|
||||
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
|
||||
// pool
|
||||
if !self.network.is_initially_syncing() {
|
||||
if self.network.tx_gossip_disabled() {
|
||||
return
|
||||
}
|
||||
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");
|
||||
|
||||
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
|
||||
|
||||
let pooled_txs = self.pool.pooled_transactions_max(
|
||||
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
|
||||
);
|
||||
if pooled_txs.is_empty() {
|
||||
// do not send a message if there are no transactions in the pool
|
||||
return
|
||||
}
|
||||
|
||||
for pooled_tx in pooled_txs.into_iter() {
|
||||
peer.seen_transactions.insert(*pooled_tx.hash());
|
||||
msg_builder.push_pooled(pooled_tx);
|
||||
}
|
||||
|
||||
let msg = msg_builder.build();
|
||||
self.network.send_transactions_hashes(peer_id, msg);
|
||||
// `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
|
||||
// transactions in the pool.
|
||||
if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
|
||||
return
|
||||
}
|
||||
|
||||
let pooled_txs = self.pool.pooled_transactions_max(
|
||||
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
|
||||
);
|
||||
if pooled_txs.is_empty() {
|
||||
// do not send a message if there are no transactions in the pool
|
||||
return
|
||||
}
|
||||
|
||||
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
|
||||
for pooled_tx in pooled_txs {
|
||||
peer.seen_transactions.insert(*pooled_tx.hash());
|
||||
msg_builder.push_pooled(pooled_tx);
|
||||
}
|
||||
|
||||
let msg = msg_builder.build();
|
||||
self.network.send_transactions_hashes(peer_id, msg);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
@ -500,6 +500,8 @@ impl<'a> ReceiptWithBloomEncoder<'a> {
|
||||
}
|
||||
|
||||
match self.receipt.tx_type {
|
||||
TxType::Legacy => unreachable!("legacy already handled"),
|
||||
|
||||
TxType::EIP2930 => {
|
||||
out.put_u8(0x01);
|
||||
}
|
||||
@ -513,7 +515,6 @@ impl<'a> ReceiptWithBloomEncoder<'a> {
|
||||
TxType::DEPOSIT => {
|
||||
out.put_u8(0x7E);
|
||||
}
|
||||
_ => unreachable!("legacy handled; qed."),
|
||||
}
|
||||
out.put_slice(payload.as_ref());
|
||||
}
|
||||
|
||||
@ -42,18 +42,16 @@ impl Compact for Signature {
|
||||
where
|
||||
B: bytes::BufMut + AsMut<[u8]>,
|
||||
{
|
||||
buf.put_slice(self.r.as_le_bytes().as_ref());
|
||||
buf.put_slice(self.s.as_le_bytes().as_ref());
|
||||
buf.put_slice(&self.r.as_le_bytes());
|
||||
buf.put_slice(&self.s.as_le_bytes());
|
||||
self.odd_y_parity as usize
|
||||
}
|
||||
|
||||
fn from_compact(mut buf: &[u8], identifier: usize) -> (Self, &[u8]) {
|
||||
let r = U256::try_from_le_slice(&buf[..32]).expect("qed");
|
||||
buf.advance(32);
|
||||
|
||||
let s = U256::try_from_le_slice(&buf[..32]).expect("qed");
|
||||
buf.advance(32);
|
||||
|
||||
assert!(buf.len() >= 64);
|
||||
let r = U256::from_le_slice(&buf[0..32]);
|
||||
let s = U256::from_le_slice(&buf[32..64]);
|
||||
buf.advance(64);
|
||||
(Signature { r, s, odd_y_parity: identifier != 0 }, buf)
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,11 +158,12 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
|
||||
// For accurate checkpoints we need to know that we have checked every transaction.
|
||||
// Example: we reached the end of the range, and the last receipt is supposed to skip
|
||||
// its deletion.
|
||||
last_pruned_transaction =
|
||||
Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
|
||||
let last_pruned_transaction = *last_pruned_transaction
|
||||
.insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
|
||||
|
||||
last_pruned_block = Some(
|
||||
provider
|
||||
.transaction_block(last_pruned_transaction.expect("qed"))?
|
||||
.transaction_block(last_pruned_transaction)?
|
||||
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
|
||||
// If there's more receipts to prune, set the checkpoint block number to
|
||||
// previous, so we could finish pruning its receipts on the
|
||||
@ -175,7 +176,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
|
||||
break
|
||||
}
|
||||
|
||||
from_tx_number = last_pruned_transaction.expect("qed") + 1;
|
||||
from_tx_number = last_pruned_transaction + 1;
|
||||
}
|
||||
|
||||
// If there are contracts using `PruneMode::Distance(_)` there will be receipts before
|
||||
|
||||
@ -195,7 +195,7 @@ impl ConnectionGuard {
|
||||
match self.0.clone().try_acquire_owned() {
|
||||
Ok(guard) => Some(guard),
|
||||
Err(TryAcquireError::Closed) => {
|
||||
unreachable!("Semaphore::Close is never called and can't be closed; qed")
|
||||
unreachable!("Semaphore::Close is never called and can't be closed")
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) => None,
|
||||
}
|
||||
|
||||
@ -1272,76 +1272,77 @@ where
|
||||
where
|
||||
F: FnOnce(&EthHandlers<Provider, Pool, Network, Events, EvmConfig>) -> R,
|
||||
{
|
||||
if self.eth.is_none() {
|
||||
let cache = EthStateCache::spawn_with(
|
||||
self.provider.clone(),
|
||||
self.config.eth.cache.clone(),
|
||||
self.executor.clone(),
|
||||
self.evm_config.clone(),
|
||||
);
|
||||
let gas_oracle = GasPriceOracle::new(
|
||||
self.provider.clone(),
|
||||
self.config.eth.gas_oracle.clone(),
|
||||
cache.clone(),
|
||||
);
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let c = cache.clone();
|
||||
f(match &self.eth {
|
||||
Some(eth) => eth,
|
||||
None => self.eth.insert(self.init_eth()),
|
||||
})
|
||||
}
|
||||
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks task",
|
||||
Box::pin(async move {
|
||||
cache_new_blocks_task(c, new_canonical_blocks).await;
|
||||
}),
|
||||
);
|
||||
fn init_eth(&self) -> EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
|
||||
let cache = EthStateCache::spawn_with(
|
||||
self.provider.clone(),
|
||||
self.config.eth.cache.clone(),
|
||||
self.executor.clone(),
|
||||
self.evm_config.clone(),
|
||||
);
|
||||
let gas_oracle = GasPriceOracle::new(
|
||||
self.provider.clone(),
|
||||
self.config.eth.gas_oracle.clone(),
|
||||
cache.clone(),
|
||||
);
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let c = cache.clone();
|
||||
|
||||
let fee_history_cache =
|
||||
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let fhc = fee_history_cache.clone();
|
||||
let provider_clone = self.provider.clone();
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks for fee history task",
|
||||
Box::pin(async move {
|
||||
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone)
|
||||
.await;
|
||||
}),
|
||||
);
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks task",
|
||||
Box::pin(async move {
|
||||
cache_new_blocks_task(c, new_canonical_blocks).await;
|
||||
}),
|
||||
);
|
||||
|
||||
let executor = Box::new(self.executor.clone());
|
||||
let blocking_task_pool =
|
||||
BlockingTaskPool::build().expect("failed to build tracing pool");
|
||||
let api = EthApi::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.network.clone(),
|
||||
cache.clone(),
|
||||
gas_oracle,
|
||||
self.config.eth.rpc_gas_cap,
|
||||
executor.clone(),
|
||||
blocking_task_pool.clone(),
|
||||
fee_history_cache,
|
||||
self.evm_config.clone(),
|
||||
);
|
||||
let filter = EthFilter::new(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
cache.clone(),
|
||||
self.config.eth.filter_config(),
|
||||
executor.clone(),
|
||||
);
|
||||
let fee_history_cache =
|
||||
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
|
||||
let new_canonical_blocks = self.events.canonical_state_stream();
|
||||
let fhc = fee_history_cache.clone();
|
||||
let provider_clone = self.provider.clone();
|
||||
self.executor.spawn_critical(
|
||||
"cache canonical blocks for fee history task",
|
||||
Box::pin(async move {
|
||||
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone).await;
|
||||
}),
|
||||
);
|
||||
|
||||
let pubsub = EthPubSub::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.events.clone(),
|
||||
self.network.clone(),
|
||||
executor,
|
||||
);
|
||||
let executor = Box::new(self.executor.clone());
|
||||
let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool");
|
||||
let api = EthApi::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.network.clone(),
|
||||
cache.clone(),
|
||||
gas_oracle,
|
||||
self.config.eth.rpc_gas_cap,
|
||||
executor.clone(),
|
||||
blocking_task_pool.clone(),
|
||||
fee_history_cache,
|
||||
self.evm_config.clone(),
|
||||
);
|
||||
let filter = EthFilter::new(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
cache.clone(),
|
||||
self.config.eth.filter_config(),
|
||||
executor.clone(),
|
||||
);
|
||||
|
||||
let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
|
||||
self.eth = Some(eth);
|
||||
}
|
||||
f(self.eth.as_ref().expect("exists; qed"))
|
||||
let pubsub = EthPubSub::with_spawner(
|
||||
self.provider.clone(),
|
||||
self.pool.clone(),
|
||||
self.events.clone(),
|
||||
self.network.clone(),
|
||||
executor,
|
||||
);
|
||||
|
||||
EthHandlers { api, cache, filter, pubsub, blocking_task_pool }
|
||||
}
|
||||
|
||||
/// Returns the configured [EthHandlers] or creates it if it does not exist yet
|
||||
@ -1643,9 +1644,7 @@ impl RpcServerConfig {
|
||||
}
|
||||
Some(ws_cors)
|
||||
}
|
||||
(None, cors @ Some(_)) => cors,
|
||||
(cors @ Some(_), None) => cors,
|
||||
_ => None,
|
||||
(a, b) => a.or(b),
|
||||
}
|
||||
.cloned();
|
||||
|
||||
@ -1656,7 +1655,7 @@ impl RpcServerConfig {
|
||||
|
||||
modules.config.ensure_ws_http_identical()?;
|
||||
|
||||
let builder = self.http_server_config.take().expect("is set; qed");
|
||||
let builder = self.http_server_config.take().expect("http_server_config is Some");
|
||||
let (server, addr) = WsHttpServerKind::build(
|
||||
builder,
|
||||
http_socket_addr,
|
||||
|
||||
@ -48,25 +48,11 @@ where
|
||||
block_id: Option<BlockId>,
|
||||
) -> EthResult<U256> {
|
||||
if let Some(BlockId::Number(BlockNumberOrTag::Pending)) = block_id {
|
||||
// lookup transactions in pool
|
||||
let address_txs = self.pool().get_transactions_by_sender(address);
|
||||
|
||||
if !address_txs.is_empty() {
|
||||
// get max transaction with the highest nonce
|
||||
let highest_nonce_tx = address_txs
|
||||
.into_iter()
|
||||
.reduce(|accum, item| {
|
||||
if item.transaction.nonce() > accum.transaction.nonce() {
|
||||
item
|
||||
} else {
|
||||
accum
|
||||
}
|
||||
})
|
||||
.expect("Not empty; qed");
|
||||
|
||||
let tx_count = highest_nonce_tx
|
||||
.transaction
|
||||
.nonce()
|
||||
if let Some(highest_nonce) =
|
||||
address_txs.iter().map(|item| item.transaction.nonce()).max()
|
||||
{
|
||||
let tx_count = highest_nonce
|
||||
.checked_add(1)
|
||||
.ok_or(RpcInvalidTransactionError::NonceMaxValue)?;
|
||||
return Ok(U256::from(tx_count))
|
||||
|
||||
@ -191,7 +191,7 @@ where
|
||||
results.sort_unstable();
|
||||
price = *results
|
||||
.get((results.len() - 1) * self.oracle_config.percentile as usize / 100)
|
||||
.expect("gas price index is a percent of nonzero array length, so a value always exists; qed");
|
||||
.expect("gas price index is a percent of nonzero array length, so a value always exists");
|
||||
}
|
||||
|
||||
// constrain to the max price
|
||||
|
||||
@ -305,7 +305,7 @@ where
|
||||
fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
|
||||
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
|
||||
.map(move |canon_state| {
|
||||
canon_state.expect("new block subscription never ends; qed").block_receipts()
|
||||
canon_state.expect("new block subscription never ends").block_receipts()
|
||||
})
|
||||
.flat_map(futures::stream::iter)
|
||||
.flat_map(move |(block_receipts, removed)| {
|
||||
|
||||
@ -8,7 +8,7 @@ use crate::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
DatabaseError,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::Mutex;
|
||||
use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation};
|
||||
use reth_libmdbx::{ffi::DBI, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
|
||||
use reth_tracing::tracing::{debug, trace, warn};
|
||||
@ -30,36 +30,43 @@ const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
|
||||
pub struct Tx<K: TransactionKind> {
|
||||
/// Libmdbx-sys transaction.
|
||||
pub inner: Transaction<K>,
|
||||
/// Database table handle cache.
|
||||
pub(crate) db_handles: Arc<RwLock<[Option<DBI>; Tables::COUNT]>>,
|
||||
|
||||
/// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
|
||||
/// closed by [Tx::commit] or [Tx::abort], but we still need to report it in the metrics.
|
||||
///
|
||||
/// If [Some], then metrics are reported.
|
||||
metrics_handler: Option<MetricsHandler<K>>,
|
||||
|
||||
/// Database table handle cache.
|
||||
db_handles: Mutex<[Option<DBI>; Tables::COUNT]>,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind> Tx<K> {
|
||||
/// Creates new `Tx` object with a `RO` or `RW` transaction.
|
||||
#[inline]
|
||||
pub fn new(inner: Transaction<K>) -> Self {
|
||||
Self { inner, db_handles: Default::default(), metrics_handler: None }
|
||||
Self::new_inner(inner, None)
|
||||
}
|
||||
|
||||
/// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
|
||||
#[inline]
|
||||
#[track_caller]
|
||||
pub fn new_with_metrics(
|
||||
inner: Transaction<K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
) -> Self {
|
||||
let metrics_handler = if let Some(metrics) = metrics {
|
||||
let handler = MetricsHandler::<K>::new(inner.id(), metrics);
|
||||
let metrics_handler = env_metrics.map(|env_metrics| {
|
||||
let handler = MetricsHandler::<K>::new(inner.id(), env_metrics);
|
||||
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
|
||||
handler.log_transaction_opened();
|
||||
Some(handler)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Self { inner, db_handles: Default::default(), metrics_handler }
|
||||
handler
|
||||
});
|
||||
Self::new_inner(inner, metrics_handler)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
|
||||
Self { inner, db_handles: Mutex::new([None; Tables::COUNT]), metrics_handler }
|
||||
}
|
||||
|
||||
/// Gets this transaction ID.
|
||||
@ -69,18 +76,14 @@ impl<K: TransactionKind> Tx<K> {
|
||||
|
||||
/// Gets a table database handle if it exists, otherwise creates it.
|
||||
pub fn get_dbi<T: Table>(&self) -> Result<DBI, DatabaseError> {
|
||||
let mut handles = self.db_handles.write();
|
||||
|
||||
let table = T::TABLE;
|
||||
|
||||
let dbi_handle = handles.get_mut(table as usize).expect("should exist");
|
||||
if dbi_handle.is_none() {
|
||||
*dbi_handle = Some(
|
||||
self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?.dbi(),
|
||||
);
|
||||
match self.db_handles.lock()[T::TABLE as usize] {
|
||||
Some(handle) => Ok(handle),
|
||||
ref mut handle @ None => {
|
||||
let db =
|
||||
self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?;
|
||||
Ok(*handle.insert(db.dbi()))
|
||||
}
|
||||
}
|
||||
|
||||
Ok(dbi_handle.expect("is some; qed"))
|
||||
}
|
||||
|
||||
/// Create db Cursor
|
||||
|
||||
@ -239,15 +239,14 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
Ok(obj)
|
||||
}
|
||||
|
||||
/// Loads filters into memory
|
||||
pub fn load_filters(mut self) -> Result<Self, NippyJarError> {
|
||||
/// Loads filters into memory.
|
||||
pub fn load_filters(&mut self) -> Result<(), NippyJarError> {
|
||||
// Read the offsets lists located at the index file.
|
||||
let mut offsets_file = File::open(self.index_path())?;
|
||||
|
||||
self.offsets_index = PrefixSummedEliasFano::deserialize_from(&mut offsets_file)?;
|
||||
self.phf = bincode::deserialize_from(&mut offsets_file)?;
|
||||
self.filter = bincode::deserialize_from(&mut offsets_file)?;
|
||||
Ok(self)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the path for the data file
|
||||
@ -588,8 +587,8 @@ mod tests {
|
||||
nippy
|
||||
.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
|
||||
.unwrap();
|
||||
let loaded_nippy =
|
||||
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
assert_eq!(indexes, collect_indexes(&loaded_nippy));
|
||||
};
|
||||
|
||||
@ -633,8 +632,8 @@ mod tests {
|
||||
assert!(InclusionFilter::add(&mut nippy, &col1[3]).is_ok());
|
||||
|
||||
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
|
||||
let loaded_nippy =
|
||||
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
@ -687,8 +686,8 @@ mod tests {
|
||||
|
||||
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
|
||||
|
||||
let loaded_nippy =
|
||||
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
assert_eq!(nippy.version, loaded_nippy.version);
|
||||
assert_eq!(nippy.columns, loaded_nippy.columns);
|
||||
assert_eq!(nippy.filter, loaded_nippy.filter);
|
||||
@ -730,8 +729,8 @@ mod tests {
|
||||
|
||||
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
|
||||
|
||||
let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
let loaded_nippy = loaded_nippy.load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
|
||||
@ -767,8 +766,8 @@ mod tests {
|
||||
|
||||
nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows).unwrap();
|
||||
|
||||
let loaded_nippy =
|
||||
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
assert_eq!(nippy, loaded_nippy);
|
||||
|
||||
if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
|
||||
@ -823,8 +822,8 @@ mod tests {
|
||||
|
||||
// Read file
|
||||
{
|
||||
let loaded_nippy =
|
||||
NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
|
||||
assert!(loaded_nippy.compressor().is_some());
|
||||
assert!(loaded_nippy.filter.is_some());
|
||||
@ -894,8 +893,8 @@ mod tests {
|
||||
|
||||
// Read file
|
||||
{
|
||||
let loaded_nippy =
|
||||
NippyJar::load_without_header(file_path.path()).unwrap().load_filters().unwrap();
|
||||
let mut loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
|
||||
loaded_nippy.load_filters().unwrap();
|
||||
|
||||
if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
|
||||
let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();
|
||||
|
||||
@ -156,20 +156,19 @@ impl SnapshotProvider {
|
||||
tx_range: &RangeInclusive<u64>,
|
||||
) -> ProviderResult<SnapshotJarProvider<'_>> {
|
||||
let key = (*block_range.end(), segment);
|
||||
if let Some(jar) = self.map.get(&key) {
|
||||
Ok(jar.into())
|
||||
} else {
|
||||
let jar = NippyJar::load(&self.path.join(segment.filename(block_range, tx_range)))
|
||||
.map(|jar| {
|
||||
let entry = match self.map.entry(key) {
|
||||
dashmap::mapref::entry::Entry::Occupied(entry) => entry.into_ref(),
|
||||
dashmap::mapref::entry::Entry::Vacant(entry) => {
|
||||
let path = self.path.join(segment.filename(block_range, tx_range));
|
||||
let mut jar = NippyJar::load(&path)?;
|
||||
if self.load_filters {
|
||||
return jar.load_filters()
|
||||
jar.load_filters()?;
|
||||
}
|
||||
Ok(jar)
|
||||
})??;
|
||||
|
||||
self.map.insert(key, LoadedJar::new(jar)?);
|
||||
Ok(self.map.get(&key).expect("qed").into())
|
||||
}
|
||||
let loaded_jar = LoadedJar::new(jar)?;
|
||||
entry.insert(loaded_jar)
|
||||
}
|
||||
};
|
||||
Ok(entry.downgrade().into())
|
||||
}
|
||||
|
||||
/// Gets a snapshot segment's block range and transaction range from the provider inner block
|
||||
|
||||
@ -350,7 +350,8 @@ where
|
||||
transaction: Self::Transaction,
|
||||
) -> PoolResult<TxHash> {
|
||||
let (_, tx) = self.validate(origin, transaction).await;
|
||||
self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")
|
||||
let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
|
||||
results.pop().expect("result length is the same as the input")
|
||||
}
|
||||
|
||||
async fn add_transactions(
|
||||
|
||||
@ -476,7 +476,8 @@ where
|
||||
let mut listener = self.event_listener.write();
|
||||
listener.subscribe(tx.tx_hash())
|
||||
};
|
||||
self.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")?;
|
||||
let mut results = self.add_transactions(origin, std::iter::once(tx));
|
||||
results.pop().expect("result length is the same as the input")?;
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
|
||||
@ -1583,7 +1583,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
|
||||
} else {
|
||||
// The transaction was added above so the _inclusive_ descendants iterator
|
||||
// returns at least 1 tx.
|
||||
let (id, tx) = descendants.peek().expect("Includes >= 1; qed.");
|
||||
let (id, tx) = descendants.peek().expect("includes >= 1");
|
||||
if id.nonce < inserted_tx_id.nonce {
|
||||
!tx.state.is_pending()
|
||||
} else {
|
||||
|
||||
@ -440,7 +440,7 @@ fn poll_action(&mut self) -> PollAction {
|
||||
return PollAction::NoPeersAvailable
|
||||
};
|
||||
|
||||
let request = self.queued_requests.pop_front().expect("not empty; qed");
|
||||
let request = self.queued_requests.pop_front().expect("not empty");
|
||||
let request = self.prepare_block_request(peer_id, request);
|
||||
|
||||
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
|
||||
|
||||
Reference in New Issue
Block a user