diff --git a/Cargo.lock b/Cargo.lock index 427996b533..754e2b9520 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,7 +332,7 @@ dependencies = [ "smallvec", "state_processing", "store", - "strum", + "strum 0.21.0", "superstruct", "task_executor", "tempfile", @@ -1117,6 +1117,24 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" +[[package]] +name = "database_manager" +version = "0.1.0" +dependencies = [ + "beacon_chain", + "beacon_node", + "clap", + "clap_utils", + "environment", + "logging", + "slog", + "sloggers", + "store", + "strum 0.24.0", + "tempfile", + "types", +] + [[package]] name = "db-key" version = "0.0.5" @@ -1416,7 +1434,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c5f0096a91d210159eceb2ff5e1c4da18388a170e1e3ce948aac9c8fdbbf595" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", @@ -2351,6 +2369,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -3380,6 +3404,7 @@ dependencies = [ "boot_node", "clap", "clap_utils", + "database_manager", "directory", "env_logger 0.9.0", "environment", @@ -3446,7 +3471,7 @@ dependencies = [ "slog-term", "smallvec", "snap", - "strum", + "strum 0.21.0", "superstruct", "task_executor", "tempfile", @@ -3550,6 +3575,7 @@ dependencies = [ name = "malloc_utils" version = "0.1.0" dependencies = [ + "jemalloc-sys", "jemallocator", "lazy_static", "libc", @@ -3937,7 +3963,7 @@ dependencies = [ "slot_clock", "smallvec", "store", - "strum", + "strum 0.21.0", "task_executor", "tokio", "tokio-stream", @@ -4605,7 +4631,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ "bytes", - "heck", + "heck 0.3.3", "itertools", "lazy_static", "log", @@ -5869,6 +5895,7 @@ dependencies = [ "slog", "sloggers", "state_processing", + "strum 0.24.0", "take-until", "tempfile", "tree_hash", @@ -5894,7 +5921,16 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" dependencies = [ - "strum_macros", + "strum_macros 0.21.1", +] + +[[package]] +name = "strum" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8" +dependencies = [ + "strum_macros 0.24.0", ] [[package]] @@ -5903,12 +5939,25 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro2", "quote", "syn", ] +[[package]] +name = "strum_macros" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef" +dependencies = [ + "heck 0.4.0", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subtle" version = "2.4.1" diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index e264844db3..14fee72e59 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1158,8 +1158,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { let committee_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_COMMITTEE); - state.build_committee_cache(RelativeEpoch::Previous, &chain.spec)?; - state.build_committee_cache(RelativeEpoch::Current, &chain.spec)?; + state.build_all_committee_caches(&chain.spec)?; metrics::stop_timer(committee_timer); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ff2aac855c..bd6d3ec1a6 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1066,14 +1066,15 @@ mod test { } for v in state.validators() { - let creds = v.withdrawal_credentials.as_bytes(); + let creds = v.withdrawal_credentials(); + let creds = creds.as_bytes(); assert_eq!( creds[0], spec.bls_withdrawal_prefix_byte, "first byte of withdrawal creds should be bls prefix" ); assert_eq!( &creds[1..], - &hash(&v.pubkey.as_ssz_bytes())[1..], + &hash(&v.pubkey().as_ssz_bytes())[1..], "rest of withdrawal creds should be pubkey hash" ) } diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs index 2df397ea76..621ba15095 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs @@ -150,7 +150,7 @@ pub fn upgrade_to_v10( "Converting full state to diff"; "prev_state_root" => ?state_root, "state_root" => ?current_state_root, - "slot" => slot, + "slot" => current_state.slot(), ); let diff = BeaconStateDiff::compute_diff(&backtrack_state, ¤t_state)?; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index eb3aa43785..e535d67395 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -791,6 +791,7 @@ impl, Cold: ItemStore> HotColdDB let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?; state.update_tree_hash_cache()?; + state.build_all_caches(&self.spec)?; Ok(Some((state, latest_block_root))) } else { @@ -825,6 +826,7 @@ impl, Cold: ItemStore> HotColdDB // Do a tree hash here so that the cache is fully built. state.update_tree_hash_cache()?; + state.build_all_caches(&self.spec)?; let latest_block_root = state.get_latest_block_root(*state_root); Ok((state, latest_block_root)) @@ -843,6 +845,7 @@ impl, Cold: ItemStore> HotColdDB // Do a tree hash here so that the cache is fully built. state.update_tree_hash_cache()?; + state.build_all_caches(&self.spec)?; let latest_block_root = state.get_latest_block_root(state_root); Ok((state, latest_block_root)) diff --git a/consensus/state_processing/src/per_slot_processing.rs b/consensus/state_processing/src/per_slot_processing.rs index 9018db65bc..5b04c6f599 100644 --- a/consensus/state_processing/src/per_slot_processing.rs +++ b/consensus/state_processing/src/per_slot_processing.rs @@ -55,6 +55,11 @@ pub fn per_slot_processing( if spec.bellatrix_fork_epoch == Some(state.current_epoch()) { upgrade_to_bellatrix(state, spec)?; } + + // Additionally build all caches so that all valid states that are advanced always have + // committee caches built, and we don't have to worry about initialising them at higher + // layers. + state.build_all_caches(spec)?; } Ok(summary) diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 10ea3983cb..330f2fe6ca 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -136,6 +136,13 @@ pub enum Error { }, #[cfg(feature = "milhouse")] MilhouseError(milhouse::Error), + CommitteeCacheDiffInvalidEpoch { + prev_current_epoch: Epoch, + current_epoch: Epoch, + }, + CommitteeCacheDiffUninitialized { + expected_epoch: Epoch, + }, } /// Control whether an epoch-indexed field can be indexed at the next epoch or not. @@ -1488,7 +1495,7 @@ impl BeaconState { Ok(()) } - fn committee_cache_index(relative_epoch: RelativeEpoch) -> usize { + pub(crate) fn committee_cache_index(relative_epoch: RelativeEpoch) -> usize { match relative_epoch { RelativeEpoch::Previous => 0, RelativeEpoch::Current => 1, diff --git a/consensus/types/src/beacon_state/diff.rs b/consensus/types/src/beacon_state/diff.rs index e8bc8c236c..b87b7daf42 100644 --- a/consensus/types/src/beacon_state/diff.rs +++ b/consensus/types/src/beacon_state/diff.rs @@ -1,9 +1,11 @@ use crate::{ - BeaconBlockHeader, BeaconState, BeaconStateError as Error, BitVector, Checkpoint, Eth1Data, - EthSpec, ExecutionPayloadHeader, Fork, Hash256, ParticipationFlags, PendingAttestation, Slot, - SyncCommittee, Validator, + beacon_state::{CommitteeCache, CACHED_EPOCHS}, + BeaconBlockHeader, BeaconState, BeaconStateError as Error, BitVector, Checkpoint, Epoch, + Eth1Data, EthSpec, ExecutionPayloadHeader, Fork, Hash256, ParticipationFlags, + PendingAttestation, Slot, SyncCommittee, Validator, }; use milhouse::{CloneDiff, Diff, ListDiff, ResetListDiff, VectorDiff}; +use safe_arith::SafeArith; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; @@ -79,6 +81,19 @@ pub struct BeaconStateDiff { // Execution latest_execution_payload_header: Maybe>>, + + // Committee caches + committee_caches: CommitteeCachesDiff, +} + +/// Zero to three committee caches which update a `BeaconState`'s stored committee caches. +/// +/// For most diffs which are taken relative to the previous epoch boundary state this diff +/// will contain a single committee cache. +#[derive(Debug, PartialEq, Encode, Decode)] +pub struct CommitteeCachesDiff { + current_epoch: Epoch, + caches: Vec>, } fn optional_field_diff< @@ -108,6 +123,100 @@ fn apply_optional_diff + Encode Ok(()) } +fn compute_committee_cache_dist( + current_epoch: Epoch, + prev_current_epoch: Epoch, +) -> Result { + current_epoch + .safe_sub(prev_current_epoch) + .as_ref() + .map(Epoch::as_usize) + .map_err(|_| Error::CommitteeCacheDiffInvalidEpoch { + prev_current_epoch, + current_epoch, + }) +} + +/// Check that an array of committee caches is fully initialized with respect to `current_epoch`. +fn check_committee_caches( + caches: &[Arc; CACHED_EPOCHS], + current_epoch: Epoch, +) -> Result<(), Error> { + for (i, cache) in caches.iter().enumerate() { + const CURRENT_EPOCH_OFFSET: u64 = 1; + let expected_epoch = Epoch::new( + current_epoch + .safe_add(i as u64)? + .as_u64() + .saturating_sub(CURRENT_EPOCH_OFFSET), + ); + if !cache.is_initialized_at(expected_epoch) { + return Err(Error::CommitteeCacheDiffUninitialized { expected_epoch }).unwrap(); + } + } + Ok(()) +} + +impl Diff for CommitteeCachesDiff { + // Diffs are applied wrt to the current epoch and the `state.committee_caches` array. + type Target = (Epoch, [Arc; CACHED_EPOCHS]); + type Error = Error; + + fn compute_diff(orig: &Self::Target, other: &Self::Target) -> Result { + let (prev_current_epoch, prev_caches) = orig; + let (current_epoch, caches) = other; + + // Sanity check the inputs to ensure we can compute a sensible diff. + check_committee_caches(&prev_caches, *prev_current_epoch)?; + check_committee_caches(&caches, *current_epoch)?; + + let dist = compute_committee_cache_dist(*current_epoch, *prev_current_epoch)?; + + // The distance determines the number of caches that are unique to the new cache array. + // If the epoch distance is 0 then there are no new caches, if it's 1 then only the last + // element of the cache is new, and so on up to the maximum of `CACHED_EPOCHS` at which + // point the entire array is new. + let new_caches = (CACHED_EPOCHS.saturating_sub(dist)..CACHED_EPOCHS) + .map(|i| { + caches + .get(i) + .cloned() + .ok_or(Error::CommitteeCachesOutOfBounds(i)) + }) + .collect::, _>>()?; + + assert_eq!(new_caches.len(), std::cmp::min(CACHED_EPOCHS, dist)); + Ok(CommitteeCachesDiff { + current_epoch: *current_epoch, + caches: new_caches, + }) + } + + fn apply_diff(self, target: &mut Self::Target) -> Result<(), Error> { + let (prev_current_epoch, caches) = target; + + let dist = compute_committee_cache_dist(self.current_epoch, *prev_current_epoch)?; + let capped_dist = std::cmp::min(CACHED_EPOCHS, dist); + + // Rotate caches for the epoch advance. This moves the caches that are still relevant into + // position. The irrelevant caches will be overwritten in the next step. + caches.rotate_left(capped_dist); + + let base = CACHED_EPOCHS.saturating_sub(capped_dist); + for (i, cache) in self.caches.into_iter().enumerate() { + let cache_index = base.safe_add(i)?; + *caches + .get_mut(cache_index) + .ok_or(Error::CommitteeCachesOutOfBounds(cache_index))? = cache; + } + + *prev_current_epoch = self.current_epoch; + + // Sanity check the diff application. + check_committee_caches(caches, self.current_epoch) + } +} + impl Diff for BeaconStateDiff { type Target = BeaconState; type Error = Error; @@ -115,6 +224,18 @@ impl Diff for BeaconStateDiff { // FIXME(sproul): proc macro fn compute_diff(orig: &Self::Target, other: &Self::Target) -> Result { // FIXME(sproul): consider cross-variant diffs + + // Compute committee caches diff. + let prev_current_epoch = orig.current_epoch(); + let current_epoch = other.current_epoch(); + + let orig_committee_caches = orig.committee_caches().clone(); + let new_committee_caches = other.committee_caches().clone(); + let committee_caches = CommitteeCachesDiff::compute_diff( + &(prev_current_epoch, orig_committee_caches), + &(current_epoch, new_committee_caches), + )?; + Ok(BeaconStateDiff { genesis_time: <_>::compute_diff(&orig.genesis_time(), &other.genesis_time())?, genesis_validators_root: <_>::compute_diff( @@ -192,10 +313,13 @@ impl Diff for BeaconStateDiff { other, BeaconState::latest_execution_payload_header, )?, + committee_caches, }) } fn apply_diff(self, target: &mut BeaconState) -> Result<(), Error> { + let prev_current_epoch = target.current_epoch(); + self.genesis_time.apply_diff(target.genesis_time_mut())?; self.genesis_validators_root .apply_diff(target.genesis_validators_root_mut())?; @@ -250,6 +374,12 @@ impl Diff for BeaconStateDiff { self.latest_execution_payload_header, target.latest_execution_payload_header_mut(), )?; + + // Apply committee caches diff. + let mut committee_caches = (prev_current_epoch, target.committee_caches().clone()); + self.committee_caches.apply_diff(&mut committee_caches)?; + *target.committee_caches_mut() = committee_caches.1; + Ok(()) } }