mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 05:44:30 +00:00
Shrink persisted fork choice data (#7805)
Closes: - https://github.com/sigp/lighthouse/issues/7760 - [x] Remove `balances_cache` from `PersistedForkChoiceStore` (~65 MB saving on mainnet) - [x] Remove `justified_balances` from `PersistedForkChoiceStore` (~16 MB saving on mainnet) - [x] Remove `balances` from `ProtoArray`/`SszContainer`. - [x] Implement zstd compression for votes - [x] Fix bug in justified state usage - [x] Bump schema version to V28 and implement migration.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -922,6 +922,7 @@ dependencies = [
|
||||
"tree_hash",
|
||||
"tree_hash_derive",
|
||||
"types",
|
||||
"zstd 0.13.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3567,6 +3568,7 @@ dependencies = [
|
||||
"proto_array",
|
||||
"state_processing",
|
||||
"store",
|
||||
"superstruct",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"types",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
|
||||
|
||||
[package]
|
||||
name = "beacon_chain"
|
||||
version = "0.2.0"
|
||||
@@ -65,6 +65,7 @@ tracing = { workspace = true }
|
||||
tree_hash = { workspace = true }
|
||||
tree_hash_derive = { workspace = true }
|
||||
types = { workspace = true }
|
||||
zstd = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { workspace = true }
|
||||
|
||||
@@ -121,8 +121,8 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
|
||||
use store::{
|
||||
BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
|
||||
KeyValueStoreOp, StoreItem, StoreOp,
|
||||
BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
|
||||
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
|
||||
};
|
||||
use task_executor::{ShutdownReason, TaskExecutor};
|
||||
use tokio_stream::Stream;
|
||||
@@ -618,12 +618,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
reset_payload_statuses: ResetPayloadStatuses,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Option<BeaconForkChoice<T>>, Error> {
|
||||
let Some(persisted_fork_choice) =
|
||||
store.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)?
|
||||
let Some(persisted_fork_choice_bytes) = store
|
||||
.hot_db
|
||||
.get_bytes(DBColumn::ForkChoice, FORK_CHOICE_DB_KEY.as_slice())?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let persisted_fork_choice =
|
||||
PersistedForkChoice::from_bytes(&persisted_fork_choice_bytes, store.get_config())?;
|
||||
let fc_store =
|
||||
BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, store)?;
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ pub enum Error {
|
||||
FailedToReadState(StoreError),
|
||||
MissingState(Hash256),
|
||||
BeaconStateError(BeaconStateError),
|
||||
UnalignedCheckpoint { block_slot: Slot, state_slot: Slot },
|
||||
Arith(ArithError),
|
||||
}
|
||||
|
||||
@@ -136,7 +137,9 @@ pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<
|
||||
finalized_checkpoint: Checkpoint,
|
||||
justified_checkpoint: Checkpoint,
|
||||
justified_balances: JustifiedBalances,
|
||||
justified_state_root: Hash256,
|
||||
unrealized_justified_checkpoint: Checkpoint,
|
||||
unrealized_justified_state_root: Hash256,
|
||||
unrealized_finalized_checkpoint: Checkpoint,
|
||||
proposer_boost_root: Hash256,
|
||||
equivocating_indices: BTreeSet<u64>,
|
||||
@@ -162,21 +165,37 @@ where
|
||||
/// It is assumed that `anchor` is already persisted in `store`.
|
||||
pub fn get_forkchoice_store(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
anchor: &BeaconSnapshot<E>,
|
||||
anchor: BeaconSnapshot<E>,
|
||||
) -> Result<Self, Error> {
|
||||
let anchor_state = &anchor.beacon_state;
|
||||
let unadvanced_state_root = anchor.beacon_state_root();
|
||||
let mut anchor_state = anchor.beacon_state;
|
||||
let mut anchor_block_header = anchor_state.latest_block_header().clone();
|
||||
if anchor_block_header.state_root == Hash256::zero() {
|
||||
anchor_block_header.state_root = anchor.beacon_state_root();
|
||||
|
||||
// The anchor state MUST be on an epoch boundary (it should be advanced by the caller).
|
||||
if !anchor_state
|
||||
.slot()
|
||||
.as_u64()
|
||||
.is_multiple_of(E::slots_per_epoch())
|
||||
{
|
||||
return Err(Error::UnalignedCheckpoint {
|
||||
block_slot: anchor_block_header.slot,
|
||||
state_slot: anchor_state.slot(),
|
||||
});
|
||||
}
|
||||
let anchor_root = anchor_block_header.canonical_root();
|
||||
|
||||
// Compute the accurate block root for the checkpoint block.
|
||||
if anchor_block_header.state_root.is_zero() {
|
||||
anchor_block_header.state_root = unadvanced_state_root;
|
||||
}
|
||||
let anchor_block_root = anchor_block_header.canonical_root();
|
||||
let anchor_epoch = anchor_state.current_epoch();
|
||||
let justified_checkpoint = Checkpoint {
|
||||
epoch: anchor_epoch,
|
||||
root: anchor_root,
|
||||
root: anchor_block_root,
|
||||
};
|
||||
let finalized_checkpoint = justified_checkpoint;
|
||||
let justified_balances = JustifiedBalances::from_justified_state(anchor_state)?;
|
||||
let justified_balances = JustifiedBalances::from_justified_state(&anchor_state)?;
|
||||
let justified_state_root = anchor_state.canonical_root()?;
|
||||
|
||||
Ok(Self {
|
||||
store,
|
||||
@@ -184,8 +203,10 @@ where
|
||||
time: anchor_state.slot(),
|
||||
justified_checkpoint,
|
||||
justified_balances,
|
||||
justified_state_root,
|
||||
finalized_checkpoint,
|
||||
unrealized_justified_checkpoint: justified_checkpoint,
|
||||
unrealized_justified_state_root: justified_state_root,
|
||||
unrealized_finalized_checkpoint: finalized_checkpoint,
|
||||
proposer_boost_root: Hash256::zero(),
|
||||
equivocating_indices: BTreeSet::new(),
|
||||
@@ -197,12 +218,12 @@ where
|
||||
/// on-disk database.
|
||||
pub fn to_persisted(&self) -> PersistedForkChoiceStore {
|
||||
PersistedForkChoiceStore {
|
||||
balances_cache: self.balances_cache.clone(),
|
||||
time: self.time,
|
||||
finalized_checkpoint: self.finalized_checkpoint,
|
||||
justified_checkpoint: self.justified_checkpoint,
|
||||
justified_balances: self.justified_balances.effective_balances.clone(),
|
||||
justified_state_root: self.justified_state_root,
|
||||
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
|
||||
unrealized_justified_state_root: self.unrealized_justified_state_root,
|
||||
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
|
||||
proposer_boost_root: self.proposer_boost_root,
|
||||
equivocating_indices: self.equivocating_indices.clone(),
|
||||
@@ -210,20 +231,59 @@ where
|
||||
}
|
||||
|
||||
/// Restore `Self` from a previously-generated `PersistedForkChoiceStore`.
|
||||
pub fn from_persisted(
|
||||
persisted: PersistedForkChoiceStore,
|
||||
///
|
||||
/// DEPRECATED. Can be deleted once migrations no longer require it.
|
||||
pub fn from_persisted_v17(
|
||||
persisted: PersistedForkChoiceStoreV17,
|
||||
justified_state_root: Hash256,
|
||||
unrealized_justified_state_root: Hash256,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
) -> Result<Self, Error> {
|
||||
let justified_balances =
|
||||
JustifiedBalances::from_effective_balances(persisted.justified_balances)?;
|
||||
|
||||
Ok(Self {
|
||||
store,
|
||||
balances_cache: persisted.balances_cache,
|
||||
balances_cache: <_>::default(),
|
||||
time: persisted.time,
|
||||
finalized_checkpoint: persisted.finalized_checkpoint,
|
||||
justified_checkpoint: persisted.justified_checkpoint,
|
||||
justified_balances,
|
||||
justified_state_root,
|
||||
unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint,
|
||||
unrealized_justified_state_root,
|
||||
unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint,
|
||||
proposer_boost_root: persisted.proposer_boost_root,
|
||||
equivocating_indices: persisted.equivocating_indices,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Restore `Self` from a previously-generated `PersistedForkChoiceStore`.
|
||||
pub fn from_persisted(
|
||||
persisted: PersistedForkChoiceStore,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
) -> Result<Self, Error> {
|
||||
let justified_checkpoint = persisted.justified_checkpoint;
|
||||
let justified_state_root = persisted.justified_state_root;
|
||||
|
||||
let update_cache = true;
|
||||
let justified_state = store
|
||||
.get_hot_state(&justified_state_root, update_cache)
|
||||
.map_err(Error::FailedToReadState)?
|
||||
.ok_or(Error::MissingState(justified_state_root))?;
|
||||
|
||||
let justified_balances = JustifiedBalances::from_justified_state(&justified_state)?;
|
||||
Ok(Self {
|
||||
store,
|
||||
balances_cache: <_>::default(),
|
||||
time: persisted.time,
|
||||
finalized_checkpoint: persisted.finalized_checkpoint,
|
||||
justified_checkpoint,
|
||||
justified_balances,
|
||||
justified_state_root,
|
||||
unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint,
|
||||
unrealized_justified_state_root: persisted.unrealized_justified_state_root,
|
||||
unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint,
|
||||
proposer_boost_root: persisted.proposer_boost_root,
|
||||
equivocating_indices: persisted.equivocating_indices,
|
||||
@@ -261,6 +321,10 @@ where
|
||||
&self.justified_checkpoint
|
||||
}
|
||||
|
||||
fn justified_state_root(&self) -> Hash256 {
|
||||
self.justified_state_root
|
||||
}
|
||||
|
||||
fn justified_balances(&self) -> &JustifiedBalances {
|
||||
&self.justified_balances
|
||||
}
|
||||
@@ -273,6 +337,10 @@ where
|
||||
&self.unrealized_justified_checkpoint
|
||||
}
|
||||
|
||||
fn unrealized_justified_state_root(&self) -> Hash256 {
|
||||
self.unrealized_justified_state_root
|
||||
}
|
||||
|
||||
fn unrealized_finalized_checkpoint(&self) -> &Checkpoint {
|
||||
&self.unrealized_finalized_checkpoint
|
||||
}
|
||||
@@ -285,8 +353,13 @@ where
|
||||
self.finalized_checkpoint = checkpoint
|
||||
}
|
||||
|
||||
fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Error> {
|
||||
fn set_justified_checkpoint(
|
||||
&mut self,
|
||||
checkpoint: Checkpoint,
|
||||
justified_state_root: Hash256,
|
||||
) -> Result<(), Error> {
|
||||
self.justified_checkpoint = checkpoint;
|
||||
self.justified_state_root = justified_state_root;
|
||||
|
||||
if let Some(balances) = self.balances_cache.get(
|
||||
self.justified_checkpoint.root,
|
||||
@@ -297,27 +370,14 @@ where
|
||||
self.justified_balances = JustifiedBalances::from_effective_balances(balances)?;
|
||||
} else {
|
||||
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
|
||||
let justified_block = self
|
||||
.store
|
||||
.get_blinded_block(&self.justified_checkpoint.root)
|
||||
.map_err(Error::FailedToReadBlock)?
|
||||
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
|
||||
.deconstruct()
|
||||
.0;
|
||||
|
||||
let max_slot = self
|
||||
.justified_checkpoint
|
||||
.epoch
|
||||
.start_slot(E::slots_per_epoch());
|
||||
let (_, state) = self
|
||||
// Justified state is reasonably useful to cache, it might be finalized soon.
|
||||
let update_cache = true;
|
||||
let state = self
|
||||
.store
|
||||
.get_advanced_hot_state(
|
||||
self.justified_checkpoint.root,
|
||||
max_slot,
|
||||
justified_block.state_root(),
|
||||
)
|
||||
.get_hot_state(&self.justified_state_root, update_cache)
|
||||
.map_err(Error::FailedToReadState)?
|
||||
.ok_or_else(|| Error::MissingState(justified_block.state_root()))?;
|
||||
.ok_or_else(|| Error::MissingState(self.justified_state_root))?;
|
||||
|
||||
self.justified_balances = JustifiedBalances::from_justified_state(&state)?;
|
||||
}
|
||||
@@ -325,8 +385,9 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint, state_root: Hash256) {
|
||||
self.unrealized_justified_checkpoint = checkpoint;
|
||||
self.unrealized_justified_state_root = state_root;
|
||||
}
|
||||
|
||||
fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||
@@ -346,18 +407,48 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV17;
|
||||
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV28;
|
||||
|
||||
/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
|
||||
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
|
||||
#[superstruct(
|
||||
variants(V17, V28),
|
||||
variant_attributes(derive(Encode, Decode)),
|
||||
no_enum
|
||||
)]
|
||||
pub struct PersistedForkChoiceStore {
|
||||
/// The balances cache was removed from disk storage in schema V28.
|
||||
#[superstruct(only(V17))]
|
||||
pub balances_cache: BalancesCacheV8,
|
||||
pub time: Slot,
|
||||
pub finalized_checkpoint: Checkpoint,
|
||||
pub justified_checkpoint: Checkpoint,
|
||||
/// The justified balances were removed from disk storage in schema V28.
|
||||
#[superstruct(only(V17))]
|
||||
pub justified_balances: Vec<u64>,
|
||||
/// The justified state root is stored so that it can be used to load the justified balances.
|
||||
#[superstruct(only(V28))]
|
||||
pub justified_state_root: Hash256,
|
||||
pub unrealized_justified_checkpoint: Checkpoint,
|
||||
#[superstruct(only(V28))]
|
||||
pub unrealized_justified_state_root: Hash256,
|
||||
pub unrealized_finalized_checkpoint: Checkpoint,
|
||||
pub proposer_boost_root: Hash256,
|
||||
pub equivocating_indices: BTreeSet<u64>,
|
||||
}
|
||||
|
||||
// Convert V28 to V17 by adding balances and removing justified state roots.
|
||||
impl From<(PersistedForkChoiceStoreV28, JustifiedBalances)> for PersistedForkChoiceStoreV17 {
|
||||
fn from((v28, balances): (PersistedForkChoiceStoreV28, JustifiedBalances)) -> Self {
|
||||
Self {
|
||||
balances_cache: Default::default(),
|
||||
time: v28.time,
|
||||
finalized_checkpoint: v28.finalized_checkpoint,
|
||||
justified_checkpoint: v28.justified_checkpoint,
|
||||
justified_balances: balances.effective_balances,
|
||||
unrealized_justified_checkpoint: v28.unrealized_justified_checkpoint,
|
||||
unrealized_finalized_checkpoint: v28.unrealized_finalized_checkpoint,
|
||||
proposer_boost_root: v28.proposer_boost_root,
|
||||
equivocating_indices: v28.equivocating_indices,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -394,7 +394,7 @@ where
|
||||
.map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?,
|
||||
);
|
||||
|
||||
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
|
||||
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, genesis.clone())
|
||||
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
|
||||
let current_slot = None;
|
||||
|
||||
@@ -616,7 +616,7 @@ where
|
||||
beacon_state: weak_subj_state,
|
||||
};
|
||||
|
||||
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot)
|
||||
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, snapshot.clone())
|
||||
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
|
||||
|
||||
let fork_choice = ForkChoice::from_anchor(
|
||||
@@ -887,8 +887,9 @@ where
|
||||
self.pending_io_batch.push(BeaconChain::<
|
||||
Witness<TSlotClock, E, THotStore, TColdStore>,
|
||||
>::persist_fork_choice_in_batch_standalone(
|
||||
&fork_choice
|
||||
));
|
||||
&fork_choice,
|
||||
store.get_config(),
|
||||
).map_err(|e| format!("Fork choice compression error: {e:?}"))?);
|
||||
store
|
||||
.hot_db
|
||||
.do_atomically(self.pending_io_batch)
|
||||
|
||||
@@ -53,7 +53,9 @@ use slot_clock::SlotClock;
|
||||
use state_processing::AllCaches;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::{KeyValueStore, KeyValueStoreOp, StoreItem, iter::StateRootsIterator};
|
||||
use store::{
|
||||
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator,
|
||||
};
|
||||
use task_executor::{JoinHandle, ShutdownReason};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use types::*;
|
||||
@@ -998,25 +1000,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Persist fork choice to disk, writing immediately.
|
||||
pub fn persist_fork_choice(&self) -> Result<(), Error> {
|
||||
let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
|
||||
let batch = vec![self.persist_fork_choice_in_batch()];
|
||||
let batch = vec![self.persist_fork_choice_in_batch()?];
|
||||
self.store.hot_db.do_atomically(batch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return a database operation for writing fork choice to disk.
|
||||
pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp {
|
||||
Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.fork_choice_read_lock())
|
||||
pub fn persist_fork_choice_in_batch(&self) -> Result<KeyValueStoreOp, Error> {
|
||||
Self::persist_fork_choice_in_batch_standalone(
|
||||
&self.canonical_head.fork_choice_read_lock(),
|
||||
self.store.get_config(),
|
||||
)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Return a database operation for writing fork choice to disk.
|
||||
pub fn persist_fork_choice_in_batch_standalone(
|
||||
fork_choice: &BeaconForkChoice<T>,
|
||||
) -> KeyValueStoreOp {
|
||||
store_config: &StoreConfig,
|
||||
) -> Result<KeyValueStoreOp, StoreError> {
|
||||
let persisted_fork_choice = PersistedForkChoice {
|
||||
fork_choice: fork_choice.to_persisted(),
|
||||
fork_choice_store: fork_choice.fc_store().to_persisted(),
|
||||
};
|
||||
persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)
|
||||
persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY, store_config)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -142,8 +142,9 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
|
||||
beacon_state: finalized_state,
|
||||
};
|
||||
|
||||
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot)
|
||||
.map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?;
|
||||
let fc_store =
|
||||
BeaconForkChoiceStore::get_forkchoice_store(store.clone(), finalized_snapshot.clone())
|
||||
.map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?;
|
||||
|
||||
let mut fork_choice = ForkChoice::from_anchor(
|
||||
fc_store,
|
||||
|
||||
@@ -74,7 +74,10 @@ pub use self::chain_config::ChainConfig;
|
||||
pub use self::errors::{BeaconChainError, BlockProductionError};
|
||||
pub use self::historical_blocks::HistoricalBlockError;
|
||||
pub use attestation_verification::Error as AttestationError;
|
||||
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
|
||||
pub use beacon_fork_choice_store::{
|
||||
BeaconForkChoiceStore, Error as ForkChoiceStoreError, PersistedForkChoiceStoreV17,
|
||||
PersistedForkChoiceStoreV28,
|
||||
};
|
||||
pub use block_verification::{
|
||||
BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock,
|
||||
IntoExecutionPendingBlock, IntoGossipVerifiedBlock, InvalidSignature,
|
||||
|
||||
@@ -585,6 +585,18 @@ pub static FORK_CHOICE_WRITE_LOCK_AQUIRE_TIMES: LazyLock<Result<Histogram>> = La
|
||||
exponential_buckets(1e-3, 4.0, 7),
|
||||
)
|
||||
});
|
||||
pub static FORK_CHOICE_ENCODE_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"beacon_fork_choice_encode_seconds",
|
||||
"Time taken to SSZ encode the persisted fork choice data",
|
||||
)
|
||||
});
|
||||
pub static FORK_CHOICE_COMPRESS_TIMES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"beacon_fork_choice_compress_seconds",
|
||||
"Time taken to compress the persisted fork choice data",
|
||||
)
|
||||
});
|
||||
pub static BALANCES_CACHE_HITS: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"beacon_balances_cache_hits_total",
|
||||
|
||||
@@ -1,16 +1,30 @@
|
||||
use crate::beacon_fork_choice_store::PersistedForkChoiceStoreV17;
|
||||
use crate::{
|
||||
beacon_fork_choice_store::{PersistedForkChoiceStoreV17, PersistedForkChoiceStoreV28},
|
||||
metrics,
|
||||
};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use store::{DBColumn, Error, StoreItem};
|
||||
use store::{DBColumn, Error, KeyValueStoreOp, StoreConfig, StoreItem};
|
||||
use superstruct::superstruct;
|
||||
use types::Hash256;
|
||||
|
||||
// If adding a new version you should update this type alias and fix the breakages.
|
||||
pub type PersistedForkChoice = PersistedForkChoiceV17;
|
||||
pub type PersistedForkChoice = PersistedForkChoiceV28;
|
||||
|
||||
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
|
||||
#[superstruct(
|
||||
variants(V17, V28),
|
||||
variant_attributes(derive(Encode, Decode)),
|
||||
no_enum
|
||||
)]
|
||||
pub struct PersistedForkChoice {
|
||||
pub fork_choice: fork_choice::PersistedForkChoice,
|
||||
pub fork_choice_store: PersistedForkChoiceStoreV17,
|
||||
#[superstruct(only(V17))]
|
||||
pub fork_choice_v17: fork_choice::PersistedForkChoiceV17,
|
||||
#[superstruct(only(V28))]
|
||||
pub fork_choice: fork_choice::PersistedForkChoiceV28,
|
||||
#[superstruct(only(V17))]
|
||||
pub fork_choice_store_v17: PersistedForkChoiceStoreV17,
|
||||
#[superstruct(only(V28))]
|
||||
pub fork_choice_store: PersistedForkChoiceStoreV28,
|
||||
}
|
||||
|
||||
macro_rules! impl_store_item {
|
||||
@@ -32,3 +46,35 @@ macro_rules! impl_store_item {
|
||||
}
|
||||
|
||||
impl_store_item!(PersistedForkChoiceV17);
|
||||
|
||||
impl PersistedForkChoiceV28 {
|
||||
pub fn from_bytes(bytes: &[u8], store_config: &StoreConfig) -> Result<Self, Error> {
|
||||
let decompressed_bytes = store_config
|
||||
.decompress_bytes(bytes)
|
||||
.map_err(Error::Compression)?;
|
||||
Self::from_ssz_bytes(&decompressed_bytes).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn as_bytes(&self, store_config: &StoreConfig) -> Result<Vec<u8>, Error> {
|
||||
let encode_timer = metrics::start_timer(&metrics::FORK_CHOICE_ENCODE_TIMES);
|
||||
let ssz_bytes = self.as_ssz_bytes();
|
||||
drop(encode_timer);
|
||||
|
||||
let _compress_timer = metrics::start_timer(&metrics::FORK_CHOICE_COMPRESS_TIMES);
|
||||
store_config
|
||||
.compress_bytes(&ssz_bytes)
|
||||
.map_err(Error::Compression)
|
||||
}
|
||||
|
||||
pub fn as_kv_store_op(
|
||||
&self,
|
||||
key: Hash256,
|
||||
store_config: &StoreConfig,
|
||||
) -> Result<KeyValueStoreOp, Error> {
|
||||
Ok(KeyValueStoreOp::PutKeyValue(
|
||||
DBColumn::ForkChoice,
|
||||
key.as_slice().to_vec(),
|
||||
self.as_bytes(store_config)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ mod migration_schema_v24;
|
||||
mod migration_schema_v25;
|
||||
mod migration_schema_v26;
|
||||
mod migration_schema_v27;
|
||||
mod migration_schema_v28;
|
||||
|
||||
use crate::beacon_chain::BeaconChainTypes;
|
||||
use std::sync::Arc;
|
||||
@@ -79,6 +80,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
|
||||
migration_schema_v27::downgrade_from_v27::<T>(db.clone())?;
|
||||
db.store_schema_version_atomically(to, vec![])
|
||||
}
|
||||
(SchemaVersion(27), SchemaVersion(28)) => {
|
||||
let ops = migration_schema_v28::upgrade_to_v28::<T>(db.clone())?;
|
||||
db.store_schema_version_atomically(to, ops)
|
||||
}
|
||||
(SchemaVersion(28), SchemaVersion(27)) => {
|
||||
let ops = migration_schema_v28::downgrade_from_v28::<T>(db.clone())?;
|
||||
db.store_schema_version_atomically(to, ops)
|
||||
}
|
||||
// Anything else is an error.
|
||||
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
|
||||
target_version: to,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::BeaconForkChoiceStore;
|
||||
use crate::beacon_chain::BeaconChainTypes;
|
||||
use crate::persisted_fork_choice::PersistedForkChoice;
|
||||
use crate::persisted_fork_choice::PersistedForkChoiceV17;
|
||||
use crate::schema_change::StoreError;
|
||||
use crate::test_utils::{BEACON_CHAIN_DB_KEY, FORK_CHOICE_DB_KEY, PersistedBeaconChain};
|
||||
use fork_choice::{ForkChoice, ResetPayloadStatuses};
|
||||
@@ -80,7 +80,7 @@ pub fn downgrade_from_v23<T: BeaconChainTypes>(
|
||||
};
|
||||
|
||||
// Recreate head-tracker from fork choice.
|
||||
let Some(persisted_fork_choice) = db.get_item::<PersistedForkChoice>(&FORK_CHOICE_DB_KEY)?
|
||||
let Some(persisted_fork_choice) = db.get_item::<PersistedForkChoiceV17>(&FORK_CHOICE_DB_KEY)?
|
||||
else {
|
||||
// Fork choice should exist if the database exists.
|
||||
return Err(Error::MigrationError(
|
||||
@@ -88,19 +88,30 @@ pub fn downgrade_from_v23<T: BeaconChainTypes>(
|
||||
));
|
||||
};
|
||||
|
||||
let fc_store =
|
||||
BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, db.clone())
|
||||
.map_err(|e| {
|
||||
Error::MigrationError(format!(
|
||||
"Error loading fork choise store from persisted: {e:?}"
|
||||
))
|
||||
})?;
|
||||
// We use dummy roots for the justified states because we can source the balances from the v17
|
||||
// persited fork choice. The justified state root isn't required to look up the justified state's
|
||||
// balances (as it would be in V28). This fork choice object with corrupt state roots SHOULD NOT
|
||||
// be written to disk.
|
||||
let dummy_justified_state_root = Hash256::repeat_byte(0x66);
|
||||
let dummy_unrealized_justified_state_root = Hash256::repeat_byte(0x77);
|
||||
|
||||
let fc_store = BeaconForkChoiceStore::from_persisted_v17(
|
||||
persisted_fork_choice.fork_choice_store_v17,
|
||||
dummy_justified_state_root,
|
||||
dummy_unrealized_justified_state_root,
|
||||
db.clone(),
|
||||
)
|
||||
.map_err(|e| {
|
||||
Error::MigrationError(format!(
|
||||
"Error loading fork choice store from persisted: {e:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
// Doesn't matter what policy we use for invalid payloads, as our head calculation just
|
||||
// considers descent from finalization.
|
||||
let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload;
|
||||
let fork_choice = ForkChoice::from_persisted(
|
||||
persisted_fork_choice.fork_choice,
|
||||
persisted_fork_choice.fork_choice_v17.try_into()?,
|
||||
reset_payload_statuses,
|
||||
fc_store,
|
||||
&db.spec,
|
||||
|
||||
@@ -0,0 +1,152 @@
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, PersistedForkChoiceStoreV17,
|
||||
beacon_chain::FORK_CHOICE_DB_KEY,
|
||||
persisted_fork_choice::{PersistedForkChoiceV17, PersistedForkChoiceV28},
|
||||
summaries_dag::{DAGStateSummary, StateSummariesDAG},
|
||||
};
|
||||
use fork_choice::{ForkChoice, ForkChoiceStore, ResetPayloadStatuses};
|
||||
use std::sync::Arc;
|
||||
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
|
||||
use tracing::{info, warn};
|
||||
use types::{EthSpec, Hash256};
|
||||
|
||||
/// Upgrade `PersistedForkChoice` from V17 to V28.
|
||||
pub fn upgrade_to_v28<T: BeaconChainTypes>(
|
||||
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||
let Some(persisted_fork_choice_v17) =
|
||||
db.get_item::<PersistedForkChoiceV17>(&FORK_CHOICE_DB_KEY)?
|
||||
else {
|
||||
warn!("No fork choice found to upgrade to v28");
|
||||
return Ok(vec![]);
|
||||
};
|
||||
|
||||
// Load state DAG in order to compute justified checkpoint roots.
|
||||
let state_summaries_dag = {
|
||||
let state_summaries = db
|
||||
.load_hot_state_summaries()?
|
||||
.into_iter()
|
||||
.map(|(state_root, summary)| (state_root, summary.into()))
|
||||
.collect::<Vec<(Hash256, DAGStateSummary)>>();
|
||||
|
||||
StateSummariesDAG::new(state_summaries).map_err(|e| {
|
||||
Error::MigrationError(format!("Error loading state summaries DAG: {e:?}"))
|
||||
})?
|
||||
};
|
||||
|
||||
// Determine the justified state roots.
|
||||
let justified_checkpoint = persisted_fork_choice_v17
|
||||
.fork_choice_store_v17
|
||||
.justified_checkpoint;
|
||||
let justified_block_root = justified_checkpoint.root;
|
||||
let justified_slot = justified_checkpoint
|
||||
.epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch());
|
||||
let justified_state_root = state_summaries_dag
|
||||
.state_root_at_slot(justified_block_root, justified_slot)
|
||||
.ok_or_else(|| {
|
||||
Error::MigrationError(format!(
|
||||
"Missing state root for justified slot {justified_slot} with latest_block_root \
|
||||
{justified_block_root:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let unrealized_justified_checkpoint = persisted_fork_choice_v17
|
||||
.fork_choice_store_v17
|
||||
.unrealized_justified_checkpoint;
|
||||
let unrealized_justified_block_root = unrealized_justified_checkpoint.root;
|
||||
let unrealized_justified_slot = unrealized_justified_checkpoint
|
||||
.epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch());
|
||||
let unrealized_justified_state_root = state_summaries_dag
|
||||
.state_root_at_slot(unrealized_justified_block_root, unrealized_justified_slot)
|
||||
.ok_or_else(|| {
|
||||
Error::MigrationError(format!(
|
||||
"Missing state root for unrealized justified slot {unrealized_justified_slot} \
|
||||
with latest_block_root {unrealized_justified_block_root:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let fc_store = BeaconForkChoiceStore::from_persisted_v17(
|
||||
persisted_fork_choice_v17.fork_choice_store_v17,
|
||||
justified_state_root,
|
||||
unrealized_justified_state_root,
|
||||
db.clone(),
|
||||
)
|
||||
.map_err(|e| {
|
||||
Error::MigrationError(format!(
|
||||
"Error loading fork choice store from persisted: {e:?}"
|
||||
))
|
||||
})?;
|
||||
|
||||
info!(
|
||||
?justified_state_root,
|
||||
%justified_slot,
|
||||
"Added justified state root to fork choice"
|
||||
);
|
||||
|
||||
// Construct top-level ForkChoice struct using the patched fork choice store, and the converted
|
||||
// proto array.
|
||||
let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload;
|
||||
let fork_choice = ForkChoice::from_persisted(
|
||||
persisted_fork_choice_v17.fork_choice_v17.try_into()?,
|
||||
reset_payload_statuses,
|
||||
fc_store,
|
||||
db.get_chain_spec(),
|
||||
)
|
||||
.map_err(|e| Error::MigrationError(format!("Unable to build ForkChoice: {e:?}")))?;
|
||||
|
||||
let ops = vec![BeaconChain::<T>::persist_fork_choice_in_batch_standalone(
|
||||
&fork_choice,
|
||||
db.get_config(),
|
||||
)?];
|
||||
|
||||
info!("Upgraded fork choice for DB schema v28");
|
||||
|
||||
Ok(ops)
|
||||
}
|
||||
|
||||
pub fn downgrade_from_v28<T: BeaconChainTypes>(
|
||||
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||
let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload;
|
||||
let Some(fork_choice) =
|
||||
BeaconChain::<T>::load_fork_choice(db.clone(), reset_payload_statuses, db.get_chain_spec())
|
||||
.map_err(|e| Error::MigrationError(format!("Unable to load fork choice: {e:?}")))?
|
||||
else {
|
||||
warn!("No fork choice to downgrade");
|
||||
return Ok(vec![]);
|
||||
};
|
||||
|
||||
// Recreate V28 persisted fork choice, then convert each field back to its V17 version.
|
||||
let persisted_fork_choice = PersistedForkChoiceV28 {
|
||||
fork_choice: fork_choice.to_persisted(),
|
||||
fork_choice_store: fork_choice.fc_store().to_persisted(),
|
||||
};
|
||||
|
||||
let justified_balances = fork_choice.fc_store().justified_balances();
|
||||
|
||||
// 1. Create `proto_array::PersistedForkChoiceV17`.
|
||||
let fork_choice_v17: fork_choice::PersistedForkChoiceV17 = (
|
||||
persisted_fork_choice.fork_choice,
|
||||
justified_balances.clone(),
|
||||
)
|
||||
.into();
|
||||
|
||||
let fork_choice_store_v17: PersistedForkChoiceStoreV17 = (
|
||||
persisted_fork_choice.fork_choice_store,
|
||||
justified_balances.clone(),
|
||||
)
|
||||
.into();
|
||||
|
||||
let persisted_fork_choice_v17 = PersistedForkChoiceV17 {
|
||||
fork_choice_v17,
|
||||
fork_choice_store_v17,
|
||||
};
|
||||
|
||||
let ops = vec![persisted_fork_choice_v17.as_kv_store_op(FORK_CHOICE_DB_KEY)];
|
||||
|
||||
info!("Downgraded fork choice for DB schema v28");
|
||||
|
||||
Ok(ops)
|
||||
}
|
||||
@@ -355,6 +355,18 @@ impl StateSummariesDAG {
|
||||
}
|
||||
Ok(descendants)
|
||||
}
|
||||
|
||||
/// Returns the root of the state at `slot` with `latest_block_root`, if it exists.
|
||||
///
|
||||
/// The `slot` must be the slot of the `latest_block_root` or a skipped slot following it. This
|
||||
/// function will not return the `state_root` of a state with a different `latest_block_root`
|
||||
/// even if it lies on the same chain.
|
||||
pub fn state_root_at_slot(&self, latest_block_root: Hash256, slot: Slot) -> Option<Hash256> {
|
||||
self.state_summaries_by_block_root
|
||||
.get(&latest_block_root)?
|
||||
.get(&slot)
|
||||
.map(|(state_root, _)| *state_root)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HotStateSummary> for DAGStateSummary {
|
||||
|
||||
@@ -4,12 +4,12 @@ use crate::{DBColumn, Error, StoreItem};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::io::Write;
|
||||
use std::io::{Read, Write};
|
||||
use std::num::NonZeroUsize;
|
||||
use strum::{Display, EnumString, EnumVariantNames};
|
||||
use types::EthSpec;
|
||||
use types::non_zero_usize::new_non_zero_usize;
|
||||
use zstd::Encoder;
|
||||
use zstd::{Decoder, Encoder};
|
||||
|
||||
#[cfg(all(feature = "redb", not(feature = "leveldb")))]
|
||||
pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb;
|
||||
@@ -194,15 +194,23 @@ impl StoreConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result<Vec<u8>, Error> {
|
||||
/// Compress bytes using zstd and the compression level from `self`.
|
||||
pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut compressed_value =
|
||||
Vec::with_capacity(self.estimate_compressed_size(ssz_bytes.len()));
|
||||
let mut encoder = Encoder::new(&mut compressed_value, self.compression_level)
|
||||
.map_err(Error::Compression)?;
|
||||
encoder.write_all(ssz_bytes).map_err(Error::Compression)?;
|
||||
encoder.finish().map_err(Error::Compression)?;
|
||||
let mut encoder = Encoder::new(&mut compressed_value, self.compression_level)?;
|
||||
encoder.write_all(ssz_bytes)?;
|
||||
encoder.finish()?;
|
||||
Ok(compressed_value)
|
||||
}
|
||||
|
||||
/// Decompress bytes compressed using zstd.
|
||||
pub fn decompress_bytes(&self, input: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut out = Vec::with_capacity(self.estimate_decompressed_size(input.len()));
|
||||
let mut decoder = Decoder::new(input)?;
|
||||
decoder.read_to_end(&mut out)?;
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
impl StoreItem for OnDiskStoreConfig {
|
||||
|
||||
@@ -6,14 +6,12 @@ use serde::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::cmp::Ordering;
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::RangeInclusive;
|
||||
use std::str::FromStr;
|
||||
use std::sync::LazyLock;
|
||||
use superstruct::superstruct;
|
||||
use types::historical_summary::HistoricalSummary;
|
||||
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator};
|
||||
use zstd::{Decoder, Encoder};
|
||||
|
||||
static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::empty);
|
||||
|
||||
@@ -395,13 +393,17 @@ impl CompressedU64Diff {
|
||||
.collect();
|
||||
|
||||
Ok(CompressedU64Diff {
|
||||
bytes: compress_bytes(&uncompressed_bytes, config)?,
|
||||
bytes: config
|
||||
.compress_bytes(&uncompressed_bytes)
|
||||
.map_err(Error::Compression)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn apply(&self, xs: &mut Vec<u64>, config: &StoreConfig) -> Result<(), Error> {
|
||||
// Decompress balances diff.
|
||||
let balances_diff_bytes = uncompress_bytes(&self.bytes, config)?;
|
||||
let balances_diff_bytes = config
|
||||
.decompress_bytes(&self.bytes)
|
||||
.map_err(Error::Compression)?;
|
||||
|
||||
for (i, diff_bytes) in balances_diff_bytes
|
||||
.chunks(u64::BITS as usize / 8)
|
||||
@@ -428,22 +430,6 @@ impl CompressedU64Diff {
|
||||
}
|
||||
}
|
||||
|
||||
fn compress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
|
||||
let compression_level = config.compression_level;
|
||||
let mut out = Vec::with_capacity(config.estimate_compressed_size(input.len()));
|
||||
let mut encoder = Encoder::new(&mut out, compression_level).map_err(Error::Compression)?;
|
||||
encoder.write_all(input).map_err(Error::Compression)?;
|
||||
encoder.finish().map_err(Error::Compression)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
|
||||
let mut out = Vec::with_capacity(config.estimate_decompressed_size(input.len()));
|
||||
let mut decoder = Decoder::new(input).map_err(Error::Compression)?;
|
||||
decoder.read_to_end(&mut out).map_err(Error::Compression)?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
impl ValidatorsDiff {
|
||||
pub fn compute(
|
||||
xs: &[Validator],
|
||||
@@ -534,12 +520,16 @@ impl ValidatorsDiff {
|
||||
.collect::<Vec<u8>>();
|
||||
|
||||
Ok(Self {
|
||||
bytes: compress_bytes(&uncompressed_bytes, config)?,
|
||||
bytes: config
|
||||
.compress_bytes(&uncompressed_bytes)
|
||||
.map_err(Error::Compression)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn apply(&self, xs: &mut Vec<Validator>, config: &StoreConfig) -> Result<(), Error> {
|
||||
let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?;
|
||||
let validator_diff_bytes = config
|
||||
.decompress_bytes(&self.bytes)
|
||||
.map_err(Error::Compression)?;
|
||||
|
||||
for diff_bytes in
|
||||
validator_diff_bytes.chunks(<ValidatorDiffEntry as Decode>::ssz_fixed_len())
|
||||
|
||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{Hash256, Slot};
|
||||
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(27);
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(28);
|
||||
|
||||
// All the keys that get stored under the `BeaconMeta` column.
|
||||
//
|
||||
|
||||
@@ -12,6 +12,7 @@ logging = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
proto_array = { workspace = true }
|
||||
state_processing = { workspace = true }
|
||||
superstruct = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
types = { workspace = true }
|
||||
|
||||
|
||||
@@ -2,9 +2,10 @@ use crate::metrics::{self, scrape_for_metrics};
|
||||
use crate::{ForkChoiceStore, InvalidationOperation};
|
||||
use logging::crit;
|
||||
use proto_array::{
|
||||
Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, ProposerHeadError,
|
||||
ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold,
|
||||
Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, JustifiedBalances,
|
||||
ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold,
|
||||
};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use state_processing::{
|
||||
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
|
||||
@@ -13,6 +14,7 @@ use std::cmp::Ordering;
|
||||
use std::collections::BTreeSet;
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
use superstruct::superstruct;
|
||||
use tracing::{debug, instrument, warn};
|
||||
use types::{
|
||||
AbstractExecPayload, AttestationShufflingId, AttesterSlashingRef, BeaconBlockRef, BeaconState,
|
||||
@@ -736,6 +738,11 @@ where
|
||||
self.update_checkpoints(
|
||||
state.current_justified_checkpoint(),
|
||||
state.finalized_checkpoint(),
|
||||
|| {
|
||||
state
|
||||
.get_state_root_at_epoch_start(state.current_justified_checkpoint().epoch)
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)?;
|
||||
|
||||
// Update unrealized justified/finalized checkpoints.
|
||||
@@ -795,8 +802,15 @@ where
|
||||
if unrealized_justified_checkpoint.epoch
|
||||
> self.fc_store.unrealized_justified_checkpoint().epoch
|
||||
{
|
||||
self.fc_store
|
||||
.set_unrealized_justified_checkpoint(unrealized_justified_checkpoint);
|
||||
// Justification has recently updated therefore the justified state root should be in
|
||||
// range of the head state's `state_roots` vector.
|
||||
let unrealized_justified_state_root =
|
||||
state.get_state_root_at_epoch_start(unrealized_justified_checkpoint.epoch)?;
|
||||
|
||||
self.fc_store.set_unrealized_justified_checkpoint(
|
||||
unrealized_justified_checkpoint,
|
||||
unrealized_justified_state_root,
|
||||
);
|
||||
}
|
||||
if unrealized_finalized_checkpoint.epoch
|
||||
> self.fc_store.unrealized_finalized_checkpoint().epoch
|
||||
@@ -810,6 +824,13 @@ where
|
||||
self.pull_up_store_checkpoints(
|
||||
unrealized_justified_checkpoint,
|
||||
unrealized_finalized_checkpoint,
|
||||
|| {
|
||||
// In the case where we actually update justification, it must be that the
|
||||
// unrealized justification is recent and in range of the `state_roots` vector.
|
||||
state
|
||||
.get_state_root_at_epoch_start(unrealized_justified_checkpoint.epoch)
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -896,11 +917,13 @@ where
|
||||
&mut self,
|
||||
justified_checkpoint: Checkpoint,
|
||||
finalized_checkpoint: Checkpoint,
|
||||
justified_state_root_producer: impl FnOnce() -> Result<Hash256, Error<T::Error>>,
|
||||
) -> Result<(), Error<T::Error>> {
|
||||
// Update justified checkpoint.
|
||||
if justified_checkpoint.epoch > self.fc_store.justified_checkpoint().epoch {
|
||||
let justified_state_root = justified_state_root_producer()?;
|
||||
self.fc_store
|
||||
.set_justified_checkpoint(justified_checkpoint)
|
||||
.set_justified_checkpoint(justified_checkpoint, justified_state_root)
|
||||
.map_err(Error::UnableToSetJustifiedCheckpoint)?;
|
||||
}
|
||||
|
||||
@@ -1166,10 +1189,12 @@ where
|
||||
// Update the justified/finalized checkpoints based upon the
|
||||
// best-observed unrealized justification/finality.
|
||||
let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint();
|
||||
let unrealized_justified_state_root = self.fc_store.unrealized_justified_state_root();
|
||||
let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint();
|
||||
self.pull_up_store_checkpoints(
|
||||
unrealized_justified_checkpoint,
|
||||
unrealized_finalized_checkpoint,
|
||||
|| Ok(unrealized_justified_state_root),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
@@ -1179,10 +1204,12 @@ where
|
||||
&mut self,
|
||||
unrealized_justified_checkpoint: Checkpoint,
|
||||
unrealized_finalized_checkpoint: Checkpoint,
|
||||
unrealized_justified_state_root_producer: impl FnOnce() -> Result<Hash256, Error<T::Error>>,
|
||||
) -> Result<(), Error<T::Error>> {
|
||||
self.update_checkpoints(
|
||||
unrealized_justified_checkpoint,
|
||||
unrealized_finalized_checkpoint,
|
||||
unrealized_justified_state_root_producer,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1375,12 +1402,16 @@ where
|
||||
/// Instantiate `Self` from some `PersistedForkChoice` generated by a earlier call to
|
||||
/// `Self::to_persisted`.
|
||||
pub fn proto_array_from_persisted(
|
||||
persisted: &PersistedForkChoice,
|
||||
persisted_proto_array: proto_array::core::SszContainer,
|
||||
justified_balances: JustifiedBalances,
|
||||
reset_payload_statuses: ResetPayloadStatuses,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<ProtoArrayForkChoice, Error<T::Error>> {
|
||||
let mut proto_array = ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes)
|
||||
.map_err(Error::InvalidProtoArrayBytes)?;
|
||||
let mut proto_array = ProtoArrayForkChoice::from_container(
|
||||
persisted_proto_array.clone(),
|
||||
justified_balances.clone(),
|
||||
)
|
||||
.map_err(Error::InvalidProtoArrayBytes)?;
|
||||
let contains_invalid_payloads = proto_array.contains_invalid_payloads();
|
||||
|
||||
debug!(
|
||||
@@ -1408,7 +1439,7 @@ where
|
||||
info = "please report this error",
|
||||
"Failed to reset payload statuses"
|
||||
);
|
||||
ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes)
|
||||
ProtoArrayForkChoice::from_container(persisted_proto_array, justified_balances)
|
||||
.map_err(Error::InvalidProtoArrayBytes)
|
||||
} else {
|
||||
debug!("Successfully reset all payload statuses");
|
||||
@@ -1424,8 +1455,13 @@ where
|
||||
fc_store: T,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<Self, Error<T::Error>> {
|
||||
let proto_array =
|
||||
Self::proto_array_from_persisted(&persisted, reset_payload_statuses, spec)?;
|
||||
let justified_balances = fc_store.justified_balances().clone();
|
||||
let proto_array = Self::proto_array_from_persisted(
|
||||
persisted.proto_array,
|
||||
justified_balances,
|
||||
reset_payload_statuses,
|
||||
spec,
|
||||
)?;
|
||||
|
||||
let current_slot = fc_store.get_current_slot();
|
||||
|
||||
@@ -1471,7 +1507,7 @@ where
|
||||
/// be instantiated again later.
|
||||
pub fn to_persisted(&self) -> PersistedForkChoice {
|
||||
PersistedForkChoice {
|
||||
proto_array_bytes: self.proto_array().as_bytes(),
|
||||
proto_array: self.proto_array().as_ssz_container(),
|
||||
queued_attestations: self.queued_attestations().to_vec(),
|
||||
}
|
||||
}
|
||||
@@ -1485,10 +1521,46 @@ where
|
||||
/// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes.
|
||||
///
|
||||
/// This is used when persisting the state of the fork choice to disk.
|
||||
#[derive(Encode, Decode, Clone)]
|
||||
#[superstruct(
|
||||
variants(V17, V28),
|
||||
variant_attributes(derive(Encode, Decode, Clone)),
|
||||
no_enum
|
||||
)]
|
||||
pub struct PersistedForkChoice {
|
||||
#[superstruct(only(V17))]
|
||||
pub proto_array_bytes: Vec<u8>,
|
||||
queued_attestations: Vec<QueuedAttestation>,
|
||||
#[superstruct(only(V28))]
|
||||
pub proto_array: proto_array::core::SszContainerV28,
|
||||
pub queued_attestations: Vec<QueuedAttestation>,
|
||||
}
|
||||
|
||||
pub type PersistedForkChoice = PersistedForkChoiceV28;
|
||||
|
||||
impl TryFrom<PersistedForkChoiceV17> for PersistedForkChoiceV28 {
|
||||
type Error = ssz::DecodeError;
|
||||
|
||||
fn try_from(v17: PersistedForkChoiceV17) -> Result<Self, Self::Error> {
|
||||
let container_v17 =
|
||||
proto_array::core::SszContainerV17::from_ssz_bytes(&v17.proto_array_bytes)?;
|
||||
let container_v28 = container_v17.into();
|
||||
|
||||
Ok(Self {
|
||||
proto_array: container_v28,
|
||||
queued_attestations: v17.queued_attestations,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(PersistedForkChoiceV28, JustifiedBalances)> for PersistedForkChoiceV17 {
|
||||
fn from((v28, balances): (PersistedForkChoiceV28, JustifiedBalances)) -> Self {
|
||||
let container_v17 = proto_array::core::SszContainerV17::from((v28.proto_array, balances));
|
||||
let proto_array_bytes = container_v17.as_ssz_bytes();
|
||||
|
||||
Self {
|
||||
proto_array_bytes,
|
||||
queued_attestations: v28.queued_attestations,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -44,6 +44,9 @@ pub trait ForkChoiceStore<E: EthSpec>: Sized {
|
||||
/// Returns the `justified_checkpoint`.
|
||||
fn justified_checkpoint(&self) -> &Checkpoint;
|
||||
|
||||
/// Returns the state root of the justified checkpoint.
|
||||
fn justified_state_root(&self) -> Hash256;
|
||||
|
||||
/// Returns balances from the `state` identified by `justified_checkpoint.root`.
|
||||
fn justified_balances(&self) -> &JustifiedBalances;
|
||||
|
||||
@@ -53,6 +56,9 @@ pub trait ForkChoiceStore<E: EthSpec>: Sized {
|
||||
/// Returns the `unrealized_justified_checkpoint`.
|
||||
fn unrealized_justified_checkpoint(&self) -> &Checkpoint;
|
||||
|
||||
/// Returns the state root of the unrealized justified checkpoint.
|
||||
fn unrealized_justified_state_root(&self) -> Hash256;
|
||||
|
||||
/// Returns the `unrealized_finalized_checkpoint`.
|
||||
fn unrealized_finalized_checkpoint(&self) -> &Checkpoint;
|
||||
|
||||
@@ -63,10 +69,14 @@ pub trait ForkChoiceStore<E: EthSpec>: Sized {
|
||||
fn set_finalized_checkpoint(&mut self, checkpoint: Checkpoint);
|
||||
|
||||
/// Sets the `justified_checkpoint`.
|
||||
fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Self::Error>;
|
||||
fn set_justified_checkpoint(
|
||||
&mut self,
|
||||
checkpoint: Checkpoint,
|
||||
state_root: Hash256,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Sets the `unrealized_justified_checkpoint`.
|
||||
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint);
|
||||
fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint, state_root: Hash256);
|
||||
|
||||
/// Sets the `unrealized_finalized_checkpoint`.
|
||||
fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint);
|
||||
|
||||
@@ -5,7 +5,7 @@ mod metrics;
|
||||
pub use crate::fork_choice::{
|
||||
AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters,
|
||||
InvalidAttestation, InvalidBlock, PayloadVerificationStatus, PersistedForkChoice,
|
||||
QueuedAttestation, ResetPayloadStatuses,
|
||||
PersistedForkChoiceV17, PersistedForkChoiceV28, QueuedAttestation, ResetPayloadStatuses,
|
||||
};
|
||||
pub use fork_choice_store::ForkChoiceStore;
|
||||
pub use proto_array::{
|
||||
|
||||
@@ -306,8 +306,8 @@ fn get_checkpoint(i: u64) -> Checkpoint {
|
||||
|
||||
fn check_bytes_round_trip(original: &ProtoArrayForkChoice) {
|
||||
let bytes = original.as_bytes();
|
||||
let decoded =
|
||||
ProtoArrayForkChoice::from_bytes(&bytes).expect("fork choice should decode from bytes");
|
||||
let decoded = ProtoArrayForkChoice::from_bytes(&bytes, original.balances.clone())
|
||||
.expect("fork choice should decode from bytes");
|
||||
assert!(
|
||||
*original == decoded,
|
||||
"fork choice should encode and decode without change"
|
||||
|
||||
@@ -16,5 +16,5 @@ pub use error::Error;
|
||||
pub mod core {
|
||||
pub use super::proto_array::{ProposerBoost, ProtoArray, ProtoNode};
|
||||
pub use super::proto_array_fork_choice::VoteTracker;
|
||||
pub use super::ssz_container::{SszContainer, SszContainerV17};
|
||||
pub use super::ssz_container::{SszContainer, SszContainerV17, SszContainerV28};
|
||||
}
|
||||
|
||||
@@ -866,14 +866,25 @@ impl ProtoArrayForkChoice {
|
||||
self.proto_array.iter_block_roots(block_root)
|
||||
}
|
||||
|
||||
pub fn as_ssz_container(&self) -> SszContainer {
|
||||
SszContainer::from(self)
|
||||
}
|
||||
|
||||
pub fn as_bytes(&self) -> Vec<u8> {
|
||||
SszContainer::from(self).as_ssz_bytes()
|
||||
}
|
||||
|
||||
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
|
||||
pub fn from_bytes(bytes: &[u8], balances: JustifiedBalances) -> Result<Self, String> {
|
||||
let container = SszContainer::from_ssz_bytes(bytes)
|
||||
.map_err(|e| format!("Failed to decode ProtoArrayForkChoice: {:?}", e))?;
|
||||
container
|
||||
Self::from_container(container, balances)
|
||||
}
|
||||
|
||||
pub fn from_container(
|
||||
container: SszContainer,
|
||||
balances: JustifiedBalances,
|
||||
) -> Result<Self, String> {
|
||||
(container, balances)
|
||||
.try_into()
|
||||
.map_err(|e| format!("Failed to initialize ProtoArrayForkChoice: {e:?}"))
|
||||
}
|
||||
|
||||
@@ -14,16 +14,20 @@ use types::{Checkpoint, Hash256};
|
||||
// selector.
|
||||
four_byte_option_impl!(four_byte_option_checkpoint, Checkpoint);
|
||||
|
||||
pub type SszContainer = SszContainerV17;
|
||||
pub type SszContainer = SszContainerV28;
|
||||
|
||||
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
|
||||
#[superstruct(
|
||||
variants(V17, V28),
|
||||
variant_attributes(derive(Encode, Decode, Clone)),
|
||||
no_enum
|
||||
)]
|
||||
pub struct SszContainer {
|
||||
pub votes: Vec<VoteTracker>,
|
||||
#[superstruct(only(V17))]
|
||||
pub balances: Vec<u64>,
|
||||
pub prune_threshold: usize,
|
||||
pub justified_checkpoint: Checkpoint,
|
||||
pub finalized_checkpoint: Checkpoint,
|
||||
#[superstruct(only(V17))]
|
||||
pub nodes: Vec<ProtoNodeV17>,
|
||||
pub indices: Vec<(Hash256, usize)>,
|
||||
pub previous_proposer_boost: ProposerBoost,
|
||||
@@ -35,7 +39,6 @@ impl From<&ProtoArrayForkChoice> for SszContainer {
|
||||
|
||||
Self {
|
||||
votes: from.votes.0.clone(),
|
||||
balances: from.balances.effective_balances.clone(),
|
||||
prune_threshold: proto_array.prune_threshold,
|
||||
justified_checkpoint: proto_array.justified_checkpoint,
|
||||
finalized_checkpoint: proto_array.finalized_checkpoint,
|
||||
@@ -46,10 +49,10 @@ impl From<&ProtoArrayForkChoice> for SszContainer {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<SszContainer> for ProtoArrayForkChoice {
|
||||
impl TryFrom<(SszContainer, JustifiedBalances)> for ProtoArrayForkChoice {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(from: SszContainer) -> Result<Self, Error> {
|
||||
fn try_from((from, balances): (SszContainer, JustifiedBalances)) -> Result<Self, Error> {
|
||||
let proto_array = ProtoArray {
|
||||
prune_threshold: from.prune_threshold,
|
||||
justified_checkpoint: from.justified_checkpoint,
|
||||
@@ -62,7 +65,38 @@ impl TryFrom<SszContainer> for ProtoArrayForkChoice {
|
||||
Ok(Self {
|
||||
proto_array,
|
||||
votes: ElasticList(from.votes),
|
||||
balances: JustifiedBalances::from_effective_balances(from.balances)?,
|
||||
balances,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Convert V17 to V28 by dropping balances.
|
||||
impl From<SszContainerV17> for SszContainerV28 {
|
||||
fn from(v17: SszContainerV17) -> Self {
|
||||
Self {
|
||||
votes: v17.votes,
|
||||
prune_threshold: v17.prune_threshold,
|
||||
justified_checkpoint: v17.justified_checkpoint,
|
||||
finalized_checkpoint: v17.finalized_checkpoint,
|
||||
nodes: v17.nodes,
|
||||
indices: v17.indices,
|
||||
previous_proposer_boost: v17.previous_proposer_boost,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert V28 to V17 by re-adding balances.
|
||||
impl From<(SszContainerV28, JustifiedBalances)> for SszContainerV17 {
|
||||
fn from((v28, balances): (SszContainerV28, JustifiedBalances)) -> Self {
|
||||
Self {
|
||||
votes: v28.votes,
|
||||
balances: balances.effective_balances.clone(),
|
||||
prune_threshold: v28.prune_threshold,
|
||||
justified_checkpoint: v28.justified_checkpoint,
|
||||
finalized_checkpoint: v28.finalized_checkpoint,
|
||||
nodes: v28.nodes,
|
||||
indices: v28.indices,
|
||||
previous_proposer_boost: v28.previous_proposer_boost,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1444,6 +1444,12 @@ impl<E: EthSpec> BeaconState<E> {
|
||||
.ok_or(Error::StateRootsOutOfBounds(i))
|
||||
}
|
||||
|
||||
/// Gets the state root for the start slot of some epoch.
|
||||
pub fn get_state_root_at_epoch_start(&self, epoch: Epoch) -> Result<Hash256, Error> {
|
||||
self.get_state_root(epoch.start_slot(E::slots_per_epoch()))
|
||||
.copied()
|
||||
}
|
||||
|
||||
/// Gets the oldest (earliest slot) state root.
|
||||
pub fn get_oldest_state_root(&self) -> Result<&Hash256, Error> {
|
||||
let oldest_slot = self.slot().saturating_sub(self.state_roots().len());
|
||||
|
||||
Reference in New Issue
Block a user