From 836c39efaaeeb29823d0b66bcd42adb1347ea3f1 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 18 Aug 2025 16:03:28 +1000 Subject: [PATCH] Shrink persisted fork choice data (#7805) Closes: - https://github.com/sigp/lighthouse/issues/7760 - [x] Remove `balances_cache` from `PersistedForkChoiceStore` (~65 MB saving on mainnet) - [x] Remove `justified_balances` from `PersistedForkChoiceStore` (~16 MB saving on mainnet) - [x] Remove `balances` from `ProtoArray`/`SszContainer`. - [x] Implement zstd compression for votes - [x] Fix bug in justified state usage - [x] Bump schema version to V28 and implement migration. --- Cargo.lock | 2 + beacon_node/beacon_chain/Cargo.toml | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +- .../src/beacon_fork_choice_store.rs | 159 ++++++++++++++---- beacon_node/beacon_chain/src/builder.rs | 9 +- .../beacon_chain/src/canonical_head.rs | 19 ++- beacon_node/beacon_chain/src/fork_revert.rs | 5 +- beacon_node/beacon_chain/src/lib.rs | 5 +- beacon_node/beacon_chain/src/metrics.rs | 12 ++ .../beacon_chain/src/persisted_fork_choice.rs | 58 ++++++- beacon_node/beacon_chain/src/schema_change.rs | 9 + .../src/schema_change/migration_schema_v23.rs | 31 ++-- .../src/schema_change/migration_schema_v28.rs | 152 +++++++++++++++++ beacon_node/beacon_chain/src/summaries_dag.rs | 12 ++ beacon_node/store/src/config.rs | 22 ++- beacon_node/store/src/hdiff.rs | 34 ++-- beacon_node/store/src/metadata.rs | 2 +- consensus/fork_choice/Cargo.toml | 1 + consensus/fork_choice/src/fork_choice.rs | 100 +++++++++-- .../fork_choice/src/fork_choice_store.rs | 14 +- consensus/fork_choice/src/lib.rs | 2 +- .../src/fork_choice_test_definition.rs | 4 +- consensus/proto_array/src/lib.rs | 2 +- .../src/proto_array_fork_choice.rs | 15 +- consensus/proto_array/src/ssz_container.rs | 48 +++++- consensus/types/src/beacon_state.rs | 6 + 26 files changed, 610 insertions(+), 127 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v28.rs diff --git a/Cargo.lock b/Cargo.lock index f924d74ec3..2e52527478 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -922,6 +922,7 @@ dependencies = [ "tree_hash", "tree_hash_derive", "types", + "zstd 0.13.3", ] [[package]] @@ -3567,6 +3568,7 @@ dependencies = [ "proto_array", "state_processing", "store", + "superstruct", "tokio", "tracing", "types", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index fbc58eafc8..575bc8ad90 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -1,4 +1,4 @@ - + [package] name = "beacon_chain" version = "0.2.0" @@ -65,6 +65,7 @@ tracing = { workspace = true } tree_hash = { workspace = true } tree_hash_derive = { workspace = true } types = { workspace = true } +zstd = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 26ef750c47..14ddc021b8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -121,8 +121,8 @@ use std::sync::Arc; use std::time::Duration; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; use store::{ - BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary, - KeyValueStoreOp, StoreItem, StoreOp, + BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary, + KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; @@ -618,12 +618,15 @@ impl BeaconChain { reset_payload_statuses: ResetPayloadStatuses, spec: &ChainSpec, ) -> Result>, Error> { - let Some(persisted_fork_choice) = - store.get_item::(&FORK_CHOICE_DB_KEY)? + let Some(persisted_fork_choice_bytes) = store + .hot_db + .get_bytes(DBColumn::ForkChoice, FORK_CHOICE_DB_KEY.as_slice())? else { return Ok(None); }; + let persisted_fork_choice = + PersistedForkChoice::from_bytes(&persisted_fork_choice_bytes, store.get_config())?; let fc_store = BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, store)?; diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 8a2c0be0c8..2c05df3c7f 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -27,6 +27,7 @@ pub enum Error { FailedToReadState(StoreError), MissingState(Hash256), BeaconStateError(BeaconStateError), + UnalignedCheckpoint { block_slot: Slot, state_slot: Slot }, Arith(ArithError), } @@ -136,7 +137,9 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< finalized_checkpoint: Checkpoint, justified_checkpoint: Checkpoint, justified_balances: JustifiedBalances, + justified_state_root: Hash256, unrealized_justified_checkpoint: Checkpoint, + unrealized_justified_state_root: Hash256, unrealized_finalized_checkpoint: Checkpoint, proposer_boost_root: Hash256, equivocating_indices: BTreeSet, @@ -162,21 +165,37 @@ where /// It is assumed that `anchor` is already persisted in `store`. pub fn get_forkchoice_store( store: Arc>, - anchor: &BeaconSnapshot, + anchor: BeaconSnapshot, ) -> Result { - let anchor_state = &anchor.beacon_state; + let unadvanced_state_root = anchor.beacon_state_root(); + let mut anchor_state = anchor.beacon_state; let mut anchor_block_header = anchor_state.latest_block_header().clone(); - if anchor_block_header.state_root == Hash256::zero() { - anchor_block_header.state_root = anchor.beacon_state_root(); + + // The anchor state MUST be on an epoch boundary (it should be advanced by the caller). + if !anchor_state + .slot() + .as_u64() + .is_multiple_of(E::slots_per_epoch()) + { + return Err(Error::UnalignedCheckpoint { + block_slot: anchor_block_header.slot, + state_slot: anchor_state.slot(), + }); } - let anchor_root = anchor_block_header.canonical_root(); + + // Compute the accurate block root for the checkpoint block. + if anchor_block_header.state_root.is_zero() { + anchor_block_header.state_root = unadvanced_state_root; + } + let anchor_block_root = anchor_block_header.canonical_root(); let anchor_epoch = anchor_state.current_epoch(); let justified_checkpoint = Checkpoint { epoch: anchor_epoch, - root: anchor_root, + root: anchor_block_root, }; let finalized_checkpoint = justified_checkpoint; - let justified_balances = JustifiedBalances::from_justified_state(anchor_state)?; + let justified_balances = JustifiedBalances::from_justified_state(&anchor_state)?; + let justified_state_root = anchor_state.canonical_root()?; Ok(Self { store, @@ -184,8 +203,10 @@ where time: anchor_state.slot(), justified_checkpoint, justified_balances, + justified_state_root, finalized_checkpoint, unrealized_justified_checkpoint: justified_checkpoint, + unrealized_justified_state_root: justified_state_root, unrealized_finalized_checkpoint: finalized_checkpoint, proposer_boost_root: Hash256::zero(), equivocating_indices: BTreeSet::new(), @@ -197,12 +218,12 @@ where /// on-disk database. pub fn to_persisted(&self) -> PersistedForkChoiceStore { PersistedForkChoiceStore { - balances_cache: self.balances_cache.clone(), time: self.time, finalized_checkpoint: self.finalized_checkpoint, justified_checkpoint: self.justified_checkpoint, - justified_balances: self.justified_balances.effective_balances.clone(), + justified_state_root: self.justified_state_root, unrealized_justified_checkpoint: self.unrealized_justified_checkpoint, + unrealized_justified_state_root: self.unrealized_justified_state_root, unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, proposer_boost_root: self.proposer_boost_root, equivocating_indices: self.equivocating_indices.clone(), @@ -210,20 +231,59 @@ where } /// Restore `Self` from a previously-generated `PersistedForkChoiceStore`. - pub fn from_persisted( - persisted: PersistedForkChoiceStore, + /// + /// DEPRECATED. Can be deleted once migrations no longer require it. + pub fn from_persisted_v17( + persisted: PersistedForkChoiceStoreV17, + justified_state_root: Hash256, + unrealized_justified_state_root: Hash256, store: Arc>, ) -> Result { let justified_balances = JustifiedBalances::from_effective_balances(persisted.justified_balances)?; + Ok(Self { store, - balances_cache: persisted.balances_cache, + balances_cache: <_>::default(), time: persisted.time, finalized_checkpoint: persisted.finalized_checkpoint, justified_checkpoint: persisted.justified_checkpoint, justified_balances, + justified_state_root, unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint, + unrealized_justified_state_root, + unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint, + proposer_boost_root: persisted.proposer_boost_root, + equivocating_indices: persisted.equivocating_indices, + _phantom: PhantomData, + }) + } + + /// Restore `Self` from a previously-generated `PersistedForkChoiceStore`. + pub fn from_persisted( + persisted: PersistedForkChoiceStore, + store: Arc>, + ) -> Result { + let justified_checkpoint = persisted.justified_checkpoint; + let justified_state_root = persisted.justified_state_root; + + let update_cache = true; + let justified_state = store + .get_hot_state(&justified_state_root, update_cache) + .map_err(Error::FailedToReadState)? + .ok_or(Error::MissingState(justified_state_root))?; + + let justified_balances = JustifiedBalances::from_justified_state(&justified_state)?; + Ok(Self { + store, + balances_cache: <_>::default(), + time: persisted.time, + finalized_checkpoint: persisted.finalized_checkpoint, + justified_checkpoint, + justified_balances, + justified_state_root, + unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint, + unrealized_justified_state_root: persisted.unrealized_justified_state_root, unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint, proposer_boost_root: persisted.proposer_boost_root, equivocating_indices: persisted.equivocating_indices, @@ -261,6 +321,10 @@ where &self.justified_checkpoint } + fn justified_state_root(&self) -> Hash256 { + self.justified_state_root + } + fn justified_balances(&self) -> &JustifiedBalances { &self.justified_balances } @@ -273,6 +337,10 @@ where &self.unrealized_justified_checkpoint } + fn unrealized_justified_state_root(&self) -> Hash256 { + self.unrealized_justified_state_root + } + fn unrealized_finalized_checkpoint(&self) -> &Checkpoint { &self.unrealized_finalized_checkpoint } @@ -285,8 +353,13 @@ where self.finalized_checkpoint = checkpoint } - fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Error> { + fn set_justified_checkpoint( + &mut self, + checkpoint: Checkpoint, + justified_state_root: Hash256, + ) -> Result<(), Error> { self.justified_checkpoint = checkpoint; + self.justified_state_root = justified_state_root; if let Some(balances) = self.balances_cache.get( self.justified_checkpoint.root, @@ -297,27 +370,14 @@ where self.justified_balances = JustifiedBalances::from_effective_balances(balances)?; } else { metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); - let justified_block = self - .store - .get_blinded_block(&self.justified_checkpoint.root) - .map_err(Error::FailedToReadBlock)? - .ok_or(Error::MissingBlock(self.justified_checkpoint.root))? - .deconstruct() - .0; - let max_slot = self - .justified_checkpoint - .epoch - .start_slot(E::slots_per_epoch()); - let (_, state) = self + // Justified state is reasonably useful to cache, it might be finalized soon. + let update_cache = true; + let state = self .store - .get_advanced_hot_state( - self.justified_checkpoint.root, - max_slot, - justified_block.state_root(), - ) + .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? - .ok_or_else(|| Error::MissingState(justified_block.state_root()))?; + .ok_or_else(|| Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } @@ -325,8 +385,9 @@ where Ok(()) } - fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) { + fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint, state_root: Hash256) { self.unrealized_justified_checkpoint = checkpoint; + self.unrealized_justified_state_root = state_root; } fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint) { @@ -346,18 +407,48 @@ where } } -pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV17; +pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV28; /// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database. -#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)] +#[superstruct( + variants(V17, V28), + variant_attributes(derive(Encode, Decode)), + no_enum +)] pub struct PersistedForkChoiceStore { + /// The balances cache was removed from disk storage in schema V28. + #[superstruct(only(V17))] pub balances_cache: BalancesCacheV8, pub time: Slot, pub finalized_checkpoint: Checkpoint, pub justified_checkpoint: Checkpoint, + /// The justified balances were removed from disk storage in schema V28. + #[superstruct(only(V17))] pub justified_balances: Vec, + /// The justified state root is stored so that it can be used to load the justified balances. + #[superstruct(only(V28))] + pub justified_state_root: Hash256, pub unrealized_justified_checkpoint: Checkpoint, + #[superstruct(only(V28))] + pub unrealized_justified_state_root: Hash256, pub unrealized_finalized_checkpoint: Checkpoint, pub proposer_boost_root: Hash256, pub equivocating_indices: BTreeSet, } + +// Convert V28 to V17 by adding balances and removing justified state roots. +impl From<(PersistedForkChoiceStoreV28, JustifiedBalances)> for PersistedForkChoiceStoreV17 { + fn from((v28, balances): (PersistedForkChoiceStoreV28, JustifiedBalances)) -> Self { + Self { + balances_cache: Default::default(), + time: v28.time, + finalized_checkpoint: v28.finalized_checkpoint, + justified_checkpoint: v28.justified_checkpoint, + justified_balances: balances.effective_balances, + unrealized_justified_checkpoint: v28.unrealized_justified_checkpoint, + unrealized_finalized_checkpoint: v28.unrealized_finalized_checkpoint, + proposer_boost_root: v28.proposer_boost_root, + equivocating_indices: v28.equivocating_indices, + } + } +} diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 3e5e7a1eee..5e7aa7d4f8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -394,7 +394,7 @@ where .map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?, ); - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis) + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, genesis.clone()) .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; let current_slot = None; @@ -616,7 +616,7 @@ where beacon_state: weak_subj_state, }; - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot) + let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, snapshot.clone()) .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; let fork_choice = ForkChoice::from_anchor( @@ -887,8 +887,9 @@ where self.pending_io_batch.push(BeaconChain::< Witness, >::persist_fork_choice_in_batch_standalone( - &fork_choice - )); + &fork_choice, + store.get_config(), + ).map_err(|e| format!("Fork choice compression error: {e:?}"))?); store .hot_db .do_atomically(self.pending_io_batch) diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 493baf513e..56d1975972 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -53,7 +53,9 @@ use slot_clock::SlotClock; use state_processing::AllCaches; use std::sync::Arc; use std::time::Duration; -use store::{KeyValueStore, KeyValueStoreOp, StoreItem, iter::StateRootsIterator}; +use store::{ + Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator, +}; use task_executor::{JoinHandle, ShutdownReason}; use tracing::{debug, error, info, instrument, warn}; use types::*; @@ -998,25 +1000,30 @@ impl BeaconChain { /// Persist fork choice to disk, writing immediately. pub fn persist_fork_choice(&self) -> Result<(), Error> { let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); - let batch = vec![self.persist_fork_choice_in_batch()]; + let batch = vec![self.persist_fork_choice_in_batch()?]; self.store.hot_db.do_atomically(batch)?; Ok(()) } /// Return a database operation for writing fork choice to disk. - pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { - Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.fork_choice_read_lock()) + pub fn persist_fork_choice_in_batch(&self) -> Result { + Self::persist_fork_choice_in_batch_standalone( + &self.canonical_head.fork_choice_read_lock(), + self.store.get_config(), + ) + .map_err(Into::into) } /// Return a database operation for writing fork choice to disk. pub fn persist_fork_choice_in_batch_standalone( fork_choice: &BeaconForkChoice, - ) -> KeyValueStoreOp { + store_config: &StoreConfig, + ) -> Result { let persisted_fork_choice = PersistedForkChoice { fork_choice: fork_choice.to_persisted(), fork_choice_store: fork_choice.fc_store().to_persisted(), }; - persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY) + persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY, store_config) } } diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index b0cb3b5d9d..4db79790d3 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -142,8 +142,9 @@ pub fn reset_fork_choice_to_finalization, Cold: It beacon_state: finalized_state, }; - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), &finalized_snapshot) - .map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?; + let fc_store = + BeaconForkChoiceStore::get_forkchoice_store(store.clone(), finalized_snapshot.clone()) + .map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?; let mut fork_choice = ForkChoice::from_anchor( fc_store, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e4d17ab831..c755eeb9f4 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -74,7 +74,10 @@ pub use self::chain_config::ChainConfig; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; -pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; +pub use beacon_fork_choice_store::{ + BeaconForkChoiceStore, Error as ForkChoiceStoreError, PersistedForkChoiceStoreV17, + PersistedForkChoiceStoreV28, +}; pub use block_verification::{ BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlock, InvalidSignature, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 4471b0a93f..1b57bad104 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -585,6 +585,18 @@ pub static FORK_CHOICE_WRITE_LOCK_AQUIRE_TIMES: LazyLock> = La exponential_buckets(1e-3, 4.0, 7), ) }); +pub static FORK_CHOICE_ENCODE_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "beacon_fork_choice_encode_seconds", + "Time taken to SSZ encode the persisted fork choice data", + ) +}); +pub static FORK_CHOICE_COMPRESS_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "beacon_fork_choice_compress_seconds", + "Time taken to compress the persisted fork choice data", + ) +}); pub static BALANCES_CACHE_HITS: LazyLock> = LazyLock::new(|| { try_create_int_counter( "beacon_balances_cache_hits_total", diff --git a/beacon_node/beacon_chain/src/persisted_fork_choice.rs b/beacon_node/beacon_chain/src/persisted_fork_choice.rs index 8961a74c3d..d8fcc0901b 100644 --- a/beacon_node/beacon_chain/src/persisted_fork_choice.rs +++ b/beacon_node/beacon_chain/src/persisted_fork_choice.rs @@ -1,16 +1,30 @@ -use crate::beacon_fork_choice_store::PersistedForkChoiceStoreV17; +use crate::{ + beacon_fork_choice_store::{PersistedForkChoiceStoreV17, PersistedForkChoiceStoreV28}, + metrics, +}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use store::{DBColumn, Error, StoreItem}; +use store::{DBColumn, Error, KeyValueStoreOp, StoreConfig, StoreItem}; use superstruct::superstruct; +use types::Hash256; // If adding a new version you should update this type alias and fix the breakages. -pub type PersistedForkChoice = PersistedForkChoiceV17; +pub type PersistedForkChoice = PersistedForkChoiceV28; -#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)] +#[superstruct( + variants(V17, V28), + variant_attributes(derive(Encode, Decode)), + no_enum +)] pub struct PersistedForkChoice { - pub fork_choice: fork_choice::PersistedForkChoice, - pub fork_choice_store: PersistedForkChoiceStoreV17, + #[superstruct(only(V17))] + pub fork_choice_v17: fork_choice::PersistedForkChoiceV17, + #[superstruct(only(V28))] + pub fork_choice: fork_choice::PersistedForkChoiceV28, + #[superstruct(only(V17))] + pub fork_choice_store_v17: PersistedForkChoiceStoreV17, + #[superstruct(only(V28))] + pub fork_choice_store: PersistedForkChoiceStoreV28, } macro_rules! impl_store_item { @@ -32,3 +46,35 @@ macro_rules! impl_store_item { } impl_store_item!(PersistedForkChoiceV17); + +impl PersistedForkChoiceV28 { + pub fn from_bytes(bytes: &[u8], store_config: &StoreConfig) -> Result { + let decompressed_bytes = store_config + .decompress_bytes(bytes) + .map_err(Error::Compression)?; + Self::from_ssz_bytes(&decompressed_bytes).map_err(Into::into) + } + + pub fn as_bytes(&self, store_config: &StoreConfig) -> Result, Error> { + let encode_timer = metrics::start_timer(&metrics::FORK_CHOICE_ENCODE_TIMES); + let ssz_bytes = self.as_ssz_bytes(); + drop(encode_timer); + + let _compress_timer = metrics::start_timer(&metrics::FORK_CHOICE_COMPRESS_TIMES); + store_config + .compress_bytes(&ssz_bytes) + .map_err(Error::Compression) + } + + pub fn as_kv_store_op( + &self, + key: Hash256, + store_config: &StoreConfig, + ) -> Result { + Ok(KeyValueStoreOp::PutKeyValue( + DBColumn::ForkChoice, + key.as_slice().to_vec(), + self.as_bytes(store_config)?, + )) + } +} diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 5e813624db..ddc5978339 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -4,6 +4,7 @@ mod migration_schema_v24; mod migration_schema_v25; mod migration_schema_v26; mod migration_schema_v27; +mod migration_schema_v28; use crate::beacon_chain::BeaconChainTypes; use std::sync::Arc; @@ -79,6 +80,14 @@ pub fn migrate_schema( migration_schema_v27::downgrade_from_v27::(db.clone())?; db.store_schema_version_atomically(to, vec![]) } + (SchemaVersion(27), SchemaVersion(28)) => { + let ops = migration_schema_v28::upgrade_to_v28::(db.clone())?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(28), SchemaVersion(27)) => { + let ops = migration_schema_v28::downgrade_from_v28::(db.clone())?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs index bc832c3399..e8bd526e19 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs @@ -1,6 +1,6 @@ use crate::BeaconForkChoiceStore; use crate::beacon_chain::BeaconChainTypes; -use crate::persisted_fork_choice::PersistedForkChoice; +use crate::persisted_fork_choice::PersistedForkChoiceV17; use crate::schema_change::StoreError; use crate::test_utils::{BEACON_CHAIN_DB_KEY, FORK_CHOICE_DB_KEY, PersistedBeaconChain}; use fork_choice::{ForkChoice, ResetPayloadStatuses}; @@ -80,7 +80,7 @@ pub fn downgrade_from_v23( }; // Recreate head-tracker from fork choice. - let Some(persisted_fork_choice) = db.get_item::(&FORK_CHOICE_DB_KEY)? + let Some(persisted_fork_choice) = db.get_item::(&FORK_CHOICE_DB_KEY)? else { // Fork choice should exist if the database exists. return Err(Error::MigrationError( @@ -88,19 +88,30 @@ pub fn downgrade_from_v23( )); }; - let fc_store = - BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, db.clone()) - .map_err(|e| { - Error::MigrationError(format!( - "Error loading fork choise store from persisted: {e:?}" - )) - })?; + // We use dummy roots for the justified states because we can source the balances from the v17 + // persited fork choice. The justified state root isn't required to look up the justified state's + // balances (as it would be in V28). This fork choice object with corrupt state roots SHOULD NOT + // be written to disk. + let dummy_justified_state_root = Hash256::repeat_byte(0x66); + let dummy_unrealized_justified_state_root = Hash256::repeat_byte(0x77); + + let fc_store = BeaconForkChoiceStore::from_persisted_v17( + persisted_fork_choice.fork_choice_store_v17, + dummy_justified_state_root, + dummy_unrealized_justified_state_root, + db.clone(), + ) + .map_err(|e| { + Error::MigrationError(format!( + "Error loading fork choice store from persisted: {e:?}" + )) + })?; // Doesn't matter what policy we use for invalid payloads, as our head calculation just // considers descent from finalization. let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload; let fork_choice = ForkChoice::from_persisted( - persisted_fork_choice.fork_choice, + persisted_fork_choice.fork_choice_v17.try_into()?, reset_payload_statuses, fc_store, &db.spec, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v28.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v28.rs new file mode 100644 index 0000000000..5885eaabc0 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v28.rs @@ -0,0 +1,152 @@ +use crate::{ + BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, PersistedForkChoiceStoreV17, + beacon_chain::FORK_CHOICE_DB_KEY, + persisted_fork_choice::{PersistedForkChoiceV17, PersistedForkChoiceV28}, + summaries_dag::{DAGStateSummary, StateSummariesDAG}, +}; +use fork_choice::{ForkChoice, ForkChoiceStore, ResetPayloadStatuses}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; +use tracing::{info, warn}; +use types::{EthSpec, Hash256}; + +/// Upgrade `PersistedForkChoice` from V17 to V28. +pub fn upgrade_to_v28( + db: Arc>, +) -> Result, Error> { + let Some(persisted_fork_choice_v17) = + db.get_item::(&FORK_CHOICE_DB_KEY)? + else { + warn!("No fork choice found to upgrade to v28"); + return Ok(vec![]); + }; + + // Load state DAG in order to compute justified checkpoint roots. + let state_summaries_dag = { + let state_summaries = db + .load_hot_state_summaries()? + .into_iter() + .map(|(state_root, summary)| (state_root, summary.into())) + .collect::>(); + + StateSummariesDAG::new(state_summaries).map_err(|e| { + Error::MigrationError(format!("Error loading state summaries DAG: {e:?}")) + })? + }; + + // Determine the justified state roots. + let justified_checkpoint = persisted_fork_choice_v17 + .fork_choice_store_v17 + .justified_checkpoint; + let justified_block_root = justified_checkpoint.root; + let justified_slot = justified_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + let justified_state_root = state_summaries_dag + .state_root_at_slot(justified_block_root, justified_slot) + .ok_or_else(|| { + Error::MigrationError(format!( + "Missing state root for justified slot {justified_slot} with latest_block_root \ + {justified_block_root:?}" + )) + })?; + + let unrealized_justified_checkpoint = persisted_fork_choice_v17 + .fork_choice_store_v17 + .unrealized_justified_checkpoint; + let unrealized_justified_block_root = unrealized_justified_checkpoint.root; + let unrealized_justified_slot = unrealized_justified_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + let unrealized_justified_state_root = state_summaries_dag + .state_root_at_slot(unrealized_justified_block_root, unrealized_justified_slot) + .ok_or_else(|| { + Error::MigrationError(format!( + "Missing state root for unrealized justified slot {unrealized_justified_slot} \ + with latest_block_root {unrealized_justified_block_root:?}" + )) + })?; + + let fc_store = BeaconForkChoiceStore::from_persisted_v17( + persisted_fork_choice_v17.fork_choice_store_v17, + justified_state_root, + unrealized_justified_state_root, + db.clone(), + ) + .map_err(|e| { + Error::MigrationError(format!( + "Error loading fork choice store from persisted: {e:?}" + )) + })?; + + info!( + ?justified_state_root, + %justified_slot, + "Added justified state root to fork choice" + ); + + // Construct top-level ForkChoice struct using the patched fork choice store, and the converted + // proto array. + let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload; + let fork_choice = ForkChoice::from_persisted( + persisted_fork_choice_v17.fork_choice_v17.try_into()?, + reset_payload_statuses, + fc_store, + db.get_chain_spec(), + ) + .map_err(|e| Error::MigrationError(format!("Unable to build ForkChoice: {e:?}")))?; + + let ops = vec![BeaconChain::::persist_fork_choice_in_batch_standalone( + &fork_choice, + db.get_config(), + )?]; + + info!("Upgraded fork choice for DB schema v28"); + + Ok(ops) +} + +pub fn downgrade_from_v28( + db: Arc>, +) -> Result, Error> { + let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload; + let Some(fork_choice) = + BeaconChain::::load_fork_choice(db.clone(), reset_payload_statuses, db.get_chain_spec()) + .map_err(|e| Error::MigrationError(format!("Unable to load fork choice: {e:?}")))? + else { + warn!("No fork choice to downgrade"); + return Ok(vec![]); + }; + + // Recreate V28 persisted fork choice, then convert each field back to its V17 version. + let persisted_fork_choice = PersistedForkChoiceV28 { + fork_choice: fork_choice.to_persisted(), + fork_choice_store: fork_choice.fc_store().to_persisted(), + }; + + let justified_balances = fork_choice.fc_store().justified_balances(); + + // 1. Create `proto_array::PersistedForkChoiceV17`. + let fork_choice_v17: fork_choice::PersistedForkChoiceV17 = ( + persisted_fork_choice.fork_choice, + justified_balances.clone(), + ) + .into(); + + let fork_choice_store_v17: PersistedForkChoiceStoreV17 = ( + persisted_fork_choice.fork_choice_store, + justified_balances.clone(), + ) + .into(); + + let persisted_fork_choice_v17 = PersistedForkChoiceV17 { + fork_choice_v17, + fork_choice_store_v17, + }; + + let ops = vec![persisted_fork_choice_v17.as_kv_store_op(FORK_CHOICE_DB_KEY)]; + + info!("Downgraded fork choice for DB schema v28"); + + Ok(ops) +} diff --git a/beacon_node/beacon_chain/src/summaries_dag.rs b/beacon_node/beacon_chain/src/summaries_dag.rs index d74bf638ef..4ddcdaab5a 100644 --- a/beacon_node/beacon_chain/src/summaries_dag.rs +++ b/beacon_node/beacon_chain/src/summaries_dag.rs @@ -355,6 +355,18 @@ impl StateSummariesDAG { } Ok(descendants) } + + /// Returns the root of the state at `slot` with `latest_block_root`, if it exists. + /// + /// The `slot` must be the slot of the `latest_block_root` or a skipped slot following it. This + /// function will not return the `state_root` of a state with a different `latest_block_root` + /// even if it lies on the same chain. + pub fn state_root_at_slot(&self, latest_block_root: Hash256, slot: Slot) -> Option { + self.state_summaries_by_block_root + .get(&latest_block_root)? + .get(&slot) + .map(|(state_root, _)| *state_root) + } } impl From for DAGStateSummary { diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index e3e33de4f9..ad81fa6076 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -4,12 +4,12 @@ use crate::{DBColumn, Error, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use std::io::Write; +use std::io::{Read, Write}; use std::num::NonZeroUsize; use strum::{Display, EnumString, EnumVariantNames}; use types::EthSpec; use types::non_zero_usize::new_non_zero_usize; -use zstd::Encoder; +use zstd::{Decoder, Encoder}; #[cfg(all(feature = "redb", not(feature = "leveldb")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb; @@ -194,15 +194,23 @@ impl StoreConfig { } } - pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result, Error> { + /// Compress bytes using zstd and the compression level from `self`. + pub fn compress_bytes(&self, ssz_bytes: &[u8]) -> Result, std::io::Error> { let mut compressed_value = Vec::with_capacity(self.estimate_compressed_size(ssz_bytes.len())); - let mut encoder = Encoder::new(&mut compressed_value, self.compression_level) - .map_err(Error::Compression)?; - encoder.write_all(ssz_bytes).map_err(Error::Compression)?; - encoder.finish().map_err(Error::Compression)?; + let mut encoder = Encoder::new(&mut compressed_value, self.compression_level)?; + encoder.write_all(ssz_bytes)?; + encoder.finish()?; Ok(compressed_value) } + + /// Decompress bytes compressed using zstd. + pub fn decompress_bytes(&self, input: &[u8]) -> Result, std::io::Error> { + let mut out = Vec::with_capacity(self.estimate_decompressed_size(input.len())); + let mut decoder = Decoder::new(input)?; + decoder.read_to_end(&mut out)?; + Ok(out) + } } impl StoreItem for OnDiskStoreConfig { diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index de62d42c59..3e20aab9bf 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -6,14 +6,12 @@ use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::cmp::Ordering; -use std::io::{Read, Write}; use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::LazyLock; use superstruct::superstruct; use types::historical_summary::HistoricalSummary; use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator}; -use zstd::{Decoder, Encoder}; static EMPTY_PUBKEY: LazyLock = LazyLock::new(PublicKeyBytes::empty); @@ -395,13 +393,17 @@ impl CompressedU64Diff { .collect(); Ok(CompressedU64Diff { - bytes: compress_bytes(&uncompressed_bytes, config)?, + bytes: config + .compress_bytes(&uncompressed_bytes) + .map_err(Error::Compression)?, }) } pub fn apply(&self, xs: &mut Vec, config: &StoreConfig) -> Result<(), Error> { // Decompress balances diff. - let balances_diff_bytes = uncompress_bytes(&self.bytes, config)?; + let balances_diff_bytes = config + .decompress_bytes(&self.bytes) + .map_err(Error::Compression)?; for (i, diff_bytes) in balances_diff_bytes .chunks(u64::BITS as usize / 8) @@ -428,22 +430,6 @@ impl CompressedU64Diff { } } -fn compress_bytes(input: &[u8], config: &StoreConfig) -> Result, Error> { - let compression_level = config.compression_level; - let mut out = Vec::with_capacity(config.estimate_compressed_size(input.len())); - let mut encoder = Encoder::new(&mut out, compression_level).map_err(Error::Compression)?; - encoder.write_all(input).map_err(Error::Compression)?; - encoder.finish().map_err(Error::Compression)?; - Ok(out) -} - -fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result, Error> { - let mut out = Vec::with_capacity(config.estimate_decompressed_size(input.len())); - let mut decoder = Decoder::new(input).map_err(Error::Compression)?; - decoder.read_to_end(&mut out).map_err(Error::Compression)?; - Ok(out) -} - impl ValidatorsDiff { pub fn compute( xs: &[Validator], @@ -534,12 +520,16 @@ impl ValidatorsDiff { .collect::>(); Ok(Self { - bytes: compress_bytes(&uncompressed_bytes, config)?, + bytes: config + .compress_bytes(&uncompressed_bytes) + .map_err(Error::Compression)?, }) } pub fn apply(&self, xs: &mut Vec, config: &StoreConfig) -> Result<(), Error> { - let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?; + let validator_diff_bytes = config + .decompress_bytes(&self.bytes) + .map_err(Error::Compression)?; for diff_bytes in validator_diff_bytes.chunks(::ssz_fixed_len()) diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index b6091087ef..cf49468451 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(27); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(28); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index 5c009a5e78..0a244c2ba1 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -12,6 +12,7 @@ logging = { workspace = true } metrics = { workspace = true } proto_array = { workspace = true } state_processing = { workspace = true } +superstruct = { workspace = true } tracing = { workspace = true } types = { workspace = true } diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 7bd8da4cbd..19f294d439 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -2,9 +2,10 @@ use crate::metrics::{self, scrape_for_metrics}; use crate::{ForkChoiceStore, InvalidationOperation}; use logging::crit; use proto_array::{ - Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, ProposerHeadError, - ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold, + Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, JustifiedBalances, + ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold, }; +use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::{ per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing, @@ -13,6 +14,7 @@ use std::cmp::Ordering; use std::collections::BTreeSet; use std::marker::PhantomData; use std::time::Duration; +use superstruct::superstruct; use tracing::{debug, instrument, warn}; use types::{ AbstractExecPayload, AttestationShufflingId, AttesterSlashingRef, BeaconBlockRef, BeaconState, @@ -736,6 +738,11 @@ where self.update_checkpoints( state.current_justified_checkpoint(), state.finalized_checkpoint(), + || { + state + .get_state_root_at_epoch_start(state.current_justified_checkpoint().epoch) + .map_err(Into::into) + }, )?; // Update unrealized justified/finalized checkpoints. @@ -795,8 +802,15 @@ where if unrealized_justified_checkpoint.epoch > self.fc_store.unrealized_justified_checkpoint().epoch { - self.fc_store - .set_unrealized_justified_checkpoint(unrealized_justified_checkpoint); + // Justification has recently updated therefore the justified state root should be in + // range of the head state's `state_roots` vector. + let unrealized_justified_state_root = + state.get_state_root_at_epoch_start(unrealized_justified_checkpoint.epoch)?; + + self.fc_store.set_unrealized_justified_checkpoint( + unrealized_justified_checkpoint, + unrealized_justified_state_root, + ); } if unrealized_finalized_checkpoint.epoch > self.fc_store.unrealized_finalized_checkpoint().epoch @@ -810,6 +824,13 @@ where self.pull_up_store_checkpoints( unrealized_justified_checkpoint, unrealized_finalized_checkpoint, + || { + // In the case where we actually update justification, it must be that the + // unrealized justification is recent and in range of the `state_roots` vector. + state + .get_state_root_at_epoch_start(unrealized_justified_checkpoint.epoch) + .map_err(Into::into) + }, )?; } @@ -896,11 +917,13 @@ where &mut self, justified_checkpoint: Checkpoint, finalized_checkpoint: Checkpoint, + justified_state_root_producer: impl FnOnce() -> Result>, ) -> Result<(), Error> { // Update justified checkpoint. if justified_checkpoint.epoch > self.fc_store.justified_checkpoint().epoch { + let justified_state_root = justified_state_root_producer()?; self.fc_store - .set_justified_checkpoint(justified_checkpoint) + .set_justified_checkpoint(justified_checkpoint, justified_state_root) .map_err(Error::UnableToSetJustifiedCheckpoint)?; } @@ -1166,10 +1189,12 @@ where // Update the justified/finalized checkpoints based upon the // best-observed unrealized justification/finality. let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint(); + let unrealized_justified_state_root = self.fc_store.unrealized_justified_state_root(); let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint(); self.pull_up_store_checkpoints( unrealized_justified_checkpoint, unrealized_finalized_checkpoint, + || Ok(unrealized_justified_state_root), )?; Ok(()) @@ -1179,10 +1204,12 @@ where &mut self, unrealized_justified_checkpoint: Checkpoint, unrealized_finalized_checkpoint: Checkpoint, + unrealized_justified_state_root_producer: impl FnOnce() -> Result>, ) -> Result<(), Error> { self.update_checkpoints( unrealized_justified_checkpoint, unrealized_finalized_checkpoint, + unrealized_justified_state_root_producer, ) } @@ -1375,12 +1402,16 @@ where /// Instantiate `Self` from some `PersistedForkChoice` generated by a earlier call to /// `Self::to_persisted`. pub fn proto_array_from_persisted( - persisted: &PersistedForkChoice, + persisted_proto_array: proto_array::core::SszContainer, + justified_balances: JustifiedBalances, reset_payload_statuses: ResetPayloadStatuses, spec: &ChainSpec, ) -> Result> { - let mut proto_array = ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes) - .map_err(Error::InvalidProtoArrayBytes)?; + let mut proto_array = ProtoArrayForkChoice::from_container( + persisted_proto_array.clone(), + justified_balances.clone(), + ) + .map_err(Error::InvalidProtoArrayBytes)?; let contains_invalid_payloads = proto_array.contains_invalid_payloads(); debug!( @@ -1408,7 +1439,7 @@ where info = "please report this error", "Failed to reset payload statuses" ); - ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes) + ProtoArrayForkChoice::from_container(persisted_proto_array, justified_balances) .map_err(Error::InvalidProtoArrayBytes) } else { debug!("Successfully reset all payload statuses"); @@ -1424,8 +1455,13 @@ where fc_store: T, spec: &ChainSpec, ) -> Result> { - let proto_array = - Self::proto_array_from_persisted(&persisted, reset_payload_statuses, spec)?; + let justified_balances = fc_store.justified_balances().clone(); + let proto_array = Self::proto_array_from_persisted( + persisted.proto_array, + justified_balances, + reset_payload_statuses, + spec, + )?; let current_slot = fc_store.get_current_slot(); @@ -1471,7 +1507,7 @@ where /// be instantiated again later. pub fn to_persisted(&self) -> PersistedForkChoice { PersistedForkChoice { - proto_array_bytes: self.proto_array().as_bytes(), + proto_array: self.proto_array().as_ssz_container(), queued_attestations: self.queued_attestations().to_vec(), } } @@ -1485,10 +1521,46 @@ where /// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes. /// /// This is used when persisting the state of the fork choice to disk. -#[derive(Encode, Decode, Clone)] +#[superstruct( + variants(V17, V28), + variant_attributes(derive(Encode, Decode, Clone)), + no_enum +)] pub struct PersistedForkChoice { + #[superstruct(only(V17))] pub proto_array_bytes: Vec, - queued_attestations: Vec, + #[superstruct(only(V28))] + pub proto_array: proto_array::core::SszContainerV28, + pub queued_attestations: Vec, +} + +pub type PersistedForkChoice = PersistedForkChoiceV28; + +impl TryFrom for PersistedForkChoiceV28 { + type Error = ssz::DecodeError; + + fn try_from(v17: PersistedForkChoiceV17) -> Result { + let container_v17 = + proto_array::core::SszContainerV17::from_ssz_bytes(&v17.proto_array_bytes)?; + let container_v28 = container_v17.into(); + + Ok(Self { + proto_array: container_v28, + queued_attestations: v17.queued_attestations, + }) + } +} + +impl From<(PersistedForkChoiceV28, JustifiedBalances)> for PersistedForkChoiceV17 { + fn from((v28, balances): (PersistedForkChoiceV28, JustifiedBalances)) -> Self { + let container_v17 = proto_array::core::SszContainerV17::from((v28.proto_array, balances)); + let proto_array_bytes = container_v17.as_ssz_bytes(); + + Self { + proto_array_bytes, + queued_attestations: v28.queued_attestations, + } + } } #[cfg(test)] diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index 27f3d34dbc..caa0ae9be2 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -44,6 +44,9 @@ pub trait ForkChoiceStore: Sized { /// Returns the `justified_checkpoint`. fn justified_checkpoint(&self) -> &Checkpoint; + /// Returns the state root of the justified checkpoint. + fn justified_state_root(&self) -> Hash256; + /// Returns balances from the `state` identified by `justified_checkpoint.root`. fn justified_balances(&self) -> &JustifiedBalances; @@ -53,6 +56,9 @@ pub trait ForkChoiceStore: Sized { /// Returns the `unrealized_justified_checkpoint`. fn unrealized_justified_checkpoint(&self) -> &Checkpoint; + /// Returns the state root of the unrealized justified checkpoint. + fn unrealized_justified_state_root(&self) -> Hash256; + /// Returns the `unrealized_finalized_checkpoint`. fn unrealized_finalized_checkpoint(&self) -> &Checkpoint; @@ -63,10 +69,14 @@ pub trait ForkChoiceStore: Sized { fn set_finalized_checkpoint(&mut self, checkpoint: Checkpoint); /// Sets the `justified_checkpoint`. - fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Self::Error>; + fn set_justified_checkpoint( + &mut self, + checkpoint: Checkpoint, + state_root: Hash256, + ) -> Result<(), Self::Error>; /// Sets the `unrealized_justified_checkpoint`. - fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint); + fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint, state_root: Hash256); /// Sets the `unrealized_finalized_checkpoint`. fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint); diff --git a/consensus/fork_choice/src/lib.rs b/consensus/fork_choice/src/lib.rs index 17f1dc38a6..afe06dee1b 100644 --- a/consensus/fork_choice/src/lib.rs +++ b/consensus/fork_choice/src/lib.rs @@ -5,7 +5,7 @@ mod metrics; pub use crate::fork_choice::{ AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, PersistedForkChoice, - QueuedAttestation, ResetPayloadStatuses, + PersistedForkChoiceV17, PersistedForkChoiceV28, QueuedAttestation, ResetPayloadStatuses, }; pub use fork_choice_store::ForkChoiceStore; pub use proto_array::{ diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index d99ace05f9..20987dff26 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -306,8 +306,8 @@ fn get_checkpoint(i: u64) -> Checkpoint { fn check_bytes_round_trip(original: &ProtoArrayForkChoice) { let bytes = original.as_bytes(); - let decoded = - ProtoArrayForkChoice::from_bytes(&bytes).expect("fork choice should decode from bytes"); + let decoded = ProtoArrayForkChoice::from_bytes(&bytes, original.balances.clone()) + .expect("fork choice should decode from bytes"); assert!( *original == decoded, "fork choice should encode and decode without change" diff --git a/consensus/proto_array/src/lib.rs b/consensus/proto_array/src/lib.rs index 4581e5b78f..964e836d91 100644 --- a/consensus/proto_array/src/lib.rs +++ b/consensus/proto_array/src/lib.rs @@ -16,5 +16,5 @@ pub use error::Error; pub mod core { pub use super::proto_array::{ProposerBoost, ProtoArray, ProtoNode}; pub use super::proto_array_fork_choice::VoteTracker; - pub use super::ssz_container::{SszContainer, SszContainerV17}; + pub use super::ssz_container::{SszContainer, SszContainerV17, SszContainerV28}; } diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index ecefb9ff51..4b31dc60bd 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -866,14 +866,25 @@ impl ProtoArrayForkChoice { self.proto_array.iter_block_roots(block_root) } + pub fn as_ssz_container(&self) -> SszContainer { + SszContainer::from(self) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() } - pub fn from_bytes(bytes: &[u8]) -> Result { + pub fn from_bytes(bytes: &[u8], balances: JustifiedBalances) -> Result { let container = SszContainer::from_ssz_bytes(bytes) .map_err(|e| format!("Failed to decode ProtoArrayForkChoice: {:?}", e))?; - container + Self::from_container(container, balances) + } + + pub fn from_container( + container: SszContainer, + balances: JustifiedBalances, + ) -> Result { + (container, balances) .try_into() .map_err(|e| format!("Failed to initialize ProtoArrayForkChoice: {e:?}")) } diff --git a/consensus/proto_array/src/ssz_container.rs b/consensus/proto_array/src/ssz_container.rs index c13c6a0d59..0bb3f2b35d 100644 --- a/consensus/proto_array/src/ssz_container.rs +++ b/consensus/proto_array/src/ssz_container.rs @@ -14,16 +14,20 @@ use types::{Checkpoint, Hash256}; // selector. four_byte_option_impl!(four_byte_option_checkpoint, Checkpoint); -pub type SszContainer = SszContainerV17; +pub type SszContainer = SszContainerV28; -#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)] +#[superstruct( + variants(V17, V28), + variant_attributes(derive(Encode, Decode, Clone)), + no_enum +)] pub struct SszContainer { pub votes: Vec, + #[superstruct(only(V17))] pub balances: Vec, pub prune_threshold: usize, pub justified_checkpoint: Checkpoint, pub finalized_checkpoint: Checkpoint, - #[superstruct(only(V17))] pub nodes: Vec, pub indices: Vec<(Hash256, usize)>, pub previous_proposer_boost: ProposerBoost, @@ -35,7 +39,6 @@ impl From<&ProtoArrayForkChoice> for SszContainer { Self { votes: from.votes.0.clone(), - balances: from.balances.effective_balances.clone(), prune_threshold: proto_array.prune_threshold, justified_checkpoint: proto_array.justified_checkpoint, finalized_checkpoint: proto_array.finalized_checkpoint, @@ -46,10 +49,10 @@ impl From<&ProtoArrayForkChoice> for SszContainer { } } -impl TryFrom for ProtoArrayForkChoice { +impl TryFrom<(SszContainer, JustifiedBalances)> for ProtoArrayForkChoice { type Error = Error; - fn try_from(from: SszContainer) -> Result { + fn try_from((from, balances): (SszContainer, JustifiedBalances)) -> Result { let proto_array = ProtoArray { prune_threshold: from.prune_threshold, justified_checkpoint: from.justified_checkpoint, @@ -62,7 +65,38 @@ impl TryFrom for ProtoArrayForkChoice { Ok(Self { proto_array, votes: ElasticList(from.votes), - balances: JustifiedBalances::from_effective_balances(from.balances)?, + balances, }) } } + +// Convert V17 to V28 by dropping balances. +impl From for SszContainerV28 { + fn from(v17: SszContainerV17) -> Self { + Self { + votes: v17.votes, + prune_threshold: v17.prune_threshold, + justified_checkpoint: v17.justified_checkpoint, + finalized_checkpoint: v17.finalized_checkpoint, + nodes: v17.nodes, + indices: v17.indices, + previous_proposer_boost: v17.previous_proposer_boost, + } + } +} + +// Convert V28 to V17 by re-adding balances. +impl From<(SszContainerV28, JustifiedBalances)> for SszContainerV17 { + fn from((v28, balances): (SszContainerV28, JustifiedBalances)) -> Self { + Self { + votes: v28.votes, + balances: balances.effective_balances.clone(), + prune_threshold: v28.prune_threshold, + justified_checkpoint: v28.justified_checkpoint, + finalized_checkpoint: v28.finalized_checkpoint, + nodes: v28.nodes, + indices: v28.indices, + previous_proposer_boost: v28.previous_proposer_boost, + } + } +} diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 923168030f..c27200c894 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1444,6 +1444,12 @@ impl BeaconState { .ok_or(Error::StateRootsOutOfBounds(i)) } + /// Gets the state root for the start slot of some epoch. + pub fn get_state_root_at_epoch_start(&self, epoch: Epoch) -> Result { + self.get_state_root(epoch.start_slot(E::slots_per_epoch())) + .copied() + } + /// Gets the oldest (earliest slot) state root. pub fn get_oldest_state_root(&self) -> Result<&Hash256, Error> { let oldest_slot = self.slot().saturating_sub(self.state_roots().len());