mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
refactor(node-builder): use datadir provided by the config (#8592)
This commit is contained in:
@ -183,7 +183,7 @@ impl<Ext: clap::Args + fmt::Debug> NodeCommand<Ext> {
|
||||
|
||||
let builder = NodeBuilder::new(node_config)
|
||||
.with_database(database)
|
||||
.with_launch_context(ctx.task_executor, data_dir);
|
||||
.with_launch_context(ctx.task_executor);
|
||||
|
||||
launcher(builder, ext).await
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ where
|
||||
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore>;
|
||||
|
||||
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
|
||||
let data_dir = ctx.data_dir();
|
||||
let data_dir = ctx.config().datadir();
|
||||
let pool_config = ctx.pool_config();
|
||||
let blob_store = DiskFileBlobStore::open(data_dir.blobstore(), Default::default())?;
|
||||
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.chain_spec())
|
||||
|
||||
@ -1,8 +1,5 @@
|
||||
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeTypes};
|
||||
use reth_node_core::{
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
node_config::NodeConfig,
|
||||
};
|
||||
use reth_node_core::node_config::NodeConfig;
|
||||
use reth_primitives::Head;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use tokio::sync::mpsc::{Receiver, UnboundedSender};
|
||||
@ -14,8 +11,6 @@ use crate::{ExExEvent, ExExNotification};
|
||||
pub struct ExExContext<Node: FullNodeComponents> {
|
||||
/// The current head of the blockchain at launch.
|
||||
pub head: Head,
|
||||
/// The data dir of the node.
|
||||
pub data_dir: ChainPath<DataDirPath>,
|
||||
/// The config of the node
|
||||
pub config: NodeConfig,
|
||||
/// The loaded node config
|
||||
|
||||
@ -309,10 +309,10 @@ impl NodeConfig {
|
||||
client: C,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
data_dir: &ChainPath<DataDirPath>,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
) -> eyre::Result<NetworkConfig<C>> {
|
||||
info!(target: "reth::cli", "Connecting to P2P network");
|
||||
let secret_key = self.network_secret(data_dir)?;
|
||||
let secret_key = self.network_secret(&data_dir)?;
|
||||
let default_peers_path = data_dir.known_peers();
|
||||
Ok(self.load_network_config(config, client, executor, head, secret_key, default_peers_path))
|
||||
}
|
||||
@ -326,7 +326,7 @@ impl NodeConfig {
|
||||
client: C,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
data_dir: &ChainPath<DataDirPath>,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
) -> eyre::Result<NetworkBuilder<C, (), ()>>
|
||||
where
|
||||
C: BlockNumReader,
|
||||
|
||||
@ -20,7 +20,7 @@ use reth_network::{NetworkBuilder, NetworkConfig, NetworkHandle};
|
||||
use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes};
|
||||
use reth_node_core::{
|
||||
cli::config::{PayloadBuilderConfig, RethTransactionPoolConfig},
|
||||
dirs::{ChainPath, DataDirPath, MaybePlatformPath},
|
||||
dirs::{DataDirPath, MaybePlatformPath},
|
||||
node_config::NodeConfig,
|
||||
primitives::{kzg::KzgSettings, Head},
|
||||
utils::write_peers_to_file,
|
||||
@ -161,26 +161,24 @@ impl<DB> NodeBuilder<DB> {
|
||||
/// Preconfigure the builder with the context to launch the node.
|
||||
///
|
||||
/// This provides the task executor and the data directory for the node.
|
||||
pub const fn with_launch_context(
|
||||
self,
|
||||
task_executor: TaskExecutor,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
) -> WithLaunchContext<Self> {
|
||||
WithLaunchContext { builder: self, task_executor, data_dir }
|
||||
pub const fn with_launch_context(self, task_executor: TaskExecutor) -> WithLaunchContext<Self> {
|
||||
WithLaunchContext { builder: self, task_executor }
|
||||
}
|
||||
|
||||
/// Creates an _ephemeral_ preconfigured node for testing purposes.
|
||||
pub fn testing_node(
|
||||
self,
|
||||
mut self,
|
||||
task_executor: TaskExecutor,
|
||||
) -> WithLaunchContext<NodeBuilder<Arc<TempDatabase<DatabaseEnv>>>> {
|
||||
let path = MaybePlatformPath::<DataDirPath>::from(tempdir_path());
|
||||
self.config.datadir.datadir = path.clone();
|
||||
|
||||
let data_dir =
|
||||
path.unwrap_or_chain_default(self.config.chain.chain, self.config.datadir.clone());
|
||||
|
||||
let db = create_test_rw_db_with_path(data_dir.db());
|
||||
|
||||
WithLaunchContext { builder: self.with_database(db), task_executor, data_dir }
|
||||
WithLaunchContext { builder: self.with_database(db), task_executor }
|
||||
}
|
||||
}
|
||||
|
||||
@ -217,7 +215,6 @@ where
|
||||
pub struct WithLaunchContext<Builder> {
|
||||
builder: Builder,
|
||||
task_executor: TaskExecutor,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
}
|
||||
|
||||
impl<Builder> WithLaunchContext<Builder> {
|
||||
@ -225,11 +222,6 @@ impl<Builder> WithLaunchContext<Builder> {
|
||||
pub const fn task_executor(&self) -> &TaskExecutor {
|
||||
&self.task_executor
|
||||
}
|
||||
|
||||
/// Returns a reference to the data directory.
|
||||
pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
|
||||
&self.data_dir
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB> WithLaunchContext<NodeBuilder<DB>>
|
||||
@ -246,11 +238,7 @@ where
|
||||
where
|
||||
T: NodeTypes,
|
||||
{
|
||||
WithLaunchContext {
|
||||
builder: self.builder.with_types(),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
WithLaunchContext { builder: self.builder.with_types(), task_executor: self.task_executor }
|
||||
}
|
||||
|
||||
/// Preconfigures the node with a specific node implementation.
|
||||
@ -305,7 +293,6 @@ where
|
||||
WithLaunchContext {
|
||||
builder: self.builder.with_components(components_builder),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -326,7 +313,6 @@ where
|
||||
Self {
|
||||
builder: self.builder.on_component_initialized(hook),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
}
|
||||
|
||||
@ -339,11 +325,7 @@ where
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
Self {
|
||||
builder: self.builder.on_node_started(hook),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
Self { builder: self.builder.on_node_started(hook), task_executor: self.task_executor }
|
||||
}
|
||||
|
||||
/// Sets the hook that is run once the rpc server is started.
|
||||
@ -356,11 +338,7 @@ where
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
Self {
|
||||
builder: self.builder.on_rpc_started(hook),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
Self { builder: self.builder.on_rpc_started(hook), task_executor: self.task_executor }
|
||||
}
|
||||
|
||||
/// Sets the hook that is run to configure the rpc modules.
|
||||
@ -372,11 +350,7 @@ where
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
Self {
|
||||
builder: self.builder.extend_rpc_modules(hook),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
Self { builder: self.builder.extend_rpc_modules(hook), task_executor: self.task_executor }
|
||||
}
|
||||
|
||||
/// Installs an `ExEx` (Execution Extension) in the node.
|
||||
@ -395,7 +369,6 @@ where
|
||||
Self {
|
||||
builder: self.builder.install_exex(exex_id, exex),
|
||||
task_executor: self.task_executor,
|
||||
data_dir: self.data_dir,
|
||||
}
|
||||
}
|
||||
|
||||
@ -403,9 +376,9 @@ where
|
||||
pub async fn launch(
|
||||
self,
|
||||
) -> eyre::Result<NodeHandle<NodeAdapter<RethFullAdapter<DB, T>, CB::Components>>> {
|
||||
let Self { builder, task_executor, data_dir } = self;
|
||||
let Self { builder, task_executor } = self;
|
||||
|
||||
let launcher = DefaultNodeLauncher::new(task_executor, data_dir);
|
||||
let launcher = DefaultNodeLauncher::new(task_executor, builder.config.datadir());
|
||||
builder.launch_with(launcher).await
|
||||
}
|
||||
|
||||
@ -425,8 +398,6 @@ pub struct BuilderContext<Node: FullNodeTypes> {
|
||||
pub(crate) provider: Node::Provider,
|
||||
/// The executor of the node.
|
||||
pub(crate) executor: TaskExecutor,
|
||||
/// The data dir of the node.
|
||||
pub(crate) data_dir: ChainPath<DataDirPath>,
|
||||
/// The config of the node
|
||||
pub(crate) config: NodeConfig,
|
||||
/// loaded config
|
||||
@ -439,11 +410,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
head: Head,
|
||||
provider: Node::Provider,
|
||||
executor: TaskExecutor,
|
||||
data_dir: ChainPath<DataDirPath>,
|
||||
config: NodeConfig,
|
||||
reth_config: reth_config::Config,
|
||||
) -> Self {
|
||||
Self { head, provider, executor, data_dir, config, reth_config }
|
||||
Self { head, provider, executor, config, reth_config }
|
||||
}
|
||||
|
||||
/// Returns the configured provider to interact with the blockchain.
|
||||
@ -461,13 +431,6 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
&self.config
|
||||
}
|
||||
|
||||
/// Returns the data dir of the node.
|
||||
///
|
||||
/// This gives access to all relevant files and directories of the node's datadir.
|
||||
pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
|
||||
&self.data_dir
|
||||
}
|
||||
|
||||
/// Returns the executor of the node.
|
||||
///
|
||||
/// This can be used to execute async tasks or functions during the setup.
|
||||
@ -502,7 +465,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
self.provider.clone(),
|
||||
self.executor.clone(),
|
||||
self.head,
|
||||
self.data_dir(),
|
||||
self.config.datadir(),
|
||||
)
|
||||
}
|
||||
|
||||
@ -514,7 +477,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
self.provider.clone(),
|
||||
self.executor.clone(),
|
||||
self.head,
|
||||
self.data_dir(),
|
||||
self.config.datadir(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
@ -539,7 +502,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
self.executor.spawn_critical("p2p txpool", txpool);
|
||||
self.executor.spawn_critical("p2p eth request handler", eth);
|
||||
|
||||
let default_peers_path = self.data_dir().known_peers();
|
||||
let default_peers_path = self.config.datadir().known_peers();
|
||||
let known_peers_file = self.config.network.persistent_peers_file(default_peers_path);
|
||||
self.executor.spawn_critical_with_graceful_shutdown_signal(
|
||||
"p2p network task",
|
||||
@ -560,7 +523,6 @@ impl<Node: FullNodeTypes> std::fmt::Debug for BuilderContext<Node> {
|
||||
.field("head", &self.head)
|
||||
.field("provider", &std::any::type_name::<Node::Provider>())
|
||||
.field("executor", &self.executor)
|
||||
.field("data_dir", &self.data_dir)
|
||||
.field("config", &self.config)
|
||||
.finish()
|
||||
}
|
||||
|
||||
@ -146,7 +146,6 @@ where
|
||||
head,
|
||||
blockchain_db.clone(),
|
||||
ctx.task_executor().clone(),
|
||||
ctx.data_dir().clone(),
|
||||
ctx.node_config().clone(),
|
||||
ctx.toml_config().clone(),
|
||||
);
|
||||
@ -195,7 +194,6 @@ where
|
||||
// create the launch context for the exex
|
||||
let context = ExExContext {
|
||||
head,
|
||||
data_dir: ctx.data_dir().clone(),
|
||||
config: ctx.node_config().clone(),
|
||||
reth_config: ctx.toml_config().clone(),
|
||||
components: node_adapter.clone(),
|
||||
|
||||
@ -126,7 +126,7 @@ where
|
||||
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore>;
|
||||
|
||||
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
|
||||
let data_dir = ctx.data_dir();
|
||||
let data_dir = ctx.config().datadir();
|
||||
let blob_store = DiskFileBlobStore::open(data_dir.blobstore(), Default::default())?;
|
||||
|
||||
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.chain_spec())
|
||||
|
||||
@ -213,7 +213,7 @@ impl BlobTransaction {
|
||||
// decode the _first_ list header for the rest of the transaction
|
||||
let outer_header = Header::decode(data)?;
|
||||
if !outer_header.list {
|
||||
return Err(RlpError::Custom("PooledTransactions blob tx must be encoded as a list"));
|
||||
return Err(RlpError::Custom("PooledTransactions blob tx must be encoded as a list"))
|
||||
}
|
||||
|
||||
let outer_remaining_len = data.len();
|
||||
@ -225,7 +225,7 @@ impl BlobTransaction {
|
||||
if !inner_header.list {
|
||||
return Err(RlpError::Custom(
|
||||
"PooledTransactions inner blob tx must be encoded as a list",
|
||||
));
|
||||
))
|
||||
}
|
||||
|
||||
let inner_remaining_len = data.len();
|
||||
@ -239,7 +239,7 @@ impl BlobTransaction {
|
||||
// the inner header only decodes the transaction and signature, so we check the length here
|
||||
let inner_consumed = inner_remaining_len - data.len();
|
||||
if inner_consumed != inner_header.payload_length {
|
||||
return Err(RlpError::UnexpectedLength);
|
||||
return Err(RlpError::UnexpectedLength)
|
||||
}
|
||||
|
||||
// All that's left are the blobs, commitments, and proofs
|
||||
|
||||
@ -266,7 +266,7 @@ where
|
||||
Err(EthApiError::InvalidTransaction(RpcInvalidTransactionError::GasTooHigh))
|
||||
if tx_request_gas_limit.is_some() || tx_request_gas_price.is_some() =>
|
||||
{
|
||||
return Err(self.map_out_of_gas_err(block_env_gas_limit, env, &mut db));
|
||||
return Err(self.map_out_of_gas_err(block_env_gas_limit, env, &mut db))
|
||||
}
|
||||
// Propagate other results (successful or other errors).
|
||||
ethres => ethres?,
|
||||
|
||||
@ -25,7 +25,7 @@ pub(crate) fn generate_flag_struct(
|
||||
.iter()
|
||||
.filter_map(|f| {
|
||||
if let FieldTypes::StructField(f) = f {
|
||||
return Some(f);
|
||||
return Some(f)
|
||||
}
|
||||
None
|
||||
})
|
||||
@ -36,7 +36,7 @@ pub(crate) fn generate_flag_struct(
|
||||
};
|
||||
|
||||
if total_bits == 0 {
|
||||
return placeholder_flag_struct(ident, &flags_ident);
|
||||
return placeholder_flag_struct(ident, &flags_ident)
|
||||
}
|
||||
|
||||
let (total_bytes, unused_bits) = pad_flag_struct(total_bits, &mut field_flags);
|
||||
|
||||
@ -89,7 +89,7 @@ fn generate_from_compact(fields: &FieldList, ident: &Ident, is_zstd: bool) -> To
|
||||
let ident = format_ident!("{name}");
|
||||
return Some(quote! {
|
||||
#ident: #ident,
|
||||
});
|
||||
})
|
||||
}
|
||||
None
|
||||
});
|
||||
|
||||
@ -156,7 +156,7 @@ fn should_use_alt_impl(ftype: &String, segment: &syn::PathSegment) -> bool {
|
||||
]
|
||||
.contains(&path.ident.to_string().as_str())
|
||||
{
|
||||
return true;
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,7 +66,7 @@ impl<'a> StructHandler<'a> {
|
||||
})
|
||||
}
|
||||
|
||||
return;
|
||||
return
|
||||
}
|
||||
|
||||
let name = format_ident!("{name}");
|
||||
|
||||
@ -81,7 +81,7 @@ impl TableObject for ObjectLength {
|
||||
impl<const LEN: usize> TableObject for [u8; LEN] {
|
||||
fn decode(data_val: &[u8]) -> Result<Self, Error> {
|
||||
if data_val.len() != LEN {
|
||||
return Err(Error::DecodeErrorLenDiff);
|
||||
return Err(Error::DecodeErrorLenDiff)
|
||||
}
|
||||
let mut a = [0; LEN];
|
||||
a[..].copy_from_slice(data_val);
|
||||
|
||||
@ -373,7 +373,7 @@ where
|
||||
{
|
||||
let res: Result<Option<((), ())>> = self.set_range(key);
|
||||
if let Err(error) = res {
|
||||
return Iter::Err(Some(error));
|
||||
return Iter::Err(Some(error))
|
||||
};
|
||||
Iter::new(self, ffi::MDBX_GET_CURRENT, ffi::MDBX_NEXT)
|
||||
}
|
||||
@ -408,7 +408,7 @@ where
|
||||
{
|
||||
let res: Result<Option<((), ())>> = self.set_range(key);
|
||||
if let Err(error) = res {
|
||||
return IterDup::Err(Some(error));
|
||||
return IterDup::Err(Some(error))
|
||||
};
|
||||
IterDup::new(self, ffi::MDBX_GET_CURRENT)
|
||||
}
|
||||
@ -424,7 +424,7 @@ where
|
||||
Ok(Some(_)) => (),
|
||||
Ok(None) => {
|
||||
let _: Result<Option<((), ())>> = self.last();
|
||||
return Iter::new(self, ffi::MDBX_NEXT, ffi::MDBX_NEXT);
|
||||
return Iter::new(self, ffi::MDBX_NEXT, ffi::MDBX_NEXT)
|
||||
}
|
||||
Err(error) => return Iter::Err(Some(error)),
|
||||
};
|
||||
|
||||
@ -118,10 +118,10 @@ impl Environment {
|
||||
warn!(target: "libmdbx", "Process stalled, awaiting read-write transaction lock.");
|
||||
}
|
||||
sleep(Duration::from_millis(250));
|
||||
continue;
|
||||
continue
|
||||
}
|
||||
|
||||
break res;
|
||||
break res
|
||||
}?;
|
||||
Ok(Transaction::new_from_ptr(self.clone(), txn.0))
|
||||
}
|
||||
@ -216,7 +216,7 @@ impl Environment {
|
||||
for result in cursor.iter_slices() {
|
||||
let (_key, value) = result?;
|
||||
if value.len() < size_of::<usize>() {
|
||||
return Err(Error::Corrupted);
|
||||
return Err(Error::Corrupted)
|
||||
}
|
||||
|
||||
let s = &value[..size_of::<usize>()];
|
||||
@ -708,7 +708,7 @@ impl EnvironmentBuilder {
|
||||
})() {
|
||||
ffi::mdbx_env_close_ex(env, false);
|
||||
|
||||
return Err(e);
|
||||
return Err(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -504,7 +504,7 @@ impl Transaction<RW> {
|
||||
/// Begins a new nested transaction inside of this transaction.
|
||||
pub fn begin_nested_txn(&mut self) -> Result<Self> {
|
||||
if self.inner.env.is_write_map() {
|
||||
return Err(Error::NestedTransactionsUnsupportedWithWriteMap);
|
||||
return Err(Error::NestedTransactionsUnsupportedWithWriteMap)
|
||||
}
|
||||
self.txn_execute(|txn| {
|
||||
let (tx, rx) = sync_channel(0);
|
||||
@ -576,7 +576,7 @@ impl TransactionPtr {
|
||||
// because we're taking a lock for any actions on the transaction pointer, including a call
|
||||
// to the `mdbx_txn_reset`.
|
||||
if self.is_timed_out() {
|
||||
return Err(Error::ReadTransactionTimeout);
|
||||
return Err(Error::ReadTransactionTimeout)
|
||||
}
|
||||
|
||||
Ok((f)(self.txn))
|
||||
|
||||
@ -43,7 +43,7 @@ impl Compression for Lz4 {
|
||||
Err(err) => {
|
||||
multiplier *= 2;
|
||||
if multiplier == 16 {
|
||||
return Err(NippyJarError::Custom(err.to_string()));
|
||||
return Err(NippyJarError::Custom(err.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,7 +60,7 @@ impl Zstd {
|
||||
pub fn decompressors(&self) -> Result<Vec<Decompressor<'_>>, NippyJarError> {
|
||||
if let Some(dictionaries) = &self.dictionaries {
|
||||
debug_assert!(dictionaries.len() == self.columns);
|
||||
return dictionaries.decompressors();
|
||||
return dictionaries.decompressors()
|
||||
}
|
||||
|
||||
Ok(vec![])
|
||||
@ -72,12 +72,12 @@ impl Zstd {
|
||||
ZstdState::PendingDictionary => Err(NippyJarError::CompressorNotReady),
|
||||
ZstdState::Ready => {
|
||||
if !self.use_dict {
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
if let Some(dictionaries) = &self.dictionaries {
|
||||
debug!(target: "nippy-jar", count=?dictionaries.len(), "Generating ZSTD compressor dictionaries.");
|
||||
return Ok(Some(dictionaries.compressors()?));
|
||||
return Ok(Some(dictionaries.compressors()?))
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
@ -102,7 +102,7 @@ impl Zstd {
|
||||
buffer.reserve(column_value.len() * multiplier);
|
||||
multiplier += 1;
|
||||
if multiplier == 5 {
|
||||
return Err(NippyJarError::Disconnect(err));
|
||||
return Err(NippyJarError::Disconnect(err))
|
||||
}
|
||||
}
|
||||
|
||||
@ -191,7 +191,7 @@ impl Compression for Zstd {
|
||||
columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
|
||||
) -> Result<(), NippyJarError> {
|
||||
if !self.use_dict {
|
||||
return Ok(());
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// There's a per 2GB hard limit on each column data set for training
|
||||
@ -205,7 +205,7 @@ impl Compression for Zstd {
|
||||
// ```
|
||||
|
||||
if columns.len() != self.columns {
|
||||
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()));
|
||||
return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
|
||||
}
|
||||
|
||||
// TODO: parallel calculation
|
||||
@ -363,7 +363,7 @@ impl<'a> Serialize for ZstdDictionary<'a> {
|
||||
impl<'a> PartialEq for ZstdDictionary<'a> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
if let (Self::Raw(a), Self::Raw(b)) = (self, &other) {
|
||||
return a == b;
|
||||
return a == b
|
||||
}
|
||||
unimplemented!("`DecoderDictionary` can't be compared. So comparison should be done after decompressing a value.");
|
||||
}
|
||||
|
||||
@ -86,11 +86,11 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
|
||||
.offsets_index
|
||||
.access(row_index as usize)
|
||||
.expect("built from same set") as u64;
|
||||
return self.next_row();
|
||||
return self.next_row()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(NippyJarError::UnsupportedFilterQuery);
|
||||
return Err(NippyJarError::UnsupportedFilterQuery)
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@ -108,7 +108,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
|
||||
|
||||
if self.row as usize >= self.jar.rows {
|
||||
// Has reached the end
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let mut row = Vec::with_capacity(self.jar.columns);
|
||||
@ -154,11 +154,11 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
|
||||
.offsets_index
|
||||
.access(row_index as usize)
|
||||
.expect("built from same set") as u64;
|
||||
return self.next_row_with_cols(mask);
|
||||
return self.next_row_with_cols(mask)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(NippyJarError::UnsupportedFilterQuery);
|
||||
return Err(NippyJarError::UnsupportedFilterQuery)
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@ -182,7 +182,7 @@ impl<'a, H: NippyJarHeader> NippyJarCursor<'a, H> {
|
||||
|
||||
if self.row as usize >= self.jar.rows {
|
||||
// Has reached the end
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let columns = self.jar.columns;
|
||||
|
||||
@ -28,7 +28,7 @@ impl Cuckoo {
|
||||
impl InclusionFilter for Cuckoo {
|
||||
fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> {
|
||||
if self.remaining == 0 {
|
||||
return Err(NippyJarError::FilterMaxCapacity);
|
||||
return Err(NippyJarError::FilterMaxCapacity)
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
|
||||
@ -28,7 +28,7 @@ impl PerfectHashingFunction for Fmph {
|
||||
|
||||
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
|
||||
if let Some(f) = &self.function {
|
||||
return Ok(f.get(key));
|
||||
return Ok(f.get(key))
|
||||
}
|
||||
Err(NippyJarError::PHFMissingKeys)
|
||||
}
|
||||
@ -92,7 +92,7 @@ impl<'de> Deserialize<'de> for Fmph {
|
||||
function: Some(
|
||||
Function::read(&mut std::io::Cursor::new(buffer)).map_err(D::Error::custom)?,
|
||||
),
|
||||
});
|
||||
})
|
||||
}
|
||||
Ok(Self { function: None })
|
||||
}
|
||||
|
||||
@ -28,7 +28,7 @@ impl PerfectHashingFunction for GoFmph {
|
||||
|
||||
fn get_index(&self, key: &[u8]) -> Result<Option<u64>, NippyJarError> {
|
||||
if let Some(f) = &self.function {
|
||||
return Ok(f.get(key));
|
||||
return Ok(f.get(key))
|
||||
}
|
||||
Err(NippyJarError::PHFMissingKeys)
|
||||
}
|
||||
@ -93,7 +93,7 @@ impl<'de> Deserialize<'de> for GoFmph {
|
||||
GOFunction::read(&mut std::io::Cursor::new(buffer))
|
||||
.map_err(D::Error::custom)?,
|
||||
),
|
||||
});
|
||||
})
|
||||
}
|
||||
Ok(Self { function: None })
|
||||
}
|
||||
|
||||
@ -142,7 +142,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
// When an offset size is smaller than the initial (8), we are dealing with immutable
|
||||
// data.
|
||||
if reader.offset_size() != OFFSET_SIZE_BYTES {
|
||||
return Err(NippyJarError::FrozenJar);
|
||||
return Err(NippyJarError::FrozenJar)
|
||||
}
|
||||
|
||||
let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset
|
||||
@ -153,7 +153,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
if check_mode.should_err() &&
|
||||
expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal
|
||||
{
|
||||
return Err(NippyJarError::InconsistentState);
|
||||
return Err(NippyJarError::InconsistentState)
|
||||
}
|
||||
|
||||
// Offsets configuration wasn't properly committed
|
||||
@ -184,7 +184,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
let data_file_len = self.data_file.get_ref().metadata()?.len();
|
||||
|
||||
if check_mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal {
|
||||
return Err(NippyJarError::InconsistentState);
|
||||
return Err(NippyJarError::InconsistentState)
|
||||
}
|
||||
|
||||
// Offset list wasn't properly committed
|
||||
@ -214,7 +214,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
// Since we decrease the offset list, we need to check the consistency of
|
||||
// `self.jar.rows` again
|
||||
self.ensure_file_consistency(ConsistencyFailStrategy::Heal)?;
|
||||
break;
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -347,7 +347,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
return Err(NippyJarError::InvalidPruning(
|
||||
num_offsets,
|
||||
remaining_to_prune as u64,
|
||||
));
|
||||
))
|
||||
}
|
||||
|
||||
let new_num_offsets = num_offsets.saturating_sub(remaining_to_prune as u64);
|
||||
@ -373,7 +373,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
self.data_file.get_mut().set_len(last_offset)?;
|
||||
}
|
||||
} else {
|
||||
return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64));
|
||||
return Err(NippyJarError::InvalidPruning(0, remaining_to_prune as u64))
|
||||
}
|
||||
}
|
||||
|
||||
@ -463,7 +463,7 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
for offset in self.offsets.drain(..) {
|
||||
if let Some(last_offset_ondisk) = last_offset_ondisk.take() {
|
||||
if last_offset_ondisk == offset {
|
||||
continue;
|
||||
continue
|
||||
}
|
||||
}
|
||||
self.offsets_file.write_all(&offset.to_le_bytes())?;
|
||||
|
||||
@ -702,7 +702,7 @@ impl StaticFileProvider {
|
||||
?segment,
|
||||
"Setting unwind target."
|
||||
);
|
||||
return Ok(Some(highest_static_file_block));
|
||||
return Ok(Some(highest_static_file_block))
|
||||
}
|
||||
|
||||
// If the checkpoint is behind, then we failed to do a database commit **but committed** to
|
||||
|
||||
@ -49,7 +49,7 @@ where
|
||||
type Pool = EthTransactionPool<Node::Provider, InMemoryBlobStore>;
|
||||
|
||||
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
|
||||
let data_dir = ctx.data_dir();
|
||||
let data_dir = ctx.config().datadir();
|
||||
let blob_store = InMemoryBlobStore::default();
|
||||
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.chain_spec())
|
||||
.with_head_timestamp(ctx.head().timestamp)
|
||||
|
||||
Reference in New Issue
Block a user