feat(db): record client version history (#7119)

This commit is contained in:
Roman Krasiuk
2024-03-13 13:07:13 +01:00
committed by GitHub
parent 884fd71a01
commit 610731ced8
32 changed files with 366 additions and 119 deletions

View File

@ -1,10 +1,13 @@
//! Module that interacts with MDBX.
use crate::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
metrics::DatabaseEnvMetrics,
tables::{TableType, Tables},
models::client_version::ClientVersion,
tables::{self, TableType, Tables},
transaction::{DbTx, DbTxMut},
utils::default_page_size,
DatabaseError,
};
@ -16,7 +19,12 @@ use reth_libmdbx::{
PageSize, SyncMode, RO, RW,
};
use reth_tracing::tracing::error;
use std::{ops::Deref, path::Path, sync::Arc};
use std::{
ops::Deref,
path::Path,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tx::Tx;
pub mod cursor;
@ -42,9 +50,18 @@ pub enum DatabaseEnvKind {
RW,
}
impl DatabaseEnvKind {
/// Returns `true` if the environment is read-write.
pub fn is_rw(&self) -> bool {
matches!(self, Self::RW)
}
}
/// Arguments for database initialization.
#[derive(Debug, Default, Clone, Copy)]
#[derive(Clone, Debug)]
pub struct DatabaseArguments {
/// Client version that accesses the database.
client_version: ClientVersion,
/// Database log level. If [None], the default value is used.
log_level: Option<LogLevel>,
/// Maximum duration of a read transaction. If [None], the default value is used.
@ -73,14 +90,24 @@ pub struct DatabaseArguments {
}
impl DatabaseArguments {
/// Create new database arguments with given client version.
pub fn new(client_version: ClientVersion) -> Self {
Self {
client_version,
log_level: None,
max_read_transaction_duration: None,
exclusive: None,
}
}
/// Set the log level.
pub fn log_level(mut self, log_level: Option<LogLevel>) -> Self {
pub fn with_log_level(mut self, log_level: Option<LogLevel>) -> Self {
self.log_level = log_level;
self
}
/// Set the maximum duration of a read transaction.
pub fn max_read_transaction_duration(
pub fn with_max_read_transaction_duration(
mut self,
max_read_transaction_duration: Option<MaxReadTransactionDuration>,
) -> Self {
@ -89,10 +116,15 @@ impl DatabaseArguments {
}
/// Set the mdbx exclusive flag.
pub fn exclusive(mut self, exclusive: Option<bool>) -> Self {
pub fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
self.exclusive = exclusive;
self
}
/// Returns the client version if any.
pub fn client_version(&self) -> &ClientVersion {
&self.client_version
}
}
/// Wrapper for the libmdbx environment: [Environment]
@ -375,6 +407,27 @@ impl DatabaseEnv {
Ok(())
}
/// Records version that accesses the database with write privileges.
pub fn record_client_version(&self, version: ClientVersion) -> Result<(), DatabaseError> {
if version.is_empty() {
return Ok(())
}
let tx = self.tx_mut()?;
let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
let last_version = version_cursor.last()?.map(|(_, v)| v);
if Some(&version) != last_version.as_ref() {
version_cursor.upsert(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
version,
)?;
tx.commit()?;
}
Ok(())
}
}
impl Deref for DatabaseEnv {
@ -390,13 +443,12 @@ mod tests {
use super::*;
use crate::{
abstraction::table::{Encode, Table},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
cursor::{DbDupCursorRO, DbDupCursorRW, ReverseWalker, Walker},
models::{AccountBeforeTx, ShardedKey},
tables::{
AccountsHistory, CanonicalHeaders, Headers, PlainAccountState, PlainStorageState,
},
test_utils::*,
transaction::{DbTx, DbTxMut},
AccountChangeSets,
};
use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation};
@ -415,8 +467,8 @@ mod tests {
/// Create database for testing with specified path
fn create_test_db_with_path(kind: DatabaseEnvKind, path: &Path) -> DatabaseEnv {
let env =
DatabaseEnv::open(path, kind, DatabaseArguments::default()).expect(ERROR_DB_CREATION);
let env = DatabaseEnv::open(path, kind, DatabaseArguments::new(ClientVersion::default()))
.expect(ERROR_DB_CREATION);
env.create_tables().expect(ERROR_TABLE_CREATION);
env
}
@ -1041,8 +1093,12 @@ mod tests {
assert_eq!(result.expect(ERROR_RETURN_VALUE), 200);
}
let env = DatabaseEnv::open(&path, DatabaseEnvKind::RO, Default::default())
.expect(ERROR_DB_CREATION);
let env = DatabaseEnv::open(
&path,
DatabaseEnvKind::RO,
DatabaseArguments::new(ClientVersion::default()),
)
.expect(ERROR_DB_CREATION);
// GET
let result =

View File

@ -392,8 +392,8 @@ impl DbTxMut for Tx<RW> {
#[cfg(test)]
mod tests {
use crate::{
database::Database, mdbx::DatabaseArguments, tables, transaction::DbTx, DatabaseEnv,
DatabaseEnvKind,
database::Database, mdbx::DatabaseArguments, models::client_version::ClientVersion, tables,
transaction::DbTx, DatabaseEnv, DatabaseEnvKind,
};
use reth_interfaces::db::DatabaseError;
use reth_libmdbx::MaxReadTransactionDuration;
@ -405,8 +405,10 @@ mod tests {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
let args = DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
MAX_DURATION,
)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
let mut tx = db.tx().unwrap();
@ -429,8 +431,10 @@ mod tests {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
let args = DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
MAX_DURATION,
)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
let mut tx = db.tx().unwrap();

View File

@ -111,8 +111,9 @@ pub fn init_db<P: AsRef<Path>>(path: P, args: DatabaseArguments) -> eyre::Result
}
#[cfg(feature = "mdbx")]
{
let db = DatabaseEnv::open(rpath, DatabaseEnvKind::RW, args)?;
let db = DatabaseEnv::open(rpath, DatabaseEnvKind::RW, args.clone())?;
db.create_tables()?;
db.record_client_version(args.client_version().clone())?;
Ok(db)
}
#[cfg(not(feature = "mdbx"))]
@ -139,8 +140,10 @@ pub fn open_db_read_only(path: &Path, args: DatabaseArguments) -> eyre::Result<D
pub fn open_db(path: &Path, args: DatabaseArguments) -> eyre::Result<DatabaseEnv> {
#[cfg(feature = "mdbx")]
{
DatabaseEnv::open(path, DatabaseEnvKind::RW, args)
.with_context(|| format!("Could not open database at path: {}", path.display()))
let db = DatabaseEnv::open(path, DatabaseEnvKind::RW, args.clone())
.with_context(|| format!("Could not open database at path: {}", path.display()))?;
db.record_client_version(args.client_version().clone())?;
Ok(db)
}
#[cfg(not(feature = "mdbx"))]
{
@ -155,6 +158,7 @@ pub mod test_utils {
use crate::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
models::client_version::ClientVersion,
};
use reth_libmdbx::MaxReadTransactionDuration;
use reth_primitives::fs;
@ -250,8 +254,8 @@ pub mod test_utils {
let db = init_db(
&path,
DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)
.expect(&emsg);
@ -263,8 +267,8 @@ pub mod test_utils {
let path = path.as_ref().to_path_buf();
let db = init_db(
path.as_path(),
DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
)
.expect(ERROR_DB_CREATION);
Arc::new(TempDatabase { db: Some(db), path })
@ -272,12 +276,12 @@ pub mod test_utils {
/// Create read only database for testing
pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
let args = DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
let path = tempdir_path();
{
init_db(path.as_path(), args).expect(ERROR_DB_CREATION);
init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION);
}
let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN);
Arc::new(TempDatabase { db: Some(db), path })
@ -286,9 +290,16 @@ pub mod test_utils {
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{
cursor::DbCursorRO,
database::Database,
init_db,
mdbx::DatabaseArguments,
models::client_version::ClientVersion,
open_db, tables,
transaction::DbTx,
version::{db_version_file_path, DatabaseVersionError},
};
use assert_matches::assert_matches;
@ -300,25 +311,25 @@ mod tests {
fn db_version() {
let path = tempdir().unwrap();
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
let args = DatabaseArguments::new(ClientVersion::default())
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
// Database is empty
{
let db = init_db(&path, args);
let db = init_db(&path, args.clone());
assert_matches!(db, Ok(_));
}
// Database is not empty, current version is the same as in the file
{
let db = init_db(&path, args);
let db = init_db(&path, args.clone());
assert_matches!(db, Ok(_));
}
// Database is not empty, version file is malformed
{
fs::write(path.path().join(db_version_file_path(&path)), "invalid-version").unwrap();
let db = init_db(&path, args);
let db = init_db(&path, args.clone());
assert!(db.is_err());
assert_matches!(
db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
@ -337,4 +348,86 @@ mod tests {
)
}
}
#[test]
fn db_client_version() {
let path = tempdir().unwrap();
// Empty client version is not recorded
{
let db = init_db(&path, DatabaseArguments::new(ClientVersion::default())).unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
assert_matches!(cursor.first(), Ok(None));
}
// Client version is recorded
let first_version = ClientVersion { version: String::from("v1"), ..Default::default() };
{
let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
assert_eq!(
cursor
.walk_range(..)
.unwrap()
.map(|x| x.map(|(_, v)| v))
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![first_version.clone()]
);
}
// Same client version is not duplicated.
{
let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
assert_eq!(
cursor
.walk_range(..)
.unwrap()
.map(|x| x.map(|(_, v)| v))
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![first_version.clone()]
);
}
// Different client version is recorded
std::thread::sleep(Duration::from_secs(1));
let second_version = ClientVersion { version: String::from("v2"), ..Default::default() };
{
let db = init_db(&path, DatabaseArguments::new(second_version.clone())).unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
assert_eq!(
cursor
.walk_range(..)
.unwrap()
.map(|x| x.map(|(_, v)| v))
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![first_version.clone(), second_version.clone()]
);
}
// Different client version is recorded on db open.
std::thread::sleep(Duration::from_secs(1));
let third_version = ClientVersion { version: String::from("v3"), ..Default::default() };
{
let db = open_db(path.path(), DatabaseArguments::new(third_version.clone())).unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
assert_eq!(
cursor
.walk_range(..)
.unwrap()
.map(|x| x.map(|(_, v)| v))
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![first_version, second_version, third_version]
);
}
}
}

View File

@ -1,4 +1,5 @@
use crate::{
models::client_version::ClientVersion,
table::{Compress, Decompress},
tables::models::*,
};
@ -48,7 +49,8 @@ impl_compression_for_compact!(
TransactionSignedNoHash,
CompactU256,
StageCheckpoint,
PruneCheckpoint
PruneCheckpoint,
ClientVersion
);
macro_rules! impl_compression_fixed_compact {
@ -118,3 +120,4 @@ macro_rules! add_wrapper_struct {
add_wrapper_struct!((U256, CompactU256));
add_wrapper_struct!((u64, CompactU64));
add_wrapper_struct!((ClientVersion, CompactClientVersion));

View File

@ -31,6 +31,7 @@ use crate::{
models::{
accounts::{AccountBeforeTx, BlockNumberAddress},
blocks::{HeaderHash, StoredBlockOmmers},
client_version::ClientVersion,
storage_sharded_key::StorageShardedKey,
ShardedKey, StoredBlockBodyIndices, StoredBlockWithdrawals,
},
@ -372,6 +373,9 @@ tables! {
/// Stores the highest pruned block number and prune mode of each prune segment.
table PruneCheckpoints<Key = PruneSegment, Value = PruneCheckpoint>;
/// Stores the history of client versions that have accessed the database with write privileges by unix timestamp in seconds.
table VersionHistory<Key = u64, Value = ClientVersion>;
}
// Alias types.

View File

@ -0,0 +1,47 @@
//! Client version model.
use reth_codecs::{derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
/// Client version that accessed the database.
#[derive_arbitrary(compact)]
#[derive(Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct ClientVersion {
/// Client version
pub version: String,
/// The git commit sha
pub git_sha: String,
/// Build timestamp
pub build_timestamp: String,
}
impl ClientVersion {
/// Returns `true` if no version fields are set.
pub fn is_empty(&self) -> bool {
self.version.is_empty() && self.git_sha.is_empty() && self.build_timestamp.is_empty()
}
}
impl Compact for ClientVersion {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
let Self { version, git_sha, build_timestamp } = self;
version.into_bytes().to_compact(buf);
git_sha.into_bytes().to_compact(buf);
build_timestamp.into_bytes().to_compact(buf)
}
fn from_compact(buf: &[u8], len: usize) -> (Self, &[u8]) {
let (version, buf) = Vec::<u8>::from_compact(buf, len);
let (git_sha, buf) = Vec::<u8>::from_compact(buf, len);
let (build_timestamp, buf) = Vec::<u8>::from_compact(buf, len);
let client_version = Self {
version: unsafe { String::from_utf8_unchecked(version) },
git_sha: unsafe { String::from_utf8_unchecked(git_sha) },
build_timestamp: unsafe { String::from_utf8_unchecked(build_timestamp) },
};
(client_version, buf)
}
}

View File

@ -12,6 +12,7 @@ use reth_primitives::{
pub mod accounts;
pub mod blocks;
pub mod client_version;
pub mod integer_list;
pub mod sharded_key;
pub mod storage_sharded_key;
@ -20,6 +21,8 @@ pub use accounts::*;
pub use blocks::*;
pub use sharded_key::ShardedKey;
use self::client_version::ClientVersion;
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
macro_rules! impl_uints {
($($name:tt),+) => {
@ -155,3 +158,21 @@ impl Decode for PruneSegment {
Ok(Self::from_compact(buf, buf.len()).0)
}
}
impl Encode for ClientVersion {
type Encoded = Vec<u8>;
// Delegate to the Compact implementation
fn encode(self) -> Self::Encoded {
let mut buf = vec![];
self.to_compact(&mut buf);
buf
}
}
impl Decode for ClientVersion {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
}
}