mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 13:24:44 +00:00
Merge branch 'unstable' into electra-alpha7
This commit is contained in:
@@ -1,75 +0,0 @@
|
||||
use crate::chunked_vector::{chunk_key, Chunk, ChunkError, Field};
|
||||
use crate::{Error, KeyValueStore, KeyValueStoreOp};
|
||||
use types::EthSpec;
|
||||
|
||||
/// Buffered writer for chunked vectors (block roots mainly).
|
||||
pub struct ChunkWriter<'a, F, E, S>
|
||||
where
|
||||
F: Field<E>,
|
||||
E: EthSpec,
|
||||
S: KeyValueStore<E>,
|
||||
{
|
||||
/// Buffered chunk awaiting writing to disk (always dirty).
|
||||
chunk: Chunk<F::Value>,
|
||||
/// Chunk index of `chunk`.
|
||||
index: usize,
|
||||
store: &'a S,
|
||||
}
|
||||
|
||||
impl<'a, F, E, S> ChunkWriter<'a, F, E, S>
|
||||
where
|
||||
F: Field<E>,
|
||||
E: EthSpec,
|
||||
S: KeyValueStore<E>,
|
||||
{
|
||||
pub fn new(store: &'a S, vindex: usize) -> Result<Self, Error> {
|
||||
let chunk_index = F::chunk_index(vindex);
|
||||
let chunk = Chunk::load(store, F::column(), &chunk_key(chunk_index))?
|
||||
.unwrap_or_else(|| Chunk::new(vec![F::Value::default(); F::chunk_size()]));
|
||||
|
||||
Ok(Self {
|
||||
chunk,
|
||||
index: chunk_index,
|
||||
store,
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the value at a given vector index, writing the current chunk and moving on if necessary.
|
||||
pub fn set(
|
||||
&mut self,
|
||||
vindex: usize,
|
||||
value: F::Value,
|
||||
batch: &mut Vec<KeyValueStoreOp>,
|
||||
) -> Result<(), Error> {
|
||||
let chunk_index = F::chunk_index(vindex);
|
||||
|
||||
// Advance to the next chunk.
|
||||
if chunk_index != self.index {
|
||||
self.write(batch)?;
|
||||
*self = Self::new(self.store, vindex)?;
|
||||
}
|
||||
|
||||
let i = vindex % F::chunk_size();
|
||||
let existing_value = &self.chunk.values[i];
|
||||
|
||||
if existing_value == &value || existing_value == &F::Value::default() {
|
||||
self.chunk.values[i] = value;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ChunkError::Inconsistent {
|
||||
field: F::column(),
|
||||
chunk_index,
|
||||
existing_value: format!("{:?}", existing_value),
|
||||
new_value: format!("{:?}", value),
|
||||
}
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Write the current chunk to disk.
|
||||
///
|
||||
/// Should be called before the writer is dropped, in order to write the final chunk to disk.
|
||||
pub fn write(&self, batch: &mut Vec<KeyValueStoreOp>) -> Result<(), Error> {
|
||||
self.chunk.store(F::column(), &chunk_key(self.index), batch)
|
||||
}
|
||||
}
|
||||
@@ -322,11 +322,11 @@ macro_rules! field {
|
||||
}
|
||||
|
||||
field!(
|
||||
BlockRoots,
|
||||
BlockRootsChunked,
|
||||
FixedLengthField,
|
||||
Hash256,
|
||||
E::SlotsPerHistoricalRoot,
|
||||
DBColumn::BeaconBlockRoots,
|
||||
DBColumn::BeaconBlockRootsChunked,
|
||||
|_| OncePerNSlots {
|
||||
n: 1,
|
||||
activation_slot: Some(Slot::new(0)),
|
||||
@@ -336,11 +336,11 @@ field!(
|
||||
);
|
||||
|
||||
field!(
|
||||
StateRoots,
|
||||
StateRootsChunked,
|
||||
FixedLengthField,
|
||||
Hash256,
|
||||
E::SlotsPerHistoricalRoot,
|
||||
DBColumn::BeaconStateRoots,
|
||||
DBColumn::BeaconStateRootsChunked,
|
||||
|_| OncePerNSlots {
|
||||
n: 1,
|
||||
activation_slot: Some(Slot::new(0)),
|
||||
@@ -859,8 +859,8 @@ mod test {
|
||||
fn test_fixed_length<F: Field<TestSpec>>(_: F, expected: bool) {
|
||||
assert_eq!(F::is_fixed_length(), expected);
|
||||
}
|
||||
test_fixed_length(BlockRoots, true);
|
||||
test_fixed_length(StateRoots, true);
|
||||
test_fixed_length(BlockRootsChunked, true);
|
||||
test_fixed_length(StateRootsChunked, true);
|
||||
test_fixed_length(HistoricalRoots, false);
|
||||
test_fixed_length(RandaoMixes, true);
|
||||
}
|
||||
@@ -880,12 +880,12 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn needs_genesis_value_block_roots() {
|
||||
needs_genesis_value_once_per_slot(BlockRoots);
|
||||
needs_genesis_value_once_per_slot(BlockRootsChunked);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn needs_genesis_value_state_roots() {
|
||||
needs_genesis_value_once_per_slot(StateRoots);
|
||||
needs_genesis_value_once_per_slot(StateRootsChunked);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,38 +1,47 @@
|
||||
use crate::{DBColumn, Error, StoreItem};
|
||||
use crate::hdiff::HierarchyConfig;
|
||||
use crate::{AnchorInfo, DBColumn, Error, Split, StoreItem};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::io::Write;
|
||||
use std::num::NonZeroUsize;
|
||||
use superstruct::superstruct;
|
||||
use types::non_zero_usize::new_non_zero_usize;
|
||||
use types::{EthSpec, MinimalEthSpec};
|
||||
use types::EthSpec;
|
||||
use zstd::Encoder;
|
||||
|
||||
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
||||
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
|
||||
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(5);
|
||||
// Only used in tests. Mainnet sets a higher default on the CLI.
|
||||
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
|
||||
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64);
|
||||
pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
|
||||
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
|
||||
pub const DEFAULT_HDIFF_BUFFER_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16);
|
||||
const EST_COMPRESSION_FACTOR: usize = 2;
|
||||
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
|
||||
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
|
||||
|
||||
/// Database configuration parameters.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct StoreConfig {
|
||||
/// Number of slots to wait between storing restore points in the freezer database.
|
||||
pub slots_per_restore_point: u64,
|
||||
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
|
||||
pub slots_per_restore_point_set_explicitly: bool,
|
||||
/// Maximum number of blocks to store in the in-memory block cache.
|
||||
pub block_cache_size: NonZeroUsize,
|
||||
/// Maximum number of states to store in the in-memory state cache.
|
||||
pub state_cache_size: NonZeroUsize,
|
||||
/// Maximum number of states from freezer database to store in the in-memory state cache.
|
||||
/// Compression level for blocks, state diffs and other compressed values.
|
||||
pub compression_level: i32,
|
||||
/// Maximum number of historic states to store in the in-memory historic state cache.
|
||||
pub historic_state_cache_size: NonZeroUsize,
|
||||
/// Maximum number of `HDiffBuffer`s to store in memory.
|
||||
pub hdiff_buffer_cache_size: NonZeroUsize,
|
||||
/// Whether to compact the database on initialization.
|
||||
pub compact_on_init: bool,
|
||||
/// Whether to compact the database during database pruning.
|
||||
pub compact_on_prune: bool,
|
||||
/// Whether to prune payloads on initialization and finalization.
|
||||
pub prune_payloads: bool,
|
||||
/// State diff hierarchy.
|
||||
pub hierarchy_config: HierarchyConfig,
|
||||
/// Whether to prune blobs older than the blob data availability boundary.
|
||||
pub prune_blobs: bool,
|
||||
/// Frequency of blob pruning in epochs. Default: 1 (every epoch).
|
||||
@@ -43,28 +52,59 @@ pub struct StoreConfig {
|
||||
}
|
||||
|
||||
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||
#[superstruct(
|
||||
variants(V1, V22),
|
||||
variant_attributes(derive(Debug, Clone, PartialEq, Eq, Encode, Decode))
|
||||
)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct OnDiskStoreConfig {
|
||||
#[superstruct(only(V1))]
|
||||
pub slots_per_restore_point: u64,
|
||||
/// Prefix byte to future-proof versions of the `OnDiskStoreConfig` post V1
|
||||
#[superstruct(only(V22))]
|
||||
version_byte: u8,
|
||||
#[superstruct(only(V22))]
|
||||
pub hierarchy_config: HierarchyConfig,
|
||||
}
|
||||
|
||||
impl OnDiskStoreConfigV22 {
|
||||
fn new(hierarchy_config: HierarchyConfig) -> Self {
|
||||
Self {
|
||||
version_byte: 22,
|
||||
hierarchy_config,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum StoreConfigError {
|
||||
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
|
||||
MismatchedSlotsPerRestorePoint {
|
||||
config: u64,
|
||||
on_disk: u64,
|
||||
},
|
||||
InvalidCompressionLevel {
|
||||
level: i32,
|
||||
},
|
||||
IncompatibleStoreConfig {
|
||||
config: OnDiskStoreConfig,
|
||||
on_disk: OnDiskStoreConfig,
|
||||
},
|
||||
ZeroEpochsPerBlobPrune,
|
||||
InvalidVersionByte(Option<u8>),
|
||||
}
|
||||
|
||||
impl Default for StoreConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// Safe default for tests, shouldn't ever be read by a CLI node.
|
||||
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
|
||||
slots_per_restore_point_set_explicitly: false,
|
||||
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
||||
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
|
||||
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
|
||||
hdiff_buffer_cache_size: DEFAULT_HDIFF_BUFFER_CACHE_SIZE,
|
||||
compression_level: DEFAULT_COMPRESSION_LEVEL,
|
||||
compact_on_init: false,
|
||||
compact_on_prune: true,
|
||||
prune_payloads: true,
|
||||
hierarchy_config: HierarchyConfig::default(),
|
||||
prune_blobs: true,
|
||||
epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
|
||||
blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS,
|
||||
@@ -74,22 +114,90 @@ impl Default for StoreConfig {
|
||||
|
||||
impl StoreConfig {
|
||||
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
|
||||
OnDiskStoreConfig {
|
||||
slots_per_restore_point: self.slots_per_restore_point,
|
||||
}
|
||||
OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(self.hierarchy_config.clone()))
|
||||
}
|
||||
|
||||
pub fn check_compatibility(
|
||||
&self,
|
||||
on_disk_config: &OnDiskStoreConfig,
|
||||
split: &Split,
|
||||
anchor: &AnchorInfo,
|
||||
) -> Result<(), StoreConfigError> {
|
||||
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
|
||||
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
|
||||
config: self.slots_per_restore_point,
|
||||
on_disk: on_disk_config.slots_per_restore_point,
|
||||
});
|
||||
// Allow changing the hierarchy exponents if no historic states are stored.
|
||||
let no_historic_states_stored = anchor.no_historic_states_stored(split.slot);
|
||||
let hierarchy_config_changed =
|
||||
if let Ok(on_disk_hierarchy_config) = on_disk_config.hierarchy_config() {
|
||||
*on_disk_hierarchy_config != self.hierarchy_config
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if hierarchy_config_changed && !no_historic_states_stored {
|
||||
Err(StoreConfigError::IncompatibleStoreConfig {
|
||||
config: self.as_disk_config(),
|
||||
on_disk: on_disk_config.clone(),
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the configuration is valid.
|
||||
pub fn verify<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
|
||||
self.verify_compression_level()?;
|
||||
self.verify_epochs_per_blob_prune()
|
||||
}
|
||||
|
||||
/// Check that the compression level is valid.
|
||||
fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
|
||||
if zstd::compression_level_range().contains(&self.compression_level) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(StoreConfigError::InvalidCompressionLevel {
|
||||
level: self.compression_level,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Check that epochs_per_blob_prune is at least 1 epoch to avoid attempting to prune the same
|
||||
/// epochs over and over again.
|
||||
fn verify_epochs_per_blob_prune(&self) -> Result<(), StoreConfigError> {
|
||||
if self.epochs_per_blob_prune > 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(StoreConfigError::ZeroEpochsPerBlobPrune)
|
||||
}
|
||||
}
|
||||
|
||||
/// Estimate the size of `len` bytes after compression at the current compression level.
|
||||
pub fn estimate_compressed_size(&self, len: usize) -> usize {
|
||||
// This is a rough estimate, but for our data it seems that all non-zero compression levels
|
||||
// provide a similar compression ratio.
|
||||
if self.compression_level == 0 {
|
||||
len
|
||||
} else {
|
||||
len / EST_COMPRESSION_FACTOR
|
||||
}
|
||||
}
|
||||
|
||||
/// Estimate the size of `len` compressed bytes after decompression at the current compression
|
||||
/// level.
|
||||
pub fn estimate_decompressed_size(&self, len: usize) -> usize {
|
||||
if self.compression_level == 0 {
|
||||
len
|
||||
} else {
|
||||
len * EST_COMPRESSION_FACTOR
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result<Vec<u8>, Error> {
|
||||
let mut compressed_value =
|
||||
Vec::with_capacity(self.estimate_compressed_size(ssz_bytes.len()));
|
||||
let mut encoder = Encoder::new(&mut compressed_value, self.compression_level)
|
||||
.map_err(Error::Compression)?;
|
||||
encoder.write_all(ssz_bytes).map_err(Error::Compression)?;
|
||||
encoder.finish().map_err(Error::Compression)?;
|
||||
Ok(compressed_value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,10 +207,136 @@ impl StoreItem for OnDiskStoreConfig {
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
self.as_ssz_bytes()
|
||||
match self {
|
||||
OnDiskStoreConfig::V1(value) => value.as_ssz_bytes(),
|
||||
OnDiskStoreConfig::V22(value) => value.as_ssz_bytes(),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
||||
Ok(Self::from_ssz_bytes(bytes)?)
|
||||
// NOTE: V22 config can never be deserialized as a V1 because the minimum length of its
|
||||
// serialization is: 1 prefix byte + 1 offset (OnDiskStoreConfigV1 container) +
|
||||
// 1 offset (HierarchyConfig container) = 9.
|
||||
if let Ok(value) = OnDiskStoreConfigV1::from_ssz_bytes(bytes) {
|
||||
return Ok(Self::V1(value));
|
||||
}
|
||||
|
||||
Ok(Self::V22(OnDiskStoreConfigV22::from_ssz_bytes(bytes)?))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::{
|
||||
metadata::{ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN},
|
||||
AnchorInfo, Split,
|
||||
};
|
||||
use ssz::DecodeError;
|
||||
use types::{Hash256, Slot};
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_ok() {
|
||||
let store_config = StoreConfig {
|
||||
..Default::default()
|
||||
};
|
||||
let on_disk_config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(
|
||||
store_config.hierarchy_config.clone(),
|
||||
));
|
||||
let split = Split::default();
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, &ANCHOR_UNINITIALIZED)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_after_migration() {
|
||||
let store_config = StoreConfig {
|
||||
..Default::default()
|
||||
};
|
||||
let on_disk_config = OnDiskStoreConfig::V1(OnDiskStoreConfigV1 {
|
||||
slots_per_restore_point: 8192,
|
||||
});
|
||||
let split = Split::default();
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, &ANCHOR_UNINITIALIZED)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_hierarchy_config_incompatible() {
|
||||
let store_config = StoreConfig::default();
|
||||
let on_disk_config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(HierarchyConfig {
|
||||
exponents: vec![5, 8, 11, 13, 16, 18, 21],
|
||||
}));
|
||||
let split = Split {
|
||||
slot: Slot::new(32),
|
||||
..Default::default()
|
||||
};
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, &ANCHOR_FOR_ARCHIVE_NODE)
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_compatibility_hierarchy_config_update() {
|
||||
let store_config = StoreConfig {
|
||||
..Default::default()
|
||||
};
|
||||
let on_disk_config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(HierarchyConfig {
|
||||
exponents: vec![5, 8, 11, 13, 16, 18, 21],
|
||||
}));
|
||||
let split = Split::default();
|
||||
let anchor = AnchorInfo {
|
||||
anchor_slot: Slot::new(0),
|
||||
oldest_block_slot: Slot::new(0),
|
||||
oldest_block_parent: Hash256::ZERO,
|
||||
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
state_lower_limit: Slot::new(0),
|
||||
};
|
||||
assert!(store_config
|
||||
.check_compatibility(&on_disk_config, &split, &anchor)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_on_disk_config_v0_from_v1_default() {
|
||||
let config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(<_>::default()));
|
||||
let config_bytes = config.as_store_bytes();
|
||||
// On a downgrade, the previous version of lighthouse will attempt to deserialize the
|
||||
// prefixed V22 as just the V1 version.
|
||||
assert_eq!(
|
||||
OnDiskStoreConfigV1::from_ssz_bytes(&config_bytes).unwrap_err(),
|
||||
DecodeError::InvalidByteLength {
|
||||
len: 16,
|
||||
expected: 8
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_on_disk_config_v0_from_v1_empty() {
|
||||
let config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(HierarchyConfig {
|
||||
exponents: vec![],
|
||||
}));
|
||||
let config_bytes = config.as_store_bytes();
|
||||
// On a downgrade, the previous version of lighthouse will attempt to deserialize the
|
||||
// prefixed V22 as just the V1 version.
|
||||
assert_eq!(
|
||||
OnDiskStoreConfigV1::from_ssz_bytes(&config_bytes).unwrap_err(),
|
||||
DecodeError::InvalidByteLength {
|
||||
len: 9,
|
||||
expected: 8
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_on_disk_config_v1_roundtrip() {
|
||||
let config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(<_>::default()));
|
||||
let bytes = config.as_store_bytes();
|
||||
assert_eq!(bytes[0], 22);
|
||||
let config_out = OnDiskStoreConfig::from_store_bytes(&bytes).unwrap();
|
||||
assert_eq!(config_out, config);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::chunked_vector::ChunkError;
|
||||
use crate::config::StoreConfigError;
|
||||
use crate::hot_cold_store::HotColdDBError;
|
||||
use crate::{hdiff, DBColumn};
|
||||
use ssz::DecodeError;
|
||||
use state_processing::BlockReplayError;
|
||||
use types::{BeaconStateError, EpochCacheError, Hash256, InconsistentFork, Slot};
|
||||
use types::{milhouse, BeaconStateError, EpochCacheError, Hash256, InconsistentFork, Slot};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@@ -38,27 +39,35 @@ pub enum Error {
|
||||
/// State reconstruction failed because it didn't reach the upper limit slot.
|
||||
///
|
||||
/// This should never happen (it's a logic error).
|
||||
StateReconstructionDidNotComplete,
|
||||
StateReconstructionLogicError,
|
||||
StateReconstructionRootMismatch {
|
||||
slot: Slot,
|
||||
expected: Hash256,
|
||||
computed: Hash256,
|
||||
},
|
||||
MissingGenesisState,
|
||||
MissingSnapshot(Slot),
|
||||
BlockReplayError(BlockReplayError),
|
||||
AddPayloadLogicError,
|
||||
SlotClockUnavailableForMigration,
|
||||
InvalidKey,
|
||||
InvalidBytes,
|
||||
UnableToDowngrade,
|
||||
InconsistentFork(InconsistentFork),
|
||||
CacheBuildError(EpochCacheError),
|
||||
RandaoMixOutOfBounds,
|
||||
MilhouseError(milhouse::Error),
|
||||
Compression(std::io::Error),
|
||||
FinalizedStateDecreasingSlot,
|
||||
FinalizedStateUnaligned,
|
||||
StateForCacheHasPendingUpdates {
|
||||
state_root: Hash256,
|
||||
slot: Slot,
|
||||
},
|
||||
AddPayloadLogicError,
|
||||
InvalidKey,
|
||||
InvalidBytes,
|
||||
InconsistentFork(InconsistentFork),
|
||||
Hdiff(hdiff::Error),
|
||||
CacheBuildError(EpochCacheError),
|
||||
ForwardsIterInvalidColumn(DBColumn),
|
||||
ForwardsIterGap(DBColumn, Slot, Slot),
|
||||
StateShouldNotBeRequired(Slot),
|
||||
MissingBlock(Hash256),
|
||||
RandaoMixOutOfBounds,
|
||||
GenesisStateUnknown,
|
||||
ArithError(safe_arith::ArithError),
|
||||
}
|
||||
|
||||
@@ -112,6 +121,18 @@ impl From<StoreConfigError> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<milhouse::Error> for Error {
|
||||
fn from(e: milhouse::Error) -> Self {
|
||||
Self::MilhouseError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hdiff::Error> for Error {
|
||||
fn from(e: hdiff::Error) -> Self {
|
||||
Self::Hdiff(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BlockReplayError> for Error {
|
||||
fn from(e: BlockReplayError) -> Error {
|
||||
Error::BlockReplayError(e)
|
||||
|
||||
@@ -1,37 +1,34 @@
|
||||
use crate::chunked_iter::ChunkedVectorIter;
|
||||
use crate::chunked_vector::{BlockRoots, Field, StateRoots};
|
||||
use crate::errors::{Error, Result};
|
||||
use crate::iter::{BlockRootsIterator, StateRootsIterator};
|
||||
use crate::{HotColdDB, ItemStore};
|
||||
use crate::{ColumnIter, DBColumn, HotColdDB, ItemStore};
|
||||
use itertools::process_results;
|
||||
use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot};
|
||||
use std::marker::PhantomData;
|
||||
use types::{BeaconState, EthSpec, Hash256, Slot};
|
||||
|
||||
pub type HybridForwardsBlockRootsIterator<'a, E, Hot, Cold> =
|
||||
HybridForwardsIterator<'a, E, BlockRoots, Hot, Cold>;
|
||||
HybridForwardsIterator<'a, E, Hot, Cold>;
|
||||
pub type HybridForwardsStateRootsIterator<'a, E, Hot, Cold> =
|
||||
HybridForwardsIterator<'a, E, StateRoots, Hot, Cold>;
|
||||
HybridForwardsIterator<'a, E, Hot, Cold>;
|
||||
|
||||
/// Trait unifying `BlockRoots` and `StateRoots` for forward iteration.
|
||||
pub trait Root<E: EthSpec>: Field<E, Value = Hash256> {
|
||||
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
||||
fn simple_forwards_iterator(
|
||||
&self,
|
||||
column: DBColumn,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_root: Hash256,
|
||||
) -> Result<SimpleForwardsIterator>;
|
||||
) -> Result<SimpleForwardsIterator> {
|
||||
if column == DBColumn::BeaconBlockRoots {
|
||||
self.forwards_iter_block_roots_using_state(start_slot, end_state, end_root)
|
||||
} else if column == DBColumn::BeaconStateRoots {
|
||||
self.forwards_iter_state_roots_using_state(start_slot, end_state, end_root)
|
||||
} else {
|
||||
Err(Error::ForwardsIterInvalidColumn(column))
|
||||
}
|
||||
}
|
||||
|
||||
/// The first slot for which this field is *no longer* stored in the freezer database.
|
||||
///
|
||||
/// If `None`, then this field is not stored in the freezer database at all due to pruning
|
||||
/// configuration.
|
||||
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
) -> Option<Slot>;
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Root<E> for BlockRoots {
|
||||
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
fn forwards_iter_block_roots_using_state(
|
||||
&self,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
@@ -39,7 +36,7 @@ impl<E: EthSpec> Root<E> for BlockRoots {
|
||||
// Iterate backwards from the end state, stopping at the start slot.
|
||||
let values = process_results(
|
||||
std::iter::once(Ok((end_block_root, end_state.slot())))
|
||||
.chain(BlockRootsIterator::owned(store, end_state)),
|
||||
.chain(BlockRootsIterator::owned(self, end_state)),
|
||||
|iter| {
|
||||
iter.take_while(|(_, slot)| *slot >= start_slot)
|
||||
.collect::<Vec<_>>()
|
||||
@@ -48,17 +45,8 @@ impl<E: EthSpec> Root<E> for BlockRoots {
|
||||
Ok(SimpleForwardsIterator { values })
|
||||
}
|
||||
|
||||
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
) -> Option<Slot> {
|
||||
// Block roots are stored for all slots up to the split slot (exclusive).
|
||||
Some(store.get_split_slot())
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Root<E> for StateRoots {
|
||||
fn simple_forwards_iterator<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
fn forwards_iter_state_roots_using_state(
|
||||
&self,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_state_root: Hash256,
|
||||
@@ -66,7 +54,7 @@ impl<E: EthSpec> Root<E> for StateRoots {
|
||||
// Iterate backwards from the end state, stopping at the start slot.
|
||||
let values = process_results(
|
||||
std::iter::once(Ok((end_state_root, end_state.slot())))
|
||||
.chain(StateRootsIterator::owned(store, end_state)),
|
||||
.chain(StateRootsIterator::owned(self, end_state)),
|
||||
|iter| {
|
||||
iter.take_while(|(_, slot)| *slot >= start_slot)
|
||||
.collect::<Vec<_>>()
|
||||
@@ -75,51 +63,123 @@ impl<E: EthSpec> Root<E> for StateRoots {
|
||||
Ok(SimpleForwardsIterator { values })
|
||||
}
|
||||
|
||||
fn freezer_upper_limit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
) -> Option<Slot> {
|
||||
// State roots are stored for all slots up to the latest restore point (exclusive).
|
||||
// There may not be a latest restore point if state pruning is enabled, in which
|
||||
// case this function will return `None`.
|
||||
store.get_latest_restore_point_slot()
|
||||
}
|
||||
}
|
||||
|
||||
/// Forwards root iterator that makes use of a flat field table in the freezer DB.
|
||||
pub struct FrozenForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
{
|
||||
inner: ChunkedVectorIter<'a, F, E, Hot, Cold>,
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
FrozenForwardsIterator<'a, E, F, Hot, Cold>
|
||||
{
|
||||
pub fn new(
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
/// Values in `column` are available in the range `start_slot..upper_bound`.
|
||||
///
|
||||
/// If `None` is returned then no values are available from `start_slot` due to pruning or
|
||||
/// incomplete backfill.
|
||||
pub fn freezer_upper_bound_for_column(
|
||||
&self,
|
||||
column: DBColumn,
|
||||
start_slot: Slot,
|
||||
last_restore_point_slot: Slot,
|
||||
spec: &ChainSpec,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: ChunkedVectorIter::new(
|
||||
store,
|
||||
start_slot.as_usize(),
|
||||
last_restore_point_slot,
|
||||
spec,
|
||||
),
|
||||
) -> Result<Option<Slot>> {
|
||||
if column == DBColumn::BeaconBlockRoots {
|
||||
Ok(self.freezer_upper_bound_for_block_roots(start_slot))
|
||||
} else if column == DBColumn::BeaconStateRoots {
|
||||
Ok(self.freezer_upper_bound_for_state_roots(start_slot))
|
||||
} else {
|
||||
Err(Error::ForwardsIterInvalidColumn(column))
|
||||
}
|
||||
}
|
||||
|
||||
fn freezer_upper_bound_for_block_roots(&self, start_slot: Slot) -> Option<Slot> {
|
||||
let oldest_block_slot = self.get_oldest_block_slot();
|
||||
if start_slot < oldest_block_slot {
|
||||
if start_slot == 0 {
|
||||
// Slot 0 block root is always available.
|
||||
Some(Slot::new(1))
|
||||
// Non-zero block roots are not available prior to the `oldest_block_slot`.
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
// Block roots are stored for all slots up to the split slot (exclusive).
|
||||
Some(self.get_split_slot())
|
||||
}
|
||||
}
|
||||
|
||||
fn freezer_upper_bound_for_state_roots(&self, start_slot: Slot) -> Option<Slot> {
|
||||
let split_slot = self.get_split_slot();
|
||||
let anchor = self.get_anchor_info();
|
||||
|
||||
if start_slot >= anchor.state_upper_limit {
|
||||
// Starting slot is after the upper limit, so the split is the upper limit.
|
||||
// The split state's root is not available in the freezer so this is exclusive.
|
||||
Some(split_slot)
|
||||
} else if start_slot <= anchor.state_lower_limit {
|
||||
// Starting slot is prior to lower limit, so that's the upper limit. We can't
|
||||
// iterate past the lower limit into the gap. The +1 accounts for exclusivity.
|
||||
Some(anchor.state_lower_limit + 1)
|
||||
} else {
|
||||
// In the gap, nothing is available.
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for FrozenForwardsIterator<'a, E, F, Hot, Cold>
|
||||
/// Forwards root iterator that makes use of a slot -> root mapping in the freezer DB.
|
||||
pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
inner: ColumnIter<'a, Vec<u8>>,
|
||||
column: DBColumn,
|
||||
next_slot: Slot,
|
||||
end_slot: Slot,
|
||||
_phantom: PhantomData<(E, Hot, Cold)>,
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
FrozenForwardsIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
type Item = (Hash256, Slot);
|
||||
/// `end_slot` is EXCLUSIVE here.
|
||||
pub fn new(
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
column: DBColumn,
|
||||
start_slot: Slot,
|
||||
end_slot: Slot,
|
||||
) -> Result<Self> {
|
||||
if column != DBColumn::BeaconBlockRoots && column != DBColumn::BeaconStateRoots {
|
||||
return Err(Error::ForwardsIterInvalidColumn(column));
|
||||
}
|
||||
let start = start_slot.as_u64().to_be_bytes();
|
||||
Ok(Self {
|
||||
inner: store.cold_db.iter_column_from(column, &start),
|
||||
column,
|
||||
next_slot: start_slot,
|
||||
end_slot,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for FrozenForwardsIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, Slot)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.next_slot == self.end_slot {
|
||||
return None;
|
||||
}
|
||||
self.inner
|
||||
.next()
|
||||
.map(|(slot, root)| (root, Slot::from(slot)))
|
||||
.next()?
|
||||
.and_then(|(slot_bytes, root_bytes)| {
|
||||
let slot = slot_bytes
|
||||
.clone()
|
||||
.try_into()
|
||||
.map(u64::from_be_bytes)
|
||||
.map(Slot::new)
|
||||
.map_err(|_| Error::InvalidBytes)?;
|
||||
if root_bytes.len() != std::mem::size_of::<Hash256>() {
|
||||
return Err(Error::InvalidBytes);
|
||||
}
|
||||
let root = Hash256::from_slice(&root_bytes);
|
||||
|
||||
if slot != self.next_slot {
|
||||
return Err(Error::ForwardsIterGap(self.column, slot, self.next_slot));
|
||||
}
|
||||
self.next_slot += 1;
|
||||
|
||||
Ok(Some((root, slot)))
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,10 +199,12 @@ impl Iterator for SimpleForwardsIterator {
|
||||
}
|
||||
|
||||
/// Fusion of the above two approaches to forwards iteration. Fast and efficient.
|
||||
pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
PreFinalization {
|
||||
iter: Box<FrozenForwardsIterator<'a, E, F, Hot, Cold>>,
|
||||
iter: Box<FrozenForwardsIterator<'a, E, Hot, Cold>>,
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
end_slot: Option<Slot>,
|
||||
column: DBColumn,
|
||||
/// Data required by the `PostFinalization` iterator when we get to it.
|
||||
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
|
||||
},
|
||||
@@ -150,6 +212,7 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, C
|
||||
continuation_data: Option<Box<(BeaconState<E>, Hash256)>>,
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
start_slot: Slot,
|
||||
column: DBColumn,
|
||||
},
|
||||
PostFinalization {
|
||||
iter: SimpleForwardsIterator,
|
||||
@@ -157,8 +220,8 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, C
|
||||
Finished,
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
HybridForwardsIterator<'a, E, F, Hot, Cold>
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
HybridForwardsIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
/// Construct a new hybrid iterator.
|
||||
///
|
||||
@@ -174,48 +237,54 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
/// function may block for some time while `get_state` runs.
|
||||
pub fn new(
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
column: DBColumn,
|
||||
start_slot: Slot,
|
||||
end_slot: Option<Slot>,
|
||||
get_state: impl FnOnce() -> Result<(BeaconState<E>, Hash256)>,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Self> {
|
||||
use HybridForwardsIterator::*;
|
||||
|
||||
// First slot at which this field is *not* available in the freezer. i.e. all slots less
|
||||
// than this slot have their data available in the freezer.
|
||||
let freezer_upper_limit = F::freezer_upper_limit(store).unwrap_or(Slot::new(0));
|
||||
let opt_freezer_upper_bound = store.freezer_upper_bound_for_column(column, start_slot)?;
|
||||
|
||||
let result = if start_slot < freezer_upper_limit {
|
||||
let iter = Box::new(FrozenForwardsIterator::new(
|
||||
store,
|
||||
start_slot,
|
||||
freezer_upper_limit,
|
||||
spec,
|
||||
));
|
||||
match opt_freezer_upper_bound {
|
||||
Some(freezer_upper_bound) if start_slot < freezer_upper_bound => {
|
||||
// EXCLUSIVE end slot for the frozen portion of the iterator.
|
||||
let frozen_end_slot = end_slot.map_or(freezer_upper_bound, |end_slot| {
|
||||
std::cmp::min(end_slot + 1, freezer_upper_bound)
|
||||
});
|
||||
let iter = Box::new(FrozenForwardsIterator::new(
|
||||
store,
|
||||
column,
|
||||
start_slot,
|
||||
frozen_end_slot,
|
||||
)?);
|
||||
|
||||
// No continuation data is needed if the forwards iterator plans to halt before
|
||||
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be
|
||||
// returned.
|
||||
let continuation_data =
|
||||
if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_limit) {
|
||||
None
|
||||
} else {
|
||||
Some(Box::new(get_state()?))
|
||||
};
|
||||
PreFinalization {
|
||||
iter,
|
||||
end_slot,
|
||||
continuation_data,
|
||||
// No continuation data is needed if the forwards iterator plans to halt before
|
||||
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be
|
||||
// returned.
|
||||
let continuation_data =
|
||||
if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_bound) {
|
||||
None
|
||||
} else {
|
||||
Some(Box::new(get_state()?))
|
||||
};
|
||||
Ok(PreFinalization {
|
||||
iter,
|
||||
store,
|
||||
end_slot,
|
||||
column,
|
||||
continuation_data,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
PostFinalizationLazy {
|
||||
_ => Ok(PostFinalizationLazy {
|
||||
continuation_data: Some(Box::new(get_state()?)),
|
||||
store,
|
||||
start_slot,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
column,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>> {
|
||||
@@ -225,29 +294,31 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
PreFinalization {
|
||||
iter,
|
||||
end_slot,
|
||||
store,
|
||||
continuation_data,
|
||||
column,
|
||||
} => {
|
||||
match iter.next() {
|
||||
Some(x) => Ok(Some(x)),
|
||||
Some(x) => x.map(Some),
|
||||
// Once the pre-finalization iterator is consumed, transition
|
||||
// to a post-finalization iterator beginning from the last slot
|
||||
// of the pre iterator.
|
||||
None => {
|
||||
// If the iterator has an end slot (inclusive) which has already been
|
||||
// covered by the (exclusive) frozen forwards iterator, then we're done!
|
||||
let iter_end_slot = Slot::from(iter.inner.end_vindex);
|
||||
if end_slot.map_or(false, |end_slot| iter_end_slot == end_slot + 1) {
|
||||
if end_slot.map_or(false, |end_slot| iter.end_slot == end_slot + 1) {
|
||||
*self = Finished;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let continuation_data = continuation_data.take();
|
||||
let store = iter.inner.store;
|
||||
let start_slot = iter_end_slot;
|
||||
let start_slot = iter.end_slot;
|
||||
|
||||
*self = PostFinalizationLazy {
|
||||
continuation_data,
|
||||
store,
|
||||
start_slot,
|
||||
column: *column,
|
||||
};
|
||||
|
||||
self.do_next()
|
||||
@@ -258,11 +329,17 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
continuation_data,
|
||||
store,
|
||||
start_slot,
|
||||
column,
|
||||
} => {
|
||||
let (end_state, end_root) =
|
||||
*continuation_data.take().ok_or(Error::NoContinuationData)?;
|
||||
*self = PostFinalization {
|
||||
iter: F::simple_forwards_iterator(store, *start_slot, end_state, end_root)?,
|
||||
iter: store.simple_forwards_iterator(
|
||||
*column,
|
||||
*start_slot,
|
||||
end_state,
|
||||
end_root,
|
||||
)?,
|
||||
};
|
||||
self.do_next()
|
||||
}
|
||||
@@ -272,8 +349,8 @@ impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, F: Root<E>, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for HybridForwardsIterator<'a, E, F, Hot, Cold>
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for HybridForwardsIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, Slot)>;
|
||||
|
||||
|
||||
914
beacon_node/store/src/hdiff.rs
Normal file
914
beacon_node/store/src/hdiff.rs
Normal file
@@ -0,0 +1,914 @@
|
||||
//! Hierarchical diff implementation.
|
||||
use crate::{metrics, DBColumn, StoreConfig, StoreItem};
|
||||
use bls::PublicKeyBytes;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::cmp::Ordering;
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::RangeInclusive;
|
||||
use std::str::FromStr;
|
||||
use std::sync::LazyLock;
|
||||
use superstruct::superstruct;
|
||||
use types::historical_summary::HistoricalSummary;
|
||||
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator};
|
||||
use zstd::{Decoder, Encoder};
|
||||
|
||||
static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::empty);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
InvalidHierarchy,
|
||||
DiffDeletionsNotSupported,
|
||||
UnableToComputeDiff,
|
||||
UnableToApplyDiff,
|
||||
BalancesIncompleteChunk,
|
||||
Compression(std::io::Error),
|
||||
InvalidSszState(ssz::DecodeError),
|
||||
InvalidBalancesLength,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
|
||||
pub struct HierarchyConfig {
|
||||
/// A sequence of powers of two to define how frequently to store each layer of state diffs.
|
||||
/// The last value always represents the frequency of full state snapshots. Adding more
|
||||
/// exponents increases the number of diff layers. This value allows to customize the trade-off
|
||||
/// between reconstruction speed and disk space.
|
||||
///
|
||||
/// Consider an example `exponents value of `[5,13,21]`. This means we have 3 layers:
|
||||
/// - Full state stored every 2^21 slots (2097152 slots or 291 days)
|
||||
/// - First diff layer stored every 2^13 slots (8192 slots or 2.3 hours)
|
||||
/// - Second diff layer stored every 2^5 slots (32 slots or 1 epoch)
|
||||
///
|
||||
/// To reconstruct a state at slot 3,000,003 we load each closest layer
|
||||
/// - Layer 0: 3000003 - (3000003 mod 2^21) = 2097152
|
||||
/// - Layer 1: 3000003 - (3000003 mod 2^13) = 2998272
|
||||
/// - Layer 2: 3000003 - (3000003 mod 2^5) = 3000000
|
||||
///
|
||||
/// Layer 0 is full state snapshot, apply layer 1 diff, then apply layer 2 diff and then replay
|
||||
/// blocks 3,000,001 to 3,000,003.
|
||||
pub exponents: Vec<u8>,
|
||||
}
|
||||
|
||||
impl FromStr for HierarchyConfig {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, String> {
|
||||
let exponents = s
|
||||
.split(',')
|
||||
.map(|s| {
|
||||
s.parse()
|
||||
.map_err(|e| format!("invalid hierarchy-exponents: {e:?}"))
|
||||
})
|
||||
.collect::<Result<Vec<u8>, _>>()?;
|
||||
|
||||
if exponents.windows(2).any(|w| w[0] >= w[1]) {
|
||||
return Err("hierarchy-exponents must be in ascending order".to_string());
|
||||
}
|
||||
|
||||
Ok(HierarchyConfig { exponents })
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for HierarchyConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{}", self.exponents.iter().join(","))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct HierarchyModuli {
|
||||
moduli: Vec<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum StorageStrategy {
|
||||
ReplayFrom(Slot),
|
||||
DiffFrom(Slot),
|
||||
Snapshot,
|
||||
}
|
||||
|
||||
/// Hierarchical diff output and working buffer.
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct HDiffBuffer {
|
||||
state: Vec<u8>,
|
||||
balances: Vec<u64>,
|
||||
inactivity_scores: Vec<u64>,
|
||||
validators: Vec<Validator>,
|
||||
historical_roots: Vec<Hash256>,
|
||||
historical_summaries: Vec<HistoricalSummary>,
|
||||
}
|
||||
|
||||
/// Hierarchical state diff.
|
||||
///
|
||||
/// Splits the diff into two data sections:
|
||||
///
|
||||
/// - **balances**: The balance of each active validator is almost certain to change every epoch.
|
||||
/// So this is the field in the state with most entropy. However the balance changes are small.
|
||||
/// We can optimize the diff significantly by computing the balance difference first and then
|
||||
/// compressing the result to squash those leading zero bytes.
|
||||
///
|
||||
/// - **everything else**: Instead of trying to apply heuristics and be clever on each field,
|
||||
/// running a generic binary diff algorithm on the rest of fields yields very good results. With
|
||||
/// this strategy the HDiff code is easily mantainable across forks, as new fields are covered
|
||||
/// automatically. xdelta3 algorithm showed diff compute and apply times of ~200 ms on a mainnet
|
||||
/// state from Apr 2023 (570k indexes), and a 92kB diff size.
|
||||
#[superstruct(
|
||||
variants(V0),
|
||||
variant_attributes(derive(Debug, PartialEq, Encode, Decode))
|
||||
)]
|
||||
#[derive(Debug, PartialEq, Encode, Decode)]
|
||||
#[ssz(enum_behaviour = "union")]
|
||||
pub struct HDiff {
|
||||
state_diff: BytesDiff,
|
||||
balances_diff: CompressedU64Diff,
|
||||
/// inactivity_scores are small integers that change slowly epoch to epoch. And are 0 for all
|
||||
/// participants unless there's non-finality. Computing the diff and compressing the result is
|
||||
/// much faster than running them through a binary patch algorithm. In the default case where
|
||||
/// all values are 0 it should also result in a tiny output.
|
||||
inactivity_scores_diff: CompressedU64Diff,
|
||||
/// The validators array represents the vast majority of data in a BeaconState. Due to its big
|
||||
/// size we have seen the performance of xdelta3 degrade. Comparing each entry of the
|
||||
/// validators array manually significantly speeds up the computation of the diff (+10x faster)
|
||||
/// and result in the same minimal diff. As the `Validator` record is unlikely to change,
|
||||
/// maintaining this extra complexity should be okay.
|
||||
validators_diff: ValidatorsDiff,
|
||||
/// `historical_roots` is an unbounded forever growing (after Capella it's
|
||||
/// historical_summaries) list of unique roots. This data is pure entropy so there's no point
|
||||
/// in compressing it. As it's an append only list, the optimal diff + compression is just the
|
||||
/// list of new entries. The size of `historical_roots` and `historical_summaries` in
|
||||
/// non-trivial ~10 MB so throwing it to xdelta3 adds CPU cycles. With a bit of extra complexity
|
||||
/// we can save those completely.
|
||||
historical_roots: AppendOnlyDiff<Hash256>,
|
||||
/// See historical_roots
|
||||
historical_summaries: AppendOnlyDiff<HistoricalSummary>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Encode, Decode)]
|
||||
pub struct BytesDiff {
|
||||
bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Encode, Decode)]
|
||||
pub struct CompressedU64Diff {
|
||||
bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Encode, Decode)]
|
||||
pub struct ValidatorsDiff {
|
||||
bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Encode, Decode)]
|
||||
pub struct AppendOnlyDiff<T: Encode + Decode> {
|
||||
values: Vec<T>,
|
||||
}
|
||||
|
||||
impl HDiffBuffer {
|
||||
pub fn from_state<E: EthSpec>(mut beacon_state: BeaconState<E>) -> Self {
|
||||
let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_FROM_STATE_TIME);
|
||||
// Set state.balances to empty list, and then serialize state as ssz
|
||||
let balances_list = std::mem::take(beacon_state.balances_mut());
|
||||
let inactivity_scores = if let Ok(inactivity_scores) = beacon_state.inactivity_scores_mut()
|
||||
{
|
||||
std::mem::take(inactivity_scores).to_vec()
|
||||
} else {
|
||||
// If this state is pre-altair consider the list empty. If the target state
|
||||
// is post altair, all its items will show up in the diff as is.
|
||||
vec![]
|
||||
};
|
||||
let validators = std::mem::take(beacon_state.validators_mut()).to_vec();
|
||||
let historical_roots = std::mem::take(beacon_state.historical_roots_mut()).to_vec();
|
||||
let historical_summaries =
|
||||
if let Ok(historical_summaries) = beacon_state.historical_summaries_mut() {
|
||||
std::mem::take(historical_summaries).to_vec()
|
||||
} else {
|
||||
// If this state is pre-capella consider the list empty. The diff will
|
||||
// include all items in the target state. If both states are
|
||||
// pre-capella the diff will be empty.
|
||||
vec![]
|
||||
};
|
||||
|
||||
let state = beacon_state.as_ssz_bytes();
|
||||
let balances = balances_list.to_vec();
|
||||
|
||||
HDiffBuffer {
|
||||
state,
|
||||
balances,
|
||||
inactivity_scores,
|
||||
validators,
|
||||
historical_roots,
|
||||
historical_summaries,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_state<E: EthSpec>(&self, spec: &ChainSpec) -> Result<BeaconState<E>, Error> {
|
||||
let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_INTO_STATE_TIME);
|
||||
let mut state =
|
||||
BeaconState::from_ssz_bytes(&self.state, spec).map_err(Error::InvalidSszState)?;
|
||||
|
||||
*state.balances_mut() = List::try_from_iter(self.balances.iter().copied())
|
||||
.map_err(|_| Error::InvalidBalancesLength)?;
|
||||
|
||||
if let Ok(inactivity_scores) = state.inactivity_scores_mut() {
|
||||
*inactivity_scores = List::try_from_iter(self.inactivity_scores.iter().copied())
|
||||
.map_err(|_| Error::InvalidBalancesLength)?;
|
||||
}
|
||||
|
||||
*state.validators_mut() = List::try_from_iter(self.validators.iter().cloned())
|
||||
.map_err(|_| Error::InvalidBalancesLength)?;
|
||||
|
||||
*state.historical_roots_mut() = List::try_from_iter(self.historical_roots.iter().copied())
|
||||
.map_err(|_| Error::InvalidBalancesLength)?;
|
||||
|
||||
if let Ok(historical_summaries) = state.historical_summaries_mut() {
|
||||
*historical_summaries = List::try_from_iter(self.historical_summaries.iter().copied())
|
||||
.map_err(|_| Error::InvalidBalancesLength)?;
|
||||
}
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.state.len()
|
||||
+ self.balances.len() * std::mem::size_of::<u64>()
|
||||
+ self.inactivity_scores.len() * std::mem::size_of::<u64>()
|
||||
+ self.validators.len() * std::mem::size_of::<Validator>()
|
||||
+ self.historical_roots.len() * std::mem::size_of::<Hash256>()
|
||||
+ self.historical_summaries.len() * std::mem::size_of::<HistoricalSummary>()
|
||||
}
|
||||
}
|
||||
|
||||
impl HDiff {
|
||||
pub fn compute(
|
||||
source: &HDiffBuffer,
|
||||
target: &HDiffBuffer,
|
||||
config: &StoreConfig,
|
||||
) -> Result<Self, Error> {
|
||||
let state_diff = BytesDiff::compute(&source.state, &target.state)?;
|
||||
let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances, config)?;
|
||||
let inactivity_scores_diff = CompressedU64Diff::compute(
|
||||
&source.inactivity_scores,
|
||||
&target.inactivity_scores,
|
||||
config,
|
||||
)?;
|
||||
let validators_diff =
|
||||
ValidatorsDiff::compute(&source.validators, &target.validators, config)?;
|
||||
let historical_roots =
|
||||
AppendOnlyDiff::compute(&source.historical_roots, &target.historical_roots)?;
|
||||
let historical_summaries =
|
||||
AppendOnlyDiff::compute(&source.historical_summaries, &target.historical_summaries)?;
|
||||
|
||||
Ok(HDiff::V0(HDiffV0 {
|
||||
state_diff,
|
||||
balances_diff,
|
||||
inactivity_scores_diff,
|
||||
validators_diff,
|
||||
historical_roots,
|
||||
historical_summaries,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn apply(&self, source: &mut HDiffBuffer, config: &StoreConfig) -> Result<(), Error> {
|
||||
let source_state = std::mem::take(&mut source.state);
|
||||
self.state_diff().apply(&source_state, &mut source.state)?;
|
||||
self.balances_diff().apply(&mut source.balances, config)?;
|
||||
self.inactivity_scores_diff()
|
||||
.apply(&mut source.inactivity_scores, config)?;
|
||||
self.validators_diff()
|
||||
.apply(&mut source.validators, config)?;
|
||||
self.historical_roots().apply(&mut source.historical_roots);
|
||||
self.historical_summaries()
|
||||
.apply(&mut source.historical_summaries);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.sizes().iter().sum()
|
||||
}
|
||||
|
||||
pub fn sizes(&self) -> Vec<usize> {
|
||||
vec![
|
||||
self.state_diff().size(),
|
||||
self.balances_diff().size(),
|
||||
self.inactivity_scores_diff().size(),
|
||||
self.validators_diff().size(),
|
||||
self.historical_roots().size(),
|
||||
self.historical_summaries().size(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
impl StoreItem for HDiff {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::BeaconStateDiff
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
self.as_ssz_bytes()
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, crate::Error> {
|
||||
Ok(Self::from_ssz_bytes(bytes)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl BytesDiff {
|
||||
pub fn compute(source: &[u8], target: &[u8]) -> Result<Self, Error> {
|
||||
Self::compute_xdelta(source, target)
|
||||
}
|
||||
|
||||
pub fn compute_xdelta(source_bytes: &[u8], target_bytes: &[u8]) -> Result<Self, Error> {
|
||||
let bytes = xdelta3::encode(target_bytes, source_bytes)
|
||||
.ok_or(Error::UnableToComputeDiff)
|
||||
.unwrap();
|
||||
Ok(Self { bytes })
|
||||
}
|
||||
|
||||
pub fn apply(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
|
||||
self.apply_xdelta(source, target)
|
||||
}
|
||||
|
||||
pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
|
||||
*target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.bytes.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl CompressedU64Diff {
|
||||
pub fn compute(xs: &[u64], ys: &[u64], config: &StoreConfig) -> Result<Self, Error> {
|
||||
if xs.len() > ys.len() {
|
||||
return Err(Error::DiffDeletionsNotSupported);
|
||||
}
|
||||
|
||||
let uncompressed_bytes: Vec<u8> = ys
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(i, y)| {
|
||||
// Diff from 0 if the entry is new.
|
||||
let x = xs.get(i).copied().unwrap_or(0);
|
||||
y.wrapping_sub(x).to_be_bytes()
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(CompressedU64Diff {
|
||||
bytes: compress_bytes(&uncompressed_bytes, config)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn apply(&self, xs: &mut Vec<u64>, config: &StoreConfig) -> Result<(), Error> {
|
||||
// Decompress balances diff.
|
||||
let balances_diff_bytes = uncompress_bytes(&self.bytes, config)?;
|
||||
|
||||
for (i, diff_bytes) in balances_diff_bytes
|
||||
.chunks(u64::BITS as usize / 8)
|
||||
.enumerate()
|
||||
{
|
||||
let diff = diff_bytes
|
||||
.try_into()
|
||||
.map(u64::from_be_bytes)
|
||||
.map_err(|_| Error::BalancesIncompleteChunk)?;
|
||||
|
||||
if let Some(x) = xs.get_mut(i) {
|
||||
*x = x.wrapping_add(diff);
|
||||
} else {
|
||||
xs.push(diff);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.bytes.len()
|
||||
}
|
||||
}
|
||||
|
||||
fn compress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
|
||||
let compression_level = config.compression_level;
|
||||
let mut out = Vec::with_capacity(config.estimate_compressed_size(input.len()));
|
||||
let mut encoder = Encoder::new(&mut out, compression_level).map_err(Error::Compression)?;
|
||||
encoder.write_all(input).map_err(Error::Compression)?;
|
||||
encoder.finish().map_err(Error::Compression)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
|
||||
let mut out = Vec::with_capacity(config.estimate_decompressed_size(input.len()));
|
||||
let mut decoder = Decoder::new(input).map_err(Error::Compression)?;
|
||||
decoder.read_to_end(&mut out).map_err(Error::Compression)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
impl ValidatorsDiff {
|
||||
pub fn compute(
|
||||
xs: &[Validator],
|
||||
ys: &[Validator],
|
||||
config: &StoreConfig,
|
||||
) -> Result<Self, Error> {
|
||||
if xs.len() > ys.len() {
|
||||
return Err(Error::DiffDeletionsNotSupported);
|
||||
}
|
||||
|
||||
let uncompressed_bytes = ys
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(i, y)| {
|
||||
let validator_diff = if let Some(x) = xs.get(i) {
|
||||
if y == x {
|
||||
return None;
|
||||
} else {
|
||||
let pubkey_changed = y.pubkey != x.pubkey;
|
||||
// Note: If researchers attempt to change the Validator container, go quickly to
|
||||
// All Core Devs and push hard to add another List in the BeaconState instead.
|
||||
Validator {
|
||||
// The pubkey can be changed on index re-use
|
||||
pubkey: if pubkey_changed {
|
||||
y.pubkey
|
||||
} else {
|
||||
PublicKeyBytes::empty()
|
||||
},
|
||||
// withdrawal_credentials can be set to zero initially but can never be
|
||||
// changed INTO zero. On index re-use it can be set to zero, but in that
|
||||
// case the pubkey will also change.
|
||||
withdrawal_credentials: if pubkey_changed
|
||||
|| y.withdrawal_credentials != x.withdrawal_credentials
|
||||
{
|
||||
y.withdrawal_credentials
|
||||
} else {
|
||||
Hash256::ZERO
|
||||
},
|
||||
// effective_balance can increase and decrease
|
||||
effective_balance: y.effective_balance - x.effective_balance,
|
||||
// slashed can only change from false into true. In an index re-use it can
|
||||
// switch back to false, but in that case the pubkey will also change.
|
||||
slashed: y.slashed,
|
||||
// activation_eligibility_epoch can never be zero under any case. It's
|
||||
// set to either FAR_FUTURE_EPOCH or get_current_epoch(state) + 1
|
||||
activation_eligibility_epoch: if y.activation_eligibility_epoch
|
||||
!= x.activation_eligibility_epoch
|
||||
{
|
||||
y.activation_eligibility_epoch
|
||||
} else {
|
||||
Epoch::new(0)
|
||||
},
|
||||
// activation_epoch can never be zero under any case. It's
|
||||
// set to either FAR_FUTURE_EPOCH or epoch + 1 + MAX_SEED_LOOKAHEAD
|
||||
activation_epoch: if y.activation_epoch != x.activation_epoch {
|
||||
y.activation_epoch
|
||||
} else {
|
||||
Epoch::new(0)
|
||||
},
|
||||
// exit_epoch can never be zero under any case. It's set to either
|
||||
// FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD
|
||||
exit_epoch: if y.exit_epoch != x.exit_epoch {
|
||||
y.exit_epoch
|
||||
} else {
|
||||
Epoch::new(0)
|
||||
},
|
||||
// withdrawable_epoch can never be zero under any case. It's set to
|
||||
// either FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD
|
||||
withdrawable_epoch: if y.withdrawable_epoch != x.withdrawable_epoch {
|
||||
y.withdrawable_epoch
|
||||
} else {
|
||||
Epoch::new(0)
|
||||
},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
y.clone()
|
||||
};
|
||||
|
||||
Some(ValidatorDiffEntry {
|
||||
index: i as u64,
|
||||
validator_diff,
|
||||
})
|
||||
})
|
||||
.flat_map(|v_diff| v_diff.as_ssz_bytes())
|
||||
.collect::<Vec<u8>>();
|
||||
|
||||
Ok(Self {
|
||||
bytes: compress_bytes(&uncompressed_bytes, config)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn apply(&self, xs: &mut Vec<Validator>, config: &StoreConfig) -> Result<(), Error> {
|
||||
let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?;
|
||||
|
||||
for diff_bytes in
|
||||
validator_diff_bytes.chunks(<ValidatorDiffEntry as Decode>::ssz_fixed_len())
|
||||
{
|
||||
let ValidatorDiffEntry {
|
||||
index,
|
||||
validator_diff: diff,
|
||||
} = ValidatorDiffEntry::from_ssz_bytes(diff_bytes)
|
||||
.map_err(|_| Error::BalancesIncompleteChunk)?;
|
||||
|
||||
if let Some(x) = xs.get_mut(index as usize) {
|
||||
// Note: a pubkey change implies index re-use. In that case over-write
|
||||
// withdrawal_credentials and slashed inconditionally as their default values
|
||||
// are valid values.
|
||||
let pubkey_changed = diff.pubkey != *EMPTY_PUBKEY;
|
||||
if pubkey_changed {
|
||||
x.pubkey = diff.pubkey;
|
||||
}
|
||||
if pubkey_changed || diff.withdrawal_credentials != Hash256::ZERO {
|
||||
x.withdrawal_credentials = diff.withdrawal_credentials;
|
||||
}
|
||||
if diff.effective_balance != 0 {
|
||||
x.effective_balance = x.effective_balance.wrapping_add(diff.effective_balance);
|
||||
}
|
||||
if pubkey_changed || diff.slashed {
|
||||
x.slashed = diff.slashed;
|
||||
}
|
||||
if diff.activation_eligibility_epoch != Epoch::new(0) {
|
||||
x.activation_eligibility_epoch = diff.activation_eligibility_epoch;
|
||||
}
|
||||
if diff.activation_epoch != Epoch::new(0) {
|
||||
x.activation_epoch = diff.activation_epoch;
|
||||
}
|
||||
if diff.exit_epoch != Epoch::new(0) {
|
||||
x.exit_epoch = diff.exit_epoch;
|
||||
}
|
||||
if diff.withdrawable_epoch != Epoch::new(0) {
|
||||
x.withdrawable_epoch = diff.withdrawable_epoch;
|
||||
}
|
||||
} else {
|
||||
xs.push(diff)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.bytes.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Encode, Decode)]
|
||||
struct ValidatorDiffEntry {
|
||||
index: u64,
|
||||
validator_diff: Validator,
|
||||
}
|
||||
|
||||
impl<T: Decode + Encode + Copy> AppendOnlyDiff<T> {
|
||||
pub fn compute(xs: &[T], ys: &[T]) -> Result<Self, Error> {
|
||||
match xs.len().cmp(&ys.len()) {
|
||||
Ordering::Less => Ok(Self {
|
||||
values: ys.iter().skip(xs.len()).copied().collect(),
|
||||
}),
|
||||
// Don't even create an iterator for this common case
|
||||
Ordering::Equal => Ok(Self { values: vec![] }),
|
||||
Ordering::Greater => Err(Error::DiffDeletionsNotSupported),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn apply(&self, xs: &mut Vec<T>) {
|
||||
xs.extend(self.values.iter().copied());
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
pub fn size(&self) -> usize {
|
||||
self.values.len() * size_of::<T>()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for HierarchyConfig {
|
||||
fn default() -> Self {
|
||||
HierarchyConfig {
|
||||
exponents: vec![5, 9, 11, 13, 16, 18, 21],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HierarchyConfig {
|
||||
pub fn to_moduli(&self) -> Result<HierarchyModuli, Error> {
|
||||
self.validate()?;
|
||||
let moduli = self.exponents.iter().map(|n| 1 << n).collect();
|
||||
Ok(HierarchyModuli { moduli })
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), Error> {
|
||||
if !self.exponents.is_empty()
|
||||
&& self
|
||||
.exponents
|
||||
.iter()
|
||||
.tuple_windows()
|
||||
.all(|(small, big)| small < big && *big < u64::BITS as u8)
|
||||
{
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::InvalidHierarchy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HierarchyModuli {
|
||||
pub fn storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
|
||||
// last = full snapshot interval
|
||||
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
|
||||
// first = most frequent diff layer, need to replay blocks from this layer
|
||||
let first = self
|
||||
.moduli
|
||||
.first()
|
||||
.copied()
|
||||
.ok_or(Error::InvalidHierarchy)?;
|
||||
|
||||
if slot % last == 0 {
|
||||
return Ok(StorageStrategy::Snapshot);
|
||||
}
|
||||
|
||||
Ok(self
|
||||
.moduli
|
||||
.iter()
|
||||
.rev()
|
||||
.tuple_windows()
|
||||
.find_map(|(&n_big, &n_small)| {
|
||||
if slot % n_small == 0 {
|
||||
// Diff from the previous layer.
|
||||
Some(StorageStrategy::DiffFrom(slot / n_big * n_big))
|
||||
} else {
|
||||
// Keep trying with next layer
|
||||
None
|
||||
}
|
||||
})
|
||||
// Exhausted layers, need to replay from most frequent layer
|
||||
.unwrap_or(StorageStrategy::ReplayFrom(slot / first * first)))
|
||||
}
|
||||
|
||||
/// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
|
||||
/// be stored.
|
||||
pub fn next_snapshot_slot(&self, slot: Slot) -> Result<Slot, Error> {
|
||||
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
|
||||
if slot % last == 0 {
|
||||
Ok(slot)
|
||||
} else {
|
||||
Ok((slot / last + 1) * last)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return `true` if the database ops for this slot should be committed immediately.
|
||||
///
|
||||
/// This is the case for all diffs aside from the ones in the leaf layer. To store a diff
|
||||
/// might require loading the state at the previous layer, in which case the diff for that
|
||||
/// layer must already have been stored.
|
||||
///
|
||||
/// In future we may be able to handle this differently (with proper transaction semantics
|
||||
/// rather than LevelDB's "write batches").
|
||||
pub fn should_commit_immediately(&self, slot: Slot) -> Result<bool, Error> {
|
||||
// If there's only 1 layer of snapshots, then commit only when writing a snapshot.
|
||||
self.moduli.get(1).map_or_else(
|
||||
|| Ok(slot == self.next_snapshot_slot(slot)?),
|
||||
|second_layer_moduli| Ok(slot % *second_layer_moduli == 0),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl StorageStrategy {
|
||||
/// For the state stored with this `StorageStrategy` at `slot`, return the range of slots which
|
||||
/// should be checked for ancestor states in the historic state cache.
|
||||
///
|
||||
/// The idea is that for states which need to be built by replaying blocks we should scan
|
||||
/// for any viable ancestor state between their `from` slot and `slot`. If we find such a
|
||||
/// state it will save us from the slow reconstruction of the `from` state using diffs.
|
||||
///
|
||||
/// Similarly for `DiffFrom` and `Snapshot` states, loading the prior state and replaying 1
|
||||
/// block is often going to be faster than loading and applying diffs/snapshots, so we may as
|
||||
/// well check the cache for that 1 slot prior (in case the caller is iterating sequentially).
|
||||
pub fn replay_from_range(
|
||||
&self,
|
||||
slot: Slot,
|
||||
) -> std::iter::Map<RangeInclusive<u64>, fn(u64) -> Slot> {
|
||||
match self {
|
||||
Self::ReplayFrom(from) => from.as_u64()..=slot.as_u64(),
|
||||
Self::Snapshot | Self::DiffFrom(_) => {
|
||||
if slot > 0 {
|
||||
(slot - 1).as_u64()..=slot.as_u64()
|
||||
} else {
|
||||
slot.as_u64()..=slot.as_u64()
|
||||
}
|
||||
}
|
||||
}
|
||||
.map(Slot::from)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
|
||||
|
||||
#[test]
|
||||
fn default_storage_strategy() {
|
||||
let config = HierarchyConfig::default();
|
||||
config.validate().unwrap();
|
||||
|
||||
let moduli = config.to_moduli().unwrap();
|
||||
|
||||
// Full snapshots at multiples of 2^21.
|
||||
let snapshot_freq = Slot::new(1 << 21);
|
||||
assert_eq!(
|
||||
moduli.storage_strategy(Slot::new(0)).unwrap(),
|
||||
StorageStrategy::Snapshot
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.storage_strategy(snapshot_freq).unwrap(),
|
||||
StorageStrategy::Snapshot
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.storage_strategy(snapshot_freq * 3).unwrap(),
|
||||
StorageStrategy::Snapshot
|
||||
);
|
||||
|
||||
// Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer.
|
||||
let first_layer = Slot::new(1 << 18);
|
||||
assert_eq!(
|
||||
moduli.storage_strategy(first_layer * 2).unwrap(),
|
||||
StorageStrategy::DiffFrom(Slot::new(0))
|
||||
);
|
||||
|
||||
let replay_strategy_slot = first_layer + 1;
|
||||
assert_eq!(
|
||||
moduli.storage_strategy(replay_strategy_slot).unwrap(),
|
||||
StorageStrategy::ReplayFrom(first_layer)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_snapshot_slot() {
|
||||
let config = HierarchyConfig::default();
|
||||
config.validate().unwrap();
|
||||
|
||||
let moduli = config.to_moduli().unwrap();
|
||||
let snapshot_freq = Slot::new(1 << 21);
|
||||
|
||||
assert_eq!(
|
||||
moduli.next_snapshot_slot(snapshot_freq).unwrap(),
|
||||
snapshot_freq
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.next_snapshot_slot(snapshot_freq + 1).unwrap(),
|
||||
snapshot_freq * 2
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.next_snapshot_slot(snapshot_freq * 2 - 1).unwrap(),
|
||||
snapshot_freq * 2
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.next_snapshot_slot(snapshot_freq * 2).unwrap(),
|
||||
snapshot_freq * 2
|
||||
);
|
||||
assert_eq!(
|
||||
moduli.next_snapshot_slot(snapshot_freq * 100).unwrap(),
|
||||
snapshot_freq * 100
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compressed_u64_vs_bytes_diff() {
|
||||
let x_values = vec![99u64, 55, 123, 6834857, 0, 12];
|
||||
let y_values = vec![98u64, 55, 312, 1, 1, 2, 4, 5];
|
||||
let config = &StoreConfig::default();
|
||||
|
||||
let to_bytes =
|
||||
|nums: &[u64]| -> Vec<u8> { nums.iter().flat_map(|x| x.to_be_bytes()).collect() };
|
||||
|
||||
let x_bytes = to_bytes(&x_values);
|
||||
let y_bytes = to_bytes(&y_values);
|
||||
|
||||
let u64_diff = CompressedU64Diff::compute(&x_values, &y_values, config).unwrap();
|
||||
|
||||
let mut y_from_u64_diff = x_values;
|
||||
u64_diff.apply(&mut y_from_u64_diff, config).unwrap();
|
||||
|
||||
assert_eq!(y_values, y_from_u64_diff);
|
||||
|
||||
let bytes_diff = BytesDiff::compute(&x_bytes, &y_bytes).unwrap();
|
||||
|
||||
let mut y_from_bytes = vec![];
|
||||
bytes_diff.apply(&x_bytes, &mut y_from_bytes).unwrap();
|
||||
|
||||
assert_eq!(y_bytes, y_from_bytes);
|
||||
|
||||
// U64 diff wins by more than a factor of 3
|
||||
assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compressed_validators_diff() {
|
||||
assert_eq!(<ValidatorDiffEntry as Decode>::ssz_fixed_len(), 129);
|
||||
|
||||
let mut rng = thread_rng();
|
||||
let config = &StoreConfig::default();
|
||||
let xs = (0..10)
|
||||
.map(|_| rand_validator(&mut rng))
|
||||
.collect::<Vec<_>>();
|
||||
let mut ys = xs.clone();
|
||||
ys[5] = rand_validator(&mut rng);
|
||||
ys.push(rand_validator(&mut rng));
|
||||
let diff = ValidatorsDiff::compute(&xs, &ys, config).unwrap();
|
||||
|
||||
let mut xs_out = xs.clone();
|
||||
diff.apply(&mut xs_out, config).unwrap();
|
||||
assert_eq!(xs_out, ys);
|
||||
}
|
||||
|
||||
fn rand_validator(mut rng: impl Rng) -> Validator {
|
||||
let mut pubkey = [0u8; 48];
|
||||
rng.fill_bytes(&mut pubkey);
|
||||
let withdrawal_credentials: [u8; 32] = rng.gen();
|
||||
|
||||
Validator {
|
||||
pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(),
|
||||
withdrawal_credentials: withdrawal_credentials.into(),
|
||||
slashed: false,
|
||||
effective_balance: 32_000_000_000,
|
||||
activation_eligibility_epoch: Epoch::max_value(),
|
||||
activation_epoch: Epoch::max_value(),
|
||||
exit_epoch: Epoch::max_value(),
|
||||
withdrawable_epoch: Epoch::max_value(),
|
||||
}
|
||||
}
|
||||
|
||||
// This test checks that the hdiff algorithm doesn't accidentally change between releases.
|
||||
// If it does, we need to ensure appropriate backwards compatibility measures are implemented
|
||||
// before this test is updated.
|
||||
#[test]
|
||||
fn hdiff_version_stability() {
|
||||
let mut rng = SmallRng::seed_from_u64(0xffeeccdd00aa);
|
||||
|
||||
let pre_balances = vec![32_000_000_000, 16_000_000_000, 0];
|
||||
let post_balances = vec![31_000_000_000, 17_000_000, 0, 0];
|
||||
|
||||
let pre_inactivity_scores = vec![1, 1, 1];
|
||||
let post_inactivity_scores = vec![0, 0, 0, 1];
|
||||
|
||||
let pre_validators = (0..3).map(|_| rand_validator(&mut rng)).collect::<Vec<_>>();
|
||||
let post_validators = pre_validators.clone();
|
||||
|
||||
let pre_historical_roots = vec![Hash256::repeat_byte(0xff)];
|
||||
let post_historical_roots = vec![Hash256::repeat_byte(0xff), Hash256::repeat_byte(0xee)];
|
||||
|
||||
let pre_historical_summaries = vec![HistoricalSummary::default()];
|
||||
let post_historical_summaries = pre_historical_summaries.clone();
|
||||
|
||||
let pre_buffer = HDiffBuffer {
|
||||
state: vec![0, 1, 2, 3, 3, 2, 1, 0],
|
||||
balances: pre_balances,
|
||||
inactivity_scores: pre_inactivity_scores,
|
||||
validators: pre_validators,
|
||||
historical_roots: pre_historical_roots,
|
||||
historical_summaries: pre_historical_summaries,
|
||||
};
|
||||
let post_buffer = HDiffBuffer {
|
||||
state: vec![0, 1, 3, 2, 2, 3, 1, 1],
|
||||
balances: post_balances,
|
||||
inactivity_scores: post_inactivity_scores,
|
||||
validators: post_validators,
|
||||
historical_roots: post_historical_roots,
|
||||
historical_summaries: post_historical_summaries,
|
||||
};
|
||||
|
||||
let config = StoreConfig::default();
|
||||
let hdiff = HDiff::compute(&pre_buffer, &post_buffer, &config).unwrap();
|
||||
let hdiff_ssz = hdiff.as_ssz_bytes();
|
||||
|
||||
// First byte should match enum version.
|
||||
assert_eq!(hdiff_ssz[0], 0);
|
||||
|
||||
// Should roundtrip.
|
||||
assert_eq!(HDiff::from_ssz_bytes(&hdiff_ssz).unwrap(), hdiff);
|
||||
|
||||
// Should roundtrip as V0 with enum selector stripped.
|
||||
assert_eq!(
|
||||
HDiff::V0(HDiffV0::from_ssz_bytes(&hdiff_ssz[1..]).unwrap()),
|
||||
hdiff
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
hdiff_ssz,
|
||||
vec![
|
||||
0u8, 24, 0, 0, 0, 49, 0, 0, 0, 85, 0, 0, 0, 114, 0, 0, 0, 127, 0, 0, 0, 163, 0, 0,
|
||||
0, 4, 0, 0, 0, 214, 195, 196, 0, 0, 0, 14, 8, 0, 8, 1, 0, 0, 1, 3, 2, 2, 3, 1, 1,
|
||||
9, 4, 0, 0, 0, 40, 181, 47, 253, 0, 72, 189, 0, 0, 136, 255, 255, 255, 255, 196,
|
||||
101, 54, 0, 255, 255, 255, 252, 71, 86, 198, 64, 0, 1, 0, 59, 176, 4, 4, 0, 0, 0,
|
||||
40, 181, 47, 253, 0, 72, 133, 0, 0, 80, 255, 255, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 10,
|
||||
192, 2, 4, 0, 0, 0, 40, 181, 47, 253, 32, 0, 1, 0, 0, 4, 0, 0, 0, 238, 238, 238,
|
||||
238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238,
|
||||
238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 238, 4, 0, 0, 0
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
92
beacon_node/store/src/historic_state_cache.rs
Normal file
92
beacon_node/store/src/historic_state_cache.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use crate::hdiff::{Error, HDiffBuffer};
|
||||
use crate::metrics;
|
||||
use lru::LruCache;
|
||||
use std::num::NonZeroUsize;
|
||||
use types::{BeaconState, ChainSpec, EthSpec, Slot};
|
||||
|
||||
/// Holds a combination of finalized states in two formats:
|
||||
/// - `hdiff_buffers`: Format close to an SSZ serialized state for rapid application of diffs on top
|
||||
/// of it
|
||||
/// - `states`: Deserialized states for direct use or for rapid application of blocks (replay)
|
||||
///
|
||||
/// An example use: when requesting state data for consecutive slots, this cache allows the node to
|
||||
/// apply diffs once on the first request, and latter just apply blocks one at a time.
|
||||
#[derive(Debug)]
|
||||
pub struct HistoricStateCache<E: EthSpec> {
|
||||
hdiff_buffers: LruCache<Slot, HDiffBuffer>,
|
||||
states: LruCache<Slot, BeaconState<E>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Metrics {
|
||||
pub num_hdiff: usize,
|
||||
pub num_state: usize,
|
||||
pub hdiff_byte_size: usize,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HistoricStateCache<E> {
|
||||
pub fn new(hdiff_buffer_cache_size: NonZeroUsize, state_cache_size: NonZeroUsize) -> Self {
|
||||
Self {
|
||||
hdiff_buffers: LruCache::new(hdiff_buffer_cache_size),
|
||||
states: LruCache::new(state_cache_size),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_hdiff_buffer(&mut self, slot: Slot) -> Option<HDiffBuffer> {
|
||||
if let Some(buffer_ref) = self.hdiff_buffers.get(&slot) {
|
||||
let _timer = metrics::start_timer(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIMES);
|
||||
Some(buffer_ref.clone())
|
||||
} else if let Some(state) = self.states.get(&slot) {
|
||||
let buffer = HDiffBuffer::from_state(state.clone());
|
||||
let _timer = metrics::start_timer(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIMES);
|
||||
let cloned = buffer.clone();
|
||||
drop(_timer);
|
||||
self.hdiff_buffers.put(slot, cloned);
|
||||
Some(buffer)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_state(
|
||||
&mut self,
|
||||
slot: Slot,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
if let Some(state) = self.states.get(&slot) {
|
||||
Ok(Some(state.clone()))
|
||||
} else if let Some(buffer) = self.hdiff_buffers.get(&slot) {
|
||||
let state = buffer.as_state(spec)?;
|
||||
self.states.put(slot, state.clone());
|
||||
Ok(Some(state))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_state(&mut self, slot: Slot, state: BeaconState<E>) {
|
||||
self.states.put(slot, state);
|
||||
}
|
||||
|
||||
pub fn put_hdiff_buffer(&mut self, slot: Slot, buffer: HDiffBuffer) {
|
||||
self.hdiff_buffers.put(slot, buffer);
|
||||
}
|
||||
|
||||
pub fn put_both(&mut self, slot: Slot, state: BeaconState<E>, buffer: HDiffBuffer) {
|
||||
self.put_state(slot, state);
|
||||
self.put_hdiff_buffer(slot, buffer);
|
||||
}
|
||||
|
||||
pub fn metrics(&self) -> Metrics {
|
||||
let hdiff_byte_size = self
|
||||
.hdiff_buffers
|
||||
.iter()
|
||||
.map(|(_, buffer)| buffer.size())
|
||||
.sum::<usize>();
|
||||
Metrics {
|
||||
num_hdiff: self.hdiff_buffers.len(),
|
||||
num_state: self.states.len(),
|
||||
hdiff_byte_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -7,7 +7,6 @@
|
||||
//!
|
||||
//! Provides a simple API for storing/retrieving all types that sometimes needs type-hints. See
|
||||
//! tests for implementation examples.
|
||||
mod chunk_writer;
|
||||
pub mod chunked_iter;
|
||||
pub mod chunked_vector;
|
||||
pub mod config;
|
||||
@@ -15,25 +14,25 @@ pub mod consensus_context;
|
||||
pub mod errors;
|
||||
mod forwards_iter;
|
||||
mod garbage_collection;
|
||||
pub mod hdiff;
|
||||
pub mod historic_state_cache;
|
||||
pub mod hot_cold_store;
|
||||
mod impls;
|
||||
mod leveldb_store;
|
||||
mod memory_store;
|
||||
pub mod metadata;
|
||||
pub mod metrics;
|
||||
mod partial_beacon_state;
|
||||
pub mod partial_beacon_state;
|
||||
pub mod reconstruct;
|
||||
pub mod state_cache;
|
||||
|
||||
pub mod iter;
|
||||
|
||||
pub use self::chunk_writer::ChunkWriter;
|
||||
pub use self::config::StoreConfig;
|
||||
pub use self::consensus_context::OnDiskConsensusContext;
|
||||
pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
|
||||
pub use self::leveldb_store::LevelDB;
|
||||
pub use self::memory_store::MemoryStore;
|
||||
pub use self::partial_beacon_state::PartialBeaconState;
|
||||
pub use crate::metadata::BlobInfo;
|
||||
pub use errors::Error;
|
||||
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
|
||||
@@ -251,6 +250,11 @@ pub enum DBColumn {
|
||||
/// For data related to the database itself.
|
||||
#[strum(serialize = "bma")]
|
||||
BeaconMeta,
|
||||
/// Data related to blocks.
|
||||
///
|
||||
/// - Key: `Hash256` block root.
|
||||
/// - Value in hot DB: SSZ-encoded blinded block.
|
||||
/// - Value in cold DB: 8-byte slot of block.
|
||||
#[strum(serialize = "blk")]
|
||||
BeaconBlock,
|
||||
#[strum(serialize = "blb")]
|
||||
@@ -260,9 +264,21 @@ pub enum DBColumn {
|
||||
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
|
||||
#[strum(serialize = "ste")]
|
||||
BeaconState,
|
||||
/// For the mapping from state roots to their slots or summaries.
|
||||
/// For beacon state snapshots in the freezer DB.
|
||||
#[strum(serialize = "bsn")]
|
||||
BeaconStateSnapshot,
|
||||
/// For compact `BeaconStateDiff`s in the freezer DB.
|
||||
#[strum(serialize = "bsd")]
|
||||
BeaconStateDiff,
|
||||
/// Mapping from state root to `HotStateSummary` in the hot DB.
|
||||
///
|
||||
/// Previously this column also served a role in the freezer DB, mapping state roots to
|
||||
/// `ColdStateSummary`. However that role is now filled by `BeaconColdStateSummary`.
|
||||
#[strum(serialize = "bss")]
|
||||
BeaconStateSummary,
|
||||
/// Mapping from state root to `ColdStateSummary` in the cold DB.
|
||||
#[strum(serialize = "bcs")]
|
||||
BeaconColdStateSummary,
|
||||
/// For the list of temporary states stored during block import,
|
||||
/// and then made non-temporary by the deletion of their state root from this column.
|
||||
#[strum(serialize = "bst")]
|
||||
@@ -281,15 +297,37 @@ pub enum DBColumn {
|
||||
ForkChoice,
|
||||
#[strum(serialize = "pkc")]
|
||||
PubkeyCache,
|
||||
/// For the table mapping restore point numbers to state roots.
|
||||
/// For the legacy table mapping restore point numbers to state roots.
|
||||
///
|
||||
/// DEPRECATED. Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "brp")]
|
||||
BeaconRestorePoint,
|
||||
#[strum(serialize = "bbr")]
|
||||
BeaconBlockRoots,
|
||||
#[strum(serialize = "bsr")]
|
||||
/// Mapping from slot to beacon state root in the freezer DB.
|
||||
///
|
||||
/// This new column was created to replace the previous `bsr` column. The replacement was
|
||||
/// necessary to guarantee atomicity of the upgrade migration.
|
||||
#[strum(serialize = "bsx")]
|
||||
BeaconStateRoots,
|
||||
/// DEPRECATED. This is the previous column for beacon state roots stored by "chunk index".
|
||||
///
|
||||
/// Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "bsr")]
|
||||
BeaconStateRootsChunked,
|
||||
/// Mapping from slot to beacon block root in the freezer DB.
|
||||
///
|
||||
/// This new column was created to replace the previous `bbr` column. The replacement was
|
||||
/// necessary to guarantee atomicity of the upgrade migration.
|
||||
#[strum(serialize = "bbx")]
|
||||
BeaconBlockRoots,
|
||||
/// DEPRECATED. This is the previous column for beacon block roots stored by "chunk index".
|
||||
///
|
||||
/// Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "bbr")]
|
||||
BeaconBlockRootsChunked,
|
||||
/// DEPRECATED. Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "bhr")]
|
||||
BeaconHistoricalRoots,
|
||||
/// DEPRECATED. Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "brm")]
|
||||
BeaconRandaoMixes,
|
||||
#[strum(serialize = "dht")]
|
||||
@@ -297,6 +335,7 @@ pub enum DBColumn {
|
||||
/// For Optimistically Imported Merge Transition Blocks
|
||||
#[strum(serialize = "otb")]
|
||||
OptimisticTransitionBlock,
|
||||
/// DEPRECATED. Can be removed once schema v22 is buried by a hard fork.
|
||||
#[strum(serialize = "bhs")]
|
||||
BeaconHistoricalSummaries,
|
||||
#[strum(serialize = "olc")]
|
||||
@@ -338,6 +377,7 @@ impl DBColumn {
|
||||
| Self::BeaconState
|
||||
| Self::BeaconBlob
|
||||
| Self::BeaconStateSummary
|
||||
| Self::BeaconColdStateSummary
|
||||
| Self::BeaconStateTemporary
|
||||
| Self::ExecPayload
|
||||
| Self::BeaconChain
|
||||
@@ -349,10 +389,14 @@ impl DBColumn {
|
||||
| Self::DhtEnrs
|
||||
| Self::OptimisticTransitionBlock => 32,
|
||||
Self::BeaconBlockRoots
|
||||
| Self::BeaconBlockRootsChunked
|
||||
| Self::BeaconStateRoots
|
||||
| Self::BeaconStateRootsChunked
|
||||
| Self::BeaconHistoricalRoots
|
||||
| Self::BeaconHistoricalSummaries
|
||||
| Self::BeaconRandaoMixes
|
||||
| Self::BeaconStateSnapshot
|
||||
| Self::BeaconStateDiff
|
||||
| Self::SyncCommittee
|
||||
| Self::SyncCommitteeBranch
|
||||
| Self::LightClientUpdate => 8,
|
||||
|
||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{Checkpoint, Hash256, Slot};
|
||||
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(21);
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22);
|
||||
|
||||
// All the keys that get stored under the `BeaconMeta` column.
|
||||
//
|
||||
@@ -21,6 +21,27 @@ pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
|
||||
/// State upper limit value used to indicate that a node is not storing historic states.
|
||||
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
|
||||
|
||||
/// The `AnchorInfo` encoding full availability of all historic blocks & states.
|
||||
pub const ANCHOR_FOR_ARCHIVE_NODE: AnchorInfo = AnchorInfo {
|
||||
anchor_slot: Slot::new(0),
|
||||
oldest_block_slot: Slot::new(0),
|
||||
oldest_block_parent: Hash256::ZERO,
|
||||
state_upper_limit: Slot::new(0),
|
||||
state_lower_limit: Slot::new(0),
|
||||
};
|
||||
|
||||
/// The `AnchorInfo` encoding an uninitialized anchor.
|
||||
///
|
||||
/// This value should never exist except on initial start-up prior to the anchor being initialised
|
||||
/// by `init_anchor_info`.
|
||||
pub const ANCHOR_UNINITIALIZED: AnchorInfo = AnchorInfo {
|
||||
anchor_slot: Slot::new(u64::MAX),
|
||||
oldest_block_slot: Slot::new(u64::MAX),
|
||||
oldest_block_parent: Hash256::ZERO,
|
||||
state_upper_limit: Slot::new(u64::MAX),
|
||||
state_lower_limit: Slot::new(0),
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct SchemaVersion(pub u64);
|
||||
|
||||
@@ -88,17 +109,47 @@ impl StoreItem for CompactionTimestamp {
|
||||
/// Database parameters relevant to weak subjectivity sync.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)]
|
||||
pub struct AnchorInfo {
|
||||
/// The slot at which the anchor state is present and which we cannot revert.
|
||||
/// The slot at which the anchor state is present and which we cannot revert. Values on start:
|
||||
/// - Genesis start: 0
|
||||
/// - Checkpoint sync: Slot of the finalized checkpoint block
|
||||
///
|
||||
/// Immutable
|
||||
pub anchor_slot: Slot,
|
||||
/// The slot from which historical blocks are available (>=).
|
||||
/// All blocks with slots greater than or equal to this value are available in the database.
|
||||
/// Additionally, the genesis block is always available.
|
||||
///
|
||||
/// Values on start:
|
||||
/// - Genesis start: 0
|
||||
/// - Checkpoint sync: Slot of the finalized checkpoint block
|
||||
///
|
||||
/// Progressively decreases during backfill sync until reaching 0.
|
||||
pub oldest_block_slot: Slot,
|
||||
/// The block root of the next block that needs to be added to fill in the history.
|
||||
///
|
||||
/// Zero if we know all blocks back to genesis.
|
||||
pub oldest_block_parent: Hash256,
|
||||
/// The slot from which historical states are available (>=).
|
||||
/// All states with slots _greater than or equal to_ `min(split.slot, state_upper_limit)` are
|
||||
/// available in the database. If `state_upper_limit` is higher than `split.slot`, states are
|
||||
/// not being written to the freezer database.
|
||||
///
|
||||
/// Values on start if state reconstruction is enabled:
|
||||
/// - Genesis start: 0
|
||||
/// - Checkpoint sync: Slot of the next scheduled snapshot
|
||||
///
|
||||
/// Value on start if state reconstruction is disabled:
|
||||
/// - 2^64 - 1 representing no historic state storage.
|
||||
///
|
||||
/// Immutable until state reconstruction completes.
|
||||
pub state_upper_limit: Slot,
|
||||
/// The slot before which historical states are available (<=).
|
||||
/// All states with slots _less than or equal to_ this value are available in the database.
|
||||
/// The minimum value is 0, indicating that the genesis state is always available.
|
||||
///
|
||||
/// Values on start:
|
||||
/// - Genesis start: 0
|
||||
/// - Checkpoint sync: 0
|
||||
///
|
||||
/// When full block backfill completes (`oldest_block_slot == 0`) state reconstruction starts and
|
||||
/// this value will progressively increase until reaching `state_upper_limit`.
|
||||
pub state_lower_limit: Slot,
|
||||
}
|
||||
|
||||
@@ -109,6 +160,21 @@ impl AnchorInfo {
|
||||
pub fn block_backfill_complete(&self, target_slot: Slot) -> bool {
|
||||
self.oldest_block_slot <= target_slot
|
||||
}
|
||||
|
||||
/// Return true if all historic states are stored, i.e. if state reconstruction is complete.
|
||||
pub fn all_historic_states_stored(&self) -> bool {
|
||||
self.state_lower_limit == self.state_upper_limit
|
||||
}
|
||||
|
||||
/// Return true if no historic states other than genesis are stored in the database.
|
||||
pub fn no_historic_states_stored(&self, split_slot: Slot) -> bool {
|
||||
self.state_lower_limit == 0 && self.state_upper_limit >= split_slot
|
||||
}
|
||||
|
||||
/// Return true if no historic states other than genesis *will ever be stored*.
|
||||
pub fn full_state_pruning_enabled(&self) -> bool {
|
||||
self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN
|
||||
}
|
||||
}
|
||||
|
||||
impl StoreItem for AnchorInfo {
|
||||
|
||||
@@ -73,6 +73,27 @@ pub static DISK_DB_DELETE_COUNT: LazyLock<Result<IntCounterVec>> = LazyLock::new
|
||||
&["col"],
|
||||
)
|
||||
});
|
||||
/*
|
||||
* Anchor Info
|
||||
*/
|
||||
pub static STORE_BEACON_ANCHOR_SLOT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_anchor_slot",
|
||||
"Current anchor info anchor_slot value",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_OLDEST_BLOCK_SLOT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_oldest_block_slot",
|
||||
"Current anchor info oldest_block_slot value",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_STATE_LOWER_LIMIT: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_state_lower_limit",
|
||||
"Current anchor info state_lower_limit value",
|
||||
)
|
||||
});
|
||||
/*
|
||||
* Beacon State
|
||||
*/
|
||||
@@ -130,6 +151,24 @@ pub static BEACON_STATE_WRITE_BYTES: LazyLock<Result<IntCounter>> = LazyLock::ne
|
||||
"Total number of beacon state bytes written to the DB",
|
||||
)
|
||||
});
|
||||
pub static BEACON_HDIFF_READ_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_hdiff_read_seconds",
|
||||
"Time required to read the hierarchical diff bytes from the database",
|
||||
)
|
||||
});
|
||||
pub static BEACON_HDIFF_DECODE_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_hdiff_decode_seconds",
|
||||
"Time required to decode hierarchical diff bytes",
|
||||
)
|
||||
});
|
||||
pub static BEACON_HDIFF_BUFFER_CLONE_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_hdiff_buffer_clone_seconds",
|
||||
"Time required to clone hierarchical diff buffer bytes",
|
||||
)
|
||||
});
|
||||
/*
|
||||
* Beacon Block
|
||||
*/
|
||||
@@ -145,12 +184,181 @@ pub static BEACON_BLOCK_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock
|
||||
"Number of hits to the store's block cache",
|
||||
)
|
||||
});
|
||||
|
||||
/*
|
||||
* Caches
|
||||
*/
|
||||
pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_blobs_cache_hit_total",
|
||||
"Number of hits to the store's blob cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_BLOCK_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_block_cache_size",
|
||||
"Current count of items in beacon store block cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_BLOB_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_blob_cache_size",
|
||||
"Current count of items in beacon store blob cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_state_cache_size",
|
||||
"Current count of items in beacon store state cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HISTORIC_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_historic_state_cache_size",
|
||||
"Current count of states in the historic state cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_hdiff_buffer_cache_size",
|
||||
"Current count of hdiff buffers in the historic state cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_CACHE_BYTE_SIZE: LazyLock<Result<IntGauge>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"store_beacon_hdiff_buffer_cache_byte_size",
|
||||
"Memory consumed by hdiff buffers in the historic state cache",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_STATE_FREEZER_COMPRESS_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_state_compress_seconds",
|
||||
"Time taken to compress a state snapshot for the freezer DB",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_state_decompress_seconds",
|
||||
"Time taken to decompress a state snapshot for the freezer DB",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_APPLY_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_apply_seconds",
|
||||
"Time taken to apply hdiff buffer to a state buffer",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_COMPUTE_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_compute_seconds",
|
||||
"Time taken to compute hdiff buffer to a state buffer",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_LOAD_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_load_seconds",
|
||||
"Time taken to load an hdiff buffer",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_LOAD_FOR_STORE_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_load_for_store_seconds",
|
||||
"Time taken to load an hdiff buffer to store another hdiff",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HISTORIC_STATE_CACHE_HIT: LazyLock<Result<IntCounter>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_historic_state_cache_hit_total",
|
||||
"Total count of historic state cache hits for full states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HISTORIC_STATE_CACHE_MISS: LazyLock<Result<IntCounter>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_historic_state_cache_miss_total",
|
||||
"Total count of historic state cache misses for full states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_CACHE_HIT: LazyLock<Result<IntCounter>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_hdiff_buffer_cache_hit_total",
|
||||
"Total count of hdiff buffer cache hits",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_CACHE_MISS: LazyLock<Result<IntCounter>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_hdiff_buffer_cache_miss_total",
|
||||
"Total count of hdiff buffer cache miss",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_INTO_STATE_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_into_state_seconds",
|
||||
"Time taken to recreate a BeaconState from an hdiff buffer",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_HDIFF_BUFFER_FROM_STATE_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_hdiff_buffer_from_state_seconds",
|
||||
"Time taken to create an hdiff buffer from a BeaconState",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_REPLAYED_BLOCKS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"store_beacon_replayed_blocks_total",
|
||||
"Total count of replayed blocks",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_LOAD_COLD_BLOCKS_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_load_cold_blocks_time",
|
||||
"Time spent loading blocks to replay for historic states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_LOAD_HOT_BLOCKS_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_load_hot_blocks_time",
|
||||
"Time spent loading blocks to replay for hot states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_REPLAY_COLD_BLOCKS_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_replay_cold_blocks_time",
|
||||
"Time spent replaying blocks for historic states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_COLD_BUILD_BEACON_CACHES_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_cold_build_beacon_caches_time",
|
||||
"Time spent building caches on historic states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_REPLAY_HOT_BLOCKS_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_replay_hot_blocks_time",
|
||||
"Time spent replaying blocks for hot states",
|
||||
)
|
||||
});
|
||||
pub static STORE_BEACON_RECONSTRUCTION_TIME: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"store_beacon_reconstruction_time_seconds",
|
||||
"Time taken to run a reconstruct historic states batch",
|
||||
)
|
||||
});
|
||||
pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
|
||||
@@ -1,18 +1,20 @@
|
||||
use crate::chunked_vector::{
|
||||
load_variable_list_from_db, load_vector_from_db, BlockRoots, HistoricalRoots,
|
||||
HistoricalSummaries, RandaoMixes, StateRoots,
|
||||
load_variable_list_from_db, load_vector_from_db, BlockRootsChunked, HistoricalRoots,
|
||||
HistoricalSummaries, RandaoMixes, StateRootsChunked,
|
||||
};
|
||||
use crate::{get_key_for_col, DBColumn, Error, KeyValueStore, KeyValueStoreOp};
|
||||
use ssz::{Decode, DecodeError, Encode};
|
||||
use crate::{Error, KeyValueStore};
|
||||
use ssz::{Decode, DecodeError};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::sync::Arc;
|
||||
use types::historical_summary::HistoricalSummary;
|
||||
use types::superstruct;
|
||||
use types::*;
|
||||
|
||||
/// Lightweight variant of the `BeaconState` that is stored in the database.
|
||||
/// DEPRECATED Lightweight variant of the `BeaconState` that is stored in the database.
|
||||
///
|
||||
/// Utilises lazy-loading from separate storage for its vector fields.
|
||||
///
|
||||
/// This can be deleted once schema versions prior to V22 are no longer supported.
|
||||
#[superstruct(
|
||||
variants(Base, Altair, Bellatrix, Capella, Deneb, Electra),
|
||||
variant_attributes(derive(Debug, PartialEq, Clone, Encode, Decode))
|
||||
@@ -142,163 +144,7 @@ where
|
||||
pub pending_consolidations: List<PendingConsolidation, E::PendingConsolidationsLimit>,
|
||||
}
|
||||
|
||||
/// Implement the conversion function from BeaconState -> PartialBeaconState.
|
||||
macro_rules! impl_from_state_forgetful {
|
||||
($s:ident, $outer:ident, $variant_name:ident, $struct_name:ident, [$($extra_fields:ident),*], [$($extra_fields_opt:ident),*]) => {
|
||||
PartialBeaconState::$variant_name($struct_name {
|
||||
// Versioning
|
||||
genesis_time: $s.genesis_time,
|
||||
genesis_validators_root: $s.genesis_validators_root,
|
||||
slot: $s.slot,
|
||||
fork: $s.fork,
|
||||
|
||||
// History
|
||||
latest_block_header: $s.latest_block_header.clone(),
|
||||
block_roots: None,
|
||||
state_roots: None,
|
||||
historical_roots: None,
|
||||
|
||||
// Eth1
|
||||
eth1_data: $s.eth1_data.clone(),
|
||||
eth1_data_votes: $s.eth1_data_votes.clone(),
|
||||
eth1_deposit_index: $s.eth1_deposit_index,
|
||||
|
||||
// Validator registry
|
||||
validators: $s.validators.clone(),
|
||||
balances: $s.balances.clone(),
|
||||
|
||||
// Shuffling
|
||||
latest_randao_value: *$outer
|
||||
.get_randao_mix($outer.current_epoch())
|
||||
.expect("randao at current epoch is OK"),
|
||||
randao_mixes: None,
|
||||
|
||||
// Slashings
|
||||
slashings: $s.slashings.clone(),
|
||||
|
||||
// Finality
|
||||
justification_bits: $s.justification_bits.clone(),
|
||||
previous_justified_checkpoint: $s.previous_justified_checkpoint,
|
||||
current_justified_checkpoint: $s.current_justified_checkpoint,
|
||||
finalized_checkpoint: $s.finalized_checkpoint,
|
||||
|
||||
// Variant-specific fields
|
||||
$(
|
||||
$extra_fields: $s.$extra_fields.clone()
|
||||
),*,
|
||||
|
||||
// Variant-specific optional
|
||||
$(
|
||||
$extra_fields_opt: None
|
||||
),*
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> PartialBeaconState<E> {
|
||||
/// Convert a `BeaconState` to a `PartialBeaconState`, while dropping the optional fields.
|
||||
pub fn from_state_forgetful(outer: &BeaconState<E>) -> Self {
|
||||
match outer {
|
||||
BeaconState::Base(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Base,
|
||||
PartialBeaconStateBase,
|
||||
[previous_epoch_attestations, current_epoch_attestations],
|
||||
[]
|
||||
),
|
||||
BeaconState::Altair(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Altair,
|
||||
PartialBeaconStateAltair,
|
||||
[
|
||||
previous_epoch_participation,
|
||||
current_epoch_participation,
|
||||
current_sync_committee,
|
||||
next_sync_committee,
|
||||
inactivity_scores
|
||||
],
|
||||
[]
|
||||
),
|
||||
BeaconState::Bellatrix(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Bellatrix,
|
||||
PartialBeaconStateBellatrix,
|
||||
[
|
||||
previous_epoch_participation,
|
||||
current_epoch_participation,
|
||||
current_sync_committee,
|
||||
next_sync_committee,
|
||||
inactivity_scores,
|
||||
latest_execution_payload_header
|
||||
],
|
||||
[]
|
||||
),
|
||||
BeaconState::Capella(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Capella,
|
||||
PartialBeaconStateCapella,
|
||||
[
|
||||
previous_epoch_participation,
|
||||
current_epoch_participation,
|
||||
current_sync_committee,
|
||||
next_sync_committee,
|
||||
inactivity_scores,
|
||||
latest_execution_payload_header,
|
||||
next_withdrawal_index,
|
||||
next_withdrawal_validator_index
|
||||
],
|
||||
[historical_summaries]
|
||||
),
|
||||
BeaconState::Deneb(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Deneb,
|
||||
PartialBeaconStateDeneb,
|
||||
[
|
||||
previous_epoch_participation,
|
||||
current_epoch_participation,
|
||||
current_sync_committee,
|
||||
next_sync_committee,
|
||||
inactivity_scores,
|
||||
latest_execution_payload_header,
|
||||
next_withdrawal_index,
|
||||
next_withdrawal_validator_index
|
||||
],
|
||||
[historical_summaries]
|
||||
),
|
||||
BeaconState::Electra(s) => impl_from_state_forgetful!(
|
||||
s,
|
||||
outer,
|
||||
Electra,
|
||||
PartialBeaconStateElectra,
|
||||
[
|
||||
previous_epoch_participation,
|
||||
current_epoch_participation,
|
||||
current_sync_committee,
|
||||
next_sync_committee,
|
||||
inactivity_scores,
|
||||
latest_execution_payload_header,
|
||||
next_withdrawal_index,
|
||||
next_withdrawal_validator_index,
|
||||
deposit_requests_start_index,
|
||||
deposit_balance_to_consume,
|
||||
exit_balance_to_consume,
|
||||
earliest_exit_epoch,
|
||||
consolidation_balance_to_consume,
|
||||
earliest_consolidation_epoch,
|
||||
pending_deposits,
|
||||
pending_partial_withdrawals,
|
||||
pending_consolidations
|
||||
],
|
||||
[historical_summaries]
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// SSZ decode.
|
||||
pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result<Self, ssz::DecodeError> {
|
||||
// Slot is after genesis_time (u64) and genesis_validators_root (Hash256).
|
||||
@@ -321,19 +167,13 @@ impl<E: EthSpec> PartialBeaconState<E> {
|
||||
))
|
||||
}
|
||||
|
||||
/// Prepare the partial state for storage in the KV database.
|
||||
pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp {
|
||||
let db_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
|
||||
KeyValueStoreOp::PutKeyValue(db_key, self.as_ssz_bytes())
|
||||
}
|
||||
|
||||
pub fn load_block_roots<S: KeyValueStore<E>>(
|
||||
&mut self,
|
||||
store: &S,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), Error> {
|
||||
if self.block_roots().is_none() {
|
||||
*self.block_roots_mut() = Some(load_vector_from_db::<BlockRoots, E, _>(
|
||||
*self.block_roots_mut() = Some(load_vector_from_db::<BlockRootsChunked, E, _>(
|
||||
store,
|
||||
self.slot(),
|
||||
spec,
|
||||
@@ -348,7 +188,7 @@ impl<E: EthSpec> PartialBeaconState<E> {
|
||||
spec: &ChainSpec,
|
||||
) -> Result<(), Error> {
|
||||
if self.state_roots().is_none() {
|
||||
*self.state_roots_mut() = Some(load_vector_from_db::<StateRoots, E, _>(
|
||||
*self.state_roots_mut() = Some(load_vector_from_db::<StateRootsChunked, E, _>(
|
||||
store,
|
||||
self.slot(),
|
||||
spec,
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
//! Implementation of historic state reconstruction (given complete block history).
|
||||
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
|
||||
use crate::metadata::ANCHOR_FOR_ARCHIVE_NODE;
|
||||
use crate::metrics;
|
||||
use crate::{Error, ItemStore};
|
||||
use itertools::{process_results, Itertools};
|
||||
use slog::info;
|
||||
use slog::{debug, info};
|
||||
use state_processing::{
|
||||
per_block_processing, per_slot_processing, BlockSignatureStrategy, ConsensusContext,
|
||||
VerifyBlockRoot,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use types::{EthSpec, Hash256};
|
||||
use types::EthSpec;
|
||||
|
||||
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
|
||||
where
|
||||
@@ -16,11 +18,16 @@ where
|
||||
Hot: ItemStore<E>,
|
||||
Cold: ItemStore<E>,
|
||||
{
|
||||
pub fn reconstruct_historic_states(self: &Arc<Self>) -> Result<(), Error> {
|
||||
let Some(mut anchor) = self.get_anchor_info() else {
|
||||
// Nothing to do, history is complete.
|
||||
pub fn reconstruct_historic_states(
|
||||
self: &Arc<Self>,
|
||||
num_blocks: Option<usize>,
|
||||
) -> Result<(), Error> {
|
||||
let mut anchor = self.get_anchor_info();
|
||||
|
||||
// Nothing to do, history is complete.
|
||||
if anchor.all_historic_states_stored() {
|
||||
return Ok(());
|
||||
};
|
||||
}
|
||||
|
||||
// Check that all historic blocks are known.
|
||||
if anchor.oldest_block_slot != 0 {
|
||||
@@ -29,37 +36,30 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
self.log,
|
||||
"Beginning historic state reconstruction";
|
||||
"Starting state reconstruction batch";
|
||||
"start_slot" => anchor.state_lower_limit,
|
||||
);
|
||||
|
||||
let slots_per_restore_point = self.config.slots_per_restore_point;
|
||||
let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);
|
||||
|
||||
// Iterate blocks from the state lower limit to the upper limit.
|
||||
let lower_limit_slot = anchor.state_lower_limit;
|
||||
let split = self.get_split_info();
|
||||
let upper_limit_state = self.get_restore_point(
|
||||
anchor.state_upper_limit.as_u64() / slots_per_restore_point,
|
||||
&split,
|
||||
)?;
|
||||
let upper_limit_slot = upper_limit_state.slot();
|
||||
let lower_limit_slot = anchor.state_lower_limit;
|
||||
let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);
|
||||
|
||||
// Use a dummy root, as we never read the block for the upper limit state.
|
||||
let upper_limit_block_root = Hash256::repeat_byte(0xff);
|
||||
|
||||
let block_root_iter = self.forwards_block_roots_iterator(
|
||||
lower_limit_slot,
|
||||
upper_limit_state,
|
||||
upper_limit_block_root,
|
||||
&self.spec,
|
||||
)?;
|
||||
// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
|
||||
// boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive*
|
||||
// of the state at slot `lower_limit_slot + num_blocks`.
|
||||
let block_root_iter = self
|
||||
.forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || {
|
||||
Err(Error::StateShouldNotBeRequired(upper_limit_slot - 1))
|
||||
})?
|
||||
.take(num_blocks.map_or(usize::MAX, |n| n + 1));
|
||||
|
||||
// The state to be advanced.
|
||||
let mut state = self
|
||||
.load_cold_state_by_slot(lower_limit_slot)?
|
||||
.ok_or(HotColdDBError::MissingLowerLimitState(lower_limit_slot))?;
|
||||
let mut state = self.load_cold_state_by_slot(lower_limit_slot)?;
|
||||
|
||||
state.build_caches(&self.spec)?;
|
||||
|
||||
@@ -110,8 +110,19 @@ where
|
||||
// Stage state for storage in freezer DB.
|
||||
self.store_cold_state(&state_root, &state, &mut io_batch)?;
|
||||
|
||||
// If the slot lies on an epoch boundary, commit the batch and update the anchor.
|
||||
if slot % slots_per_restore_point == 0 || slot + 1 == upper_limit_slot {
|
||||
let batch_complete =
|
||||
num_blocks.map_or(false, |n_blocks| slot == lower_limit_slot + n_blocks as u64);
|
||||
let reconstruction_complete = slot + 1 == upper_limit_slot;
|
||||
|
||||
// Commit the I/O batch if:
|
||||
//
|
||||
// - The diff/snapshot for this slot is required for future slots, or
|
||||
// - The reconstruction batch is complete (we are about to return), or
|
||||
// - Reconstruction is complete.
|
||||
if self.hierarchy.should_commit_immediately(slot)?
|
||||
|| batch_complete
|
||||
|| reconstruction_complete
|
||||
{
|
||||
info!(
|
||||
self.log,
|
||||
"State reconstruction in progress";
|
||||
@@ -122,9 +133,9 @@ where
|
||||
self.cold_db.do_atomically(std::mem::take(&mut io_batch))?;
|
||||
|
||||
// Update anchor.
|
||||
let old_anchor = Some(anchor.clone());
|
||||
let old_anchor = anchor.clone();
|
||||
|
||||
if slot + 1 == upper_limit_slot {
|
||||
if reconstruction_complete {
|
||||
// The two limits have met in the middle! We're done!
|
||||
// Perform one last integrity check on the state reached.
|
||||
let computed_state_root = state.update_tree_hash_cache()?;
|
||||
@@ -136,23 +147,36 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
self.compare_and_set_anchor_info_with_write(old_anchor, None)?;
|
||||
self.compare_and_set_anchor_info_with_write(
|
||||
old_anchor,
|
||||
ANCHOR_FOR_ARCHIVE_NODE,
|
||||
)?;
|
||||
|
||||
return Ok(());
|
||||
} else {
|
||||
// The lower limit has been raised, store it.
|
||||
anchor.state_lower_limit = slot;
|
||||
|
||||
self.compare_and_set_anchor_info_with_write(
|
||||
old_anchor,
|
||||
Some(anchor.clone()),
|
||||
)?;
|
||||
self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?;
|
||||
}
|
||||
|
||||
// If this is the end of the batch, return Ok. The caller will run another
|
||||
// batch when there is idle capacity.
|
||||
if batch_complete {
|
||||
debug!(
|
||||
self.log,
|
||||
"Finished state reconstruction batch";
|
||||
"start_slot" => lower_limit_slot,
|
||||
"end_slot" => slot,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Should always reach the `upper_limit_slot` and return early above.
|
||||
Err(Error::StateReconstructionDidNotComplete)
|
||||
// Should always reach the `upper_limit_slot` or the end of the batch and return early
|
||||
// above.
|
||||
Err(Error::StateReconstructionLogicError)
|
||||
})??;
|
||||
|
||||
// Check that the split point wasn't mutated during the state reconstruction process.
|
||||
|
||||
Reference in New Issue
Block a user