diff --git a/Cargo.lock b/Cargo.lock index a915011249..d922a0d7a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1350,7 +1350,7 @@ dependencies = [ "store", "swap_or_not_shuffle", "tree_hash", - "tree_hash_derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tree_hash_derive", "types", ] @@ -1683,11 +1683,13 @@ dependencies = [ "derivative", "eth2_serde_utils", "eth2_ssz", + "milhouse", "serde", "serde_derive", "serde_json", + "smallvec", "tree_hash", - "tree_hash_derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tree_hash_derive", "typenum", ] @@ -1838,7 +1840,7 @@ dependencies = [ "task_executor", "tokio", "tree_hash", - "tree_hash_derive 0.4.0", + "tree_hash_derive", "types", "warp 0.3.0", ] @@ -3604,6 +3606,7 @@ dependencies = [ "derivative", "eth2_hashing 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "eth2_ssz", + "eth2_ssz_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools", "parking_lot", "rayon", @@ -5516,7 +5519,7 @@ dependencies = [ "sloggers", "tempfile", "tree_hash", - "tree_hash_derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tree_hash_derive", "types", ] @@ -5785,6 +5788,7 @@ dependencies = [ "lighthouse_metrics", "merkle_proof", "rayon", + "rustc-hash", "safe_arith", "smallvec", "tree_hash", @@ -5813,10 +5817,12 @@ name = "store" version = "0.2.0" dependencies = [ "beacon_chain", + "bincode", "db-key", "directory", "eth2_ssz", "eth2_ssz_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2", "itertools", "lazy_static", "leveldb", @@ -6385,7 +6391,7 @@ dependencies = [ "ethereum-types 0.12.1", "rand 0.7.3", "smallvec", - "tree_hash_derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tree_hash_derive", "types", ] @@ -6398,17 +6404,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tree_hash_derive" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd22d128157837a4434bb51119aef11103f17bfe8c402ce688cf25aa1e608ad" -dependencies = [ - "darling", - "quote", - "syn", -] - [[package]] name = "triomphe" version = "0.1.5" @@ -6562,13 +6557,14 @@ dependencies = [ "serde_json", "serde_yaml", "slog", + "smallvec", "state_processing", "superstruct", "swap_or_not_shuffle", "tempfile", "test_random_derive", "tree_hash", - "tree_hash_derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tree_hash_derive", ] [[package]] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4eafc2271d..e2ee197642 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -357,10 +357,10 @@ impl BeaconChain { let mut batch = vec![]; let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); - batch.push(self.persist_head_in_batch()); + batch.push(self.persist_head_in_batch()?); let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); - batch.push(self.persist_fork_choice_in_batch()); + batch.push(self.persist_fork_choice_in_batch()?); self.store.hot_db.do_atomically(batch)?; @@ -380,20 +380,20 @@ impl BeaconChain { } /// Return a database operation for writing the beacon chain head to disk. - pub fn persist_head_in_batch(&self) -> KeyValueStoreOp { + pub fn persist_head_in_batch(&self) -> Result { Self::persist_head_in_batch_standalone(self.genesis_block_root, &self.head_tracker) } pub fn persist_head_in_batch_standalone( genesis_block_root: Hash256, head_tracker: &HeadTracker, - ) -> KeyValueStoreOp { + ) -> Result { Self::make_persisted_head(genesis_block_root, head_tracker) .as_kv_store_op(BEACON_CHAIN_DB_KEY) } /// Return a database operation for writing fork choice to disk. - pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { + pub fn persist_fork_choice_in_batch(&self) -> Result { let fork_choice = self.fork_choice.read(); Self::persist_fork_choice_in_batch_standalone(&fork_choice) } @@ -401,7 +401,7 @@ impl BeaconChain { /// Return a database operation for writing fork choice to disk. pub fn persist_fork_choice_in_batch_standalone( fork_choice: &BeaconForkChoice, - ) -> KeyValueStoreOp { + ) -> Result { let persisted_fork_choice = PersistedForkChoice { fork_choice: fork_choice.to_persisted(), fork_choice_store: fork_choice.fc_store().to_persisted(), @@ -2928,6 +2928,7 @@ impl BeaconChain { drop(slot_timer); state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + state.apply_pending_mutations()?; let parent_root = if state.slot() > 0 { *state diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 11c3838256..ff2aac855c 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -440,9 +440,9 @@ where weak_subj_slot.epoch(TEthSpec::slots_per_epoch()), weak_subj_state.clone(), ) - .map_err(|e| format!("Failed to set genesis state as finalized state: {:?}", e))?; + .map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?; store - .put_state(&weak_subj_state_root, &weak_subj_state) + .store_full_state(&weak_subj_state_root, &weak_subj_state) .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?; store .put_block(&weak_subj_block_root, weak_subj_block.clone()) @@ -451,7 +451,11 @@ where // Stage the database's metadata fields for atomic storage when `build` is called. // This prevents the database from restarting in an inconsistent state if the anchor // info or split point is written before the `PersistedBeaconChain`. - self.pending_io_batch.push(store.store_split_in_batch()); + self.pending_io_batch.push( + store + .store_split_in_batch() + .map_err(|e| format!("Failed to store split: {:?}", e))?, + ); self.pending_io_batch.push( store .init_anchor_info(weak_subj_block.message()) @@ -459,11 +463,14 @@ where ); // Store pruning checkpoint to prevent attempting to prune before the anchor state. - self.pending_io_batch - .push(store.pruning_checkpoint_store_op(Checkpoint { - root: weak_subj_block_root, - epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()), - })); + self.pending_io_batch.push( + store + .pruning_checkpoint_store_op(Checkpoint { + root: weak_subj_block_root, + epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()), + }) + .map_err(|e| format!("{:?}", e))?, + ); let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, @@ -716,12 +723,12 @@ where Witness, >::persist_head_in_batch_standalone( genesis_block_root, &head_tracker - )); + ).map_err(|e| format!("{:?}", e))?); self.pending_io_batch.push(BeaconChain::< Witness, >::persist_fork_choice_in_batch_standalone( &fork_choice - )); + ).map_err(|e| format!("{:?}", e))?); store .hot_db .do_atomically(self.pending_io_batch) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 8d5087ef3c..5fd1883b97 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -179,8 +179,8 @@ impl StoreItem for SszEth1 { DBColumn::Eth1Cache } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index fb8edd510c..30a4987e19 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -559,10 +559,10 @@ impl, Cold: ItemStore> BackgroundMigrator Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/beacon_chain/src/persisted_fork_choice.rs b/beacon_node/beacon_chain/src/persisted_fork_choice.rs index eb4c761913..3935c6214c 100644 --- a/beacon_node/beacon_chain/src/persisted_fork_choice.rs +++ b/beacon_node/beacon_chain/src/persisted_fork_choice.rs @@ -31,11 +31,11 @@ macro_rules! impl_store_item { DBColumn::ForkChoice } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } - fn from_store_bytes(bytes: &[u8]) -> std::result::Result { + fn from_store_bytes(bytes: &[u8]) -> Result { Self::from_ssz_bytes(bytes).map_err(Into::into) } } diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 6d797ab37b..1d7dbe2b0b 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -114,7 +114,7 @@ pub fn migrate_schema( .map_err(StoreError::SchemaMigrationError)?; // Store the converted fork choice store under the same key. - ops.push(persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)); + ops.push(persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)?); } db.store_schema_version_atomically(to, ops)?; @@ -159,7 +159,7 @@ pub fn migrate_schema( } // Store the converted fork choice store under the same key. - ops.push(persisted_fork_choice_v7.as_kv_store_op(FORK_CHOICE_DB_KEY)); + ops.push(persisted_fork_choice_v7.as_kv_store_op(FORK_CHOICE_DB_KEY)?); } db.store_schema_version_atomically(to, ops)?; @@ -174,7 +174,7 @@ pub fn migrate_schema( let updated_fork_choice = migration_schema_v8::update_fork_choice::(fork_choice, db.clone())?; - ops.push(updated_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)); + ops.push(updated_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)?); } db.store_schema_version_atomically(to, ops)?; @@ -202,8 +202,8 @@ impl StoreItem for OnDiskStoreConfigV4 { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 27bbd435f6..ebb837bebb 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -208,8 +208,8 @@ impl StoreItem for DatabasePubkey { DBColumn::PubkeyCache } - fn as_store_bytes(&self) -> Vec { - self.0.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.0.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 567e0cdb72..7a270eca30 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -10,7 +10,7 @@ use slasher::{Config as SlasherConfig, Slasher}; use state_processing::{ common::get_indexed_attestation, per_block_processing::{per_block_processing, BlockSignatureStrategy}, - per_slot_processing, BlockProcessingError, VerifyBlockRoot, + per_slot_processing, BlockProcessingError, ConsensusContext, VerifyBlockRoot, }; use std::sync::Arc; use tempfile::tempdir; @@ -968,13 +968,14 @@ fn add_base_block_to_altair_chain() { { let mut state = state; per_slot_processing(&mut state, None, &harness.chain.spec).unwrap(); + let mut ctxt = ConsensusContext::new(state.slot()); assert!(matches!( per_block_processing( &mut state, &base_block, - None, BlockSignatureStrategy::NoVerification, VerifyBlockRoot::True, + &mut ctxt, &harness.chain.spec, ), Err(BlockProcessingError::InconsistentBlockFork( @@ -1087,13 +1088,14 @@ fn add_altair_block_to_base_chain() { { let mut state = state; per_slot_processing(&mut state, None, &harness.chain.spec).unwrap(); + let mut ctxt = ConsensusContext::new(state.slot()); assert!(matches!( per_block_processing( &mut state, &altair_block, - None, BlockSignatureStrategy::NoVerification, VerifyBlockRoot::True, + &mut ctxt, &harness.chain.spec, ), Err(BlockProcessingError::InconsistentBlockFork( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 5c020df492..3b37d95bef 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -310,19 +310,6 @@ fn epoch_boundary_state_attestation_processing() { let mut checked_pre_fin = false; for (attestation, subnet_id) in late_attestations.into_iter().flatten() { - // load_epoch_boundary_state is idempotent! - let block_root = attestation.data.beacon_block_root; - let block = store.get_block(&block_root).unwrap().expect("block exists"); - let epoch_boundary_state = store - .load_epoch_boundary_state(&block.state_root()) - .expect("no error") - .expect("epoch boundary state exists"); - let ebs_of_ebs = store - .load_epoch_boundary_state(&epoch_boundary_state.canonical_root()) - .expect("no error") - .expect("ebs of ebs exists"); - assert_eq!(epoch_boundary_state, ebs_of_ebs); - // If the attestation is pre-finalization it should be rejected. let finalized_epoch = harness .chain @@ -539,6 +526,8 @@ fn block_replayer_hooks() { assert_eq!(post_block_slots, block_slots); // States match. + end_state.apply_pending_mutations().unwrap(); + replay_state.apply_pending_mutations().unwrap(); end_state.drop_all_caches().unwrap(); replay_state.drop_all_caches().unwrap(); assert_eq!(end_state, replay_state); @@ -2465,15 +2454,15 @@ fn check_split_slot(harness: &TestHarness, store: Arc, L /// Check that all the states in a chain dump have the correct tree hash. fn check_chain_dump(harness: &TestHarness, expected_len: u64) { - let chain_dump = harness.chain.chain_dump().unwrap(); + let mut chain_dump = harness.chain.chain_dump().unwrap(); assert_eq!(chain_dump.len() as u64, expected_len); - for checkpoint in &chain_dump { + for checkpoint in &mut chain_dump { // Check that the tree hash of the stored state is as expected assert_eq!( checkpoint.beacon_state_root(), - checkpoint.beacon_state.tree_hash_root(), + checkpoint.beacon_state.update_tree_hash_cache().unwrap(), "tree hash of stored state is incorrect" ); diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index e69230c50c..6176791164 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -44,8 +44,8 @@ impl StoreItem for PersistedDht { DBColumn::DhtEnrs } - fn as_store_bytes(&self) -> Vec { - rlp::encode_list(&self.enrs).to_vec() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(rlp::encode_list(&self.enrs).to_vec()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 50b7828fae..b9f212b07a 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -165,8 +165,8 @@ impl StoreItem for PersistedOperationPoolBase { DBColumn::OpPool } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -182,8 +182,8 @@ impl StoreItem for PersistedOperationPool { DBColumn::OpPool } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, StoreError> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 432e6bcdd7..cf120a782c 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -28,6 +28,8 @@ sloggers = { version = "2.1.1", features = ["json"] } directory = { path = "../../common/directory" } tree_hash = "0.4.0" take-until = "0.1.0" +flate2 = { version = "1.0.22", features = ["zlib"], default-features = false } +bincode = "1.3.3" [features] milhouse = ["state_processing/milhouse"] diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index cde8d1f3ae..3593320710 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -73,8 +73,8 @@ impl StoreItem for OnDiskStoreConfig { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 977ec1b8be..41dc0c0546 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -47,6 +47,8 @@ pub enum Error { BlockReplayError(BlockReplayError), #[cfg(feature = "milhouse")] MilhouseError(milhouse::Error), + Bincode(Box), + FlateCompression(std::io::Error), } pub trait HandleUnavailable { @@ -112,6 +114,12 @@ impl From for Error { } } +impl From> for Error { + fn from(e: Box) -> Self { + Self::Bincode(e) + } +} + #[derive(Debug)] pub struct DBError { pub message: String, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6e18213898..b44feec462 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -21,6 +21,7 @@ use crate::{ }; use leveldb::iterator::LevelDBIterator; use lru::LruCache; +use milhouse::Diff; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use serde_derive::{Deserialize, Serialize}; @@ -89,6 +90,7 @@ pub enum HotColdDBError { MissingEpochBoundaryState(Hash256), MissingPrevState(Hash256), MissingSplitState(Hash256, Slot), + MissingStateDiff(Hash256), MissingAnchorInfo, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), @@ -454,17 +456,7 @@ impl, Cold: ItemStore> HotColdDB /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint /// (which will be deleted by this function but shouldn't be). pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { - // Delete the state summary. - self.hot_db - .key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?; - - // Delete the full state if it lies on an epoch boundary. - if slot % E::slots_per_epoch() == 0 { - self.hot_db - .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; - } - - Ok(()) + self.do_atomically(vec![StoreOp::DeleteState(*state_root, Some(slot))]) } pub fn forwards_block_roots_iterator( @@ -519,36 +511,6 @@ impl, Cold: ItemStore> HotColdDB HybridForwardsStateRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec) } - /// Load an epoch boundary state by using the hot state summary look-up. - /// - /// Will fall back to the cold DB if a hot state summary is not found. - pub fn load_epoch_boundary_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - if let Some(HotStateSummary { - epoch_boundary_state_root, - .. - }) = self.load_hot_state_summary(state_root)? - { - // NOTE: minor inefficiency here because we load an unnecessary hot state summary - let state = self.get_hot_state(&epoch_boundary_state_root)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; - Ok(Some(state)) - } else { - // Try the cold DB - match self.load_cold_state_slot(state_root)? { - Some(state_slot) => { - let epoch_boundary_slot = - state_slot / E::slots_per_epoch() * E::slots_per_epoch(); - self.load_cold_state_by_slot(epoch_boundary_slot) - } - None => Ok(None), - } - } - } - pub fn put_item(&self, key: &Hash256, item: &I) -> Result<(), Error> { self.hot_db.put(key, item) } @@ -575,7 +537,7 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutStateTemporaryFlag(state_root) => { - key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)); + key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)?); } StoreOp::DeleteStateTemporaryFlag(state_root) => { @@ -595,9 +557,17 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) { + // Delete full state if any. let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); + + // Delete diff too. + let diff_key = get_key_for_col( + DBColumn::BeaconStateDiff.into(), + state_root.as_bytes(), + ); + key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key)); } } } @@ -606,7 +576,7 @@ impl, Cold: ItemStore> HotColdDB } pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { - let mut guard = self.block_cache.lock(); + let mut block_cache = self.block_cache.lock(); self.hot_db .do_atomically(self.convert_to_kv_batch(&batch)?)?; @@ -614,7 +584,7 @@ impl, Cold: ItemStore> HotColdDB for op in &batch { match op { StoreOp::PutBlock(block_root, block) => { - guard.put(*block_root, (**block).clone()); + block_cache.put(*block_root, (**block).clone()); } StoreOp::PutState(_, _) => (), @@ -624,10 +594,13 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteStateTemporaryFlag(_) => (), StoreOp::DeleteBlock(block_root) => { - guard.pop(block_root); + block_cache.pop(block_root); } - StoreOp::DeleteState(_, _) => (), + StoreOp::DeleteState(state_root, _) => { + // FIXME(sproul): atomics are a bit sketchy here + self.state_cache.lock().delete(state_root) + } } } Ok(()) @@ -656,27 +629,67 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); } - // On the epoch boundary, store the full state. - if state.slot() % E::slots_per_epoch() == 0 { - trace!( - self.log, - "Storing full state on epoch boundary"; - "slot" => state.slot().as_u64(), - "state_root" => format!("{:?}", state_root) - ); - store_full_state(state_root, state, ops)?; - } - // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. let hot_state_summary = HotStateSummary::new(state_root, state)?; - let op = hot_state_summary.as_kv_store_op(*state_root); + let op = hot_state_summary.as_kv_store_op(*state_root)?; ops.push(op); + // On the epoch boundary, store a diff from the previous epoch boundary state -- unless + // we're at a fork boundary in which case the full state must be stored. + if state.slot() % E::slots_per_epoch() == 0 { + if let Some(fork) = self.spec.fork_activated_at_slot::(state.slot()) { + info!( + self.log, + "Storing fork transition state"; + "fork" => %fork, + "slot" => state.slot(), + "state_root" => ?state_root, + ); + self.store_full_state_in_batch(state_root, state, ops)?; + } else { + debug!( + self.log, + "Storing state diff on epoch boundary"; + "slot" => state.slot(), + "state_root" => ?state_root, + ); + let prev_epoch_state_root = hot_state_summary.epoch_boundary_state_root; + let prev_boundary_state = self.get_hot_state(&prev_epoch_state_root)?.ok_or( + HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root), + )?; + + let compute_diff_timer = + metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME); + let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?; + drop(compute_diff_timer); + ops.push(diff.as_kv_store_op(*state_root)?); + } + } + Ok(()) } + pub fn store_full_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + let mut ops = Vec::with_capacity(4); + self.store_full_state_in_batch(state_root, state, &mut ops)?; + self.hot_db.do_atomically(ops) + } + + pub fn store_full_state_in_batch( + &self, + state_root: &Hash256, + state: &BeaconState, + ops: &mut Vec, + ) -> Result<(), Error> { + store_full_state(state_root, state, ops) + } + /// Get a post-finalization state from the database or store. pub fn get_hot_state(&self, state_root: &Hash256) -> Result>, Error> { if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { @@ -715,18 +728,12 @@ impl, Cold: ItemStore> HotColdDB // If the state is the finalized state, load it from disk. This should only be necessary // once during start-up, after which point the finalized state will be cached. if *state_root == self.get_split_info().state_root { - let mut state = get_full_state(&self.hot_db, state_root, &self.spec)? - .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; - - // Do a tree hash here so that the cache is fully built. - state.update_tree_hash_cache()?; - - let latest_block_root = state.get_latest_block_root(*state_root); - return Ok(Some((state, latest_block_root))); + return self.load_hot_state_full(state_root).map(Some); } // If the state is marked as temporary, do not return it. It will become visible // only once its transaction commits and deletes its temporary flag. + // FIXME(sproul): reconsider if self.load_state_temporary_flag(state_root)?.is_some() { return Ok(None); } @@ -734,15 +741,24 @@ impl, Cold: ItemStore> HotColdDB if let Some(HotStateSummary { slot, latest_block_root, + epoch_boundary_state_root, prev_state_root, - .. }) = self.load_hot_state_summary(state_root)? { - // Load prior state, potentially from the cache. - // - // This can backtrack as far as the finalized state in extreme cases, but will prime - // the cache with every intermediate state while doing so, meaning that this work should - // be repeated infrequently. + // On a fork boundary slot load a full state from disk. + if self.spec.fork_activated_at_slot::(slot).is_some() { + return self.load_hot_state_full(state_root).map(Some); + } + + // On any other epoch boundary load and apply a diff. + if slot % E::slots_per_epoch() == 0 { + return self + .load_state_from_diff(*state_root, epoch_boundary_state_root) + .map(Some); + } + + // Otherwise load the prior state, potentially from the cache, and replay a single block + // on top of it. let prev_state = self .get_hot_state(&prev_state_root)? .ok_or(HotColdDBError::MissingPrevState(prev_state_root))?; @@ -761,6 +777,43 @@ impl, Cold: ItemStore> HotColdDB } } + pub fn load_hot_state_full( + &self, + state_root: &Hash256, + ) -> Result<(BeaconState, Hash256), Error> { + let mut state = get_full_state(&self.hot_db, state_root, &self.spec)? + .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; + + // Do a tree hash here so that the cache is fully built. + state.update_tree_hash_cache()?; + + let latest_block_root = state.get_latest_block_root(*state_root); + Ok((state, latest_block_root)) + } + + pub fn load_state_from_diff( + &self, + state_root: Hash256, + prev_epoch_state_root: Hash256, + ) -> Result<(BeaconState, Hash256), Error> { + let diff = self.load_state_diff(state_root)?; + let mut state = self.get_hot_state(&prev_epoch_state_root)?.ok_or( + HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root), + )?; + diff.apply_diff(&mut state)?; + + // Do a tree hash here so that the cache is fully built. + state.update_tree_hash_cache()?; + + let latest_block_root = state.get_latest_block_root(state_root); + Ok((state, latest_block_root)) + } + + pub fn load_state_diff(&self, state_root: Hash256) -> Result, Error> { + self.get_item(&state_root)? + .ok_or(HotColdDBError::MissingStateDiff(state_root).into()) + } + /// Store a pre-finalization state in the freezer database. /// /// If the state doesn't lie on a restore point boundary then just its summary will be stored. @@ -770,7 +823,7 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { - ops.push(ColdStateSummary { slot: state.slot() }.as_kv_store_op(*state_root)); + ops.push(ColdStateSummary { slot: state.slot() }.as_kv_store_op(*state_root)?); if state.slot() % self.config.slots_per_restore_point != 0 { return Ok(()); @@ -797,7 +850,7 @@ impl, Cold: ItemStore> HotColdDB // 3. Store restore point. let restore_point_index = state.slot().as_u64() / self.config.slots_per_restore_point; - self.store_restore_point_hash(restore_point_index, *state_root, ops); + self.store_restore_point_hash(restore_point_index, *state_root, ops)?; Ok(()) } @@ -1031,7 +1084,7 @@ impl, Cold: ItemStore> HotColdDB let column = SchemaVersion::db_column().into(); let key = SCHEMA_VERSION_KEY.as_bytes(); let db_key = get_key_for_col(column, key); - let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes()); + let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes()?); ops.push(op); self.hot_db.do_atomically(ops) @@ -1080,7 +1133,7 @@ impl, Cold: ItemStore> HotColdDB ) -> Result { let mut anchor_info = self.anchor_info.write(); if *anchor_info == prev_value { - let kv_op = self.store_anchor_info_in_batch(&new_value); + let kv_op = self.store_anchor_info_in_batch(&new_value)?; *anchor_info = new_value; Ok(kv_op) } else { @@ -1107,14 +1160,17 @@ impl, Cold: ItemStore> HotColdDB /// /// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues /// with recursive locking. - fn store_anchor_info_in_batch(&self, anchor_info: &Option) -> KeyValueStoreOp { + fn store_anchor_info_in_batch( + &self, + anchor_info: &Option, + ) -> Result { if let Some(ref anchor_info) = anchor_info { anchor_info.as_kv_store_op(ANCHOR_INFO_KEY) } else { - KeyValueStoreOp::DeleteKey(get_key_for_col( + Ok(KeyValueStoreOp::DeleteKey(get_key_for_col( DBColumn::BeaconMeta.into(), ANCHOR_INFO_KEY.as_bytes(), - )) + ))) } } @@ -1184,7 +1240,7 @@ impl, Cold: ItemStore> HotColdDB } /// Stage the split for storage to disk. - pub fn store_split_in_batch(&self) -> KeyValueStoreOp { + pub fn store_split_in_batch(&self) -> Result { self.split.read_recursive().as_kv_store_op(SPLIT_KEY) } @@ -1203,10 +1259,11 @@ impl, Cold: ItemStore> HotColdDB restore_point_index: u64, state_root: Hash256, ops: &mut Vec, - ) { + ) -> Result<(), Error> { let value = &RestorePointHash { state_root }; - let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index)); + let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index))?; ops.push(op); + Ok(()) } /// Convert a `restore_point_index` into a database key. @@ -1293,11 +1350,14 @@ impl, Cold: ItemStore> HotColdDB /// Store the checkpoint to begin pruning from (the "old finalized checkpoint"). pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> { self.hot_db - .do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)]) + .do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)?]) } /// Create a staged store for the pruning checkpoint. - pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp { + pub fn pruning_checkpoint_store_op( + &self, + checkpoint: Checkpoint, + ) -> Result { PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY) } @@ -1353,10 +1413,14 @@ pub fn migrate_database, Cold: ItemStore>( return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); } + // Store the new finalized state as a full state in the database. It would likely previously + // have been stored as a diff. + store.store_full_state(&finalized_state_root, finalized_state)?; + + // Copy all of the states between the new finalized state and the split slot, from the hot DB to + // the cold DB. let mut hot_db_ops: Vec> = Vec::new(); - // 1. Copy all of the states between the new finalized state and the split slot, from the hot DB - // to the cold DB. let state_root_iter = StateRootsIterator::new(&store, finalized_state); for maybe_pair in state_root_iter.take_while(|result| match result { Ok((_, slot)) => { @@ -1380,7 +1444,7 @@ pub fn migrate_database, Cold: ItemStore>( // Store a pointer from this state root to its slot, so we can later reconstruct states // from their state root alone. let cold_state_summary = ColdStateSummary { slot }; - let op = cold_state_summary.as_kv_store_op(state_root); + let op = cold_state_summary.as_kv_store_op(state_root)?; cold_db_ops.push(op); // There are data dependencies between calls to `store_cold_state()` that prevent us from @@ -1471,8 +1535,8 @@ impl StoreItem for Split { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -1498,8 +1562,8 @@ impl StoreItem for HotStateSummary { DBColumn::BeaconStateSummary } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -1514,7 +1578,7 @@ impl HotStateSummary { // slots where there isn't a skip). let slot = state.slot(); let latest_block_root = state.get_latest_block_root(*state_root); - let epoch_boundary_slot = slot / E::slots_per_epoch() * E::slots_per_epoch(); + let epoch_boundary_slot = (slot - 1) / E::slots_per_epoch() * E::slots_per_epoch(); let epoch_boundary_state_root = if epoch_boundary_slot == slot { *state_root } else { @@ -1550,8 +1614,8 @@ impl StoreItem for ColdStateSummary { DBColumn::BeaconStateSummary } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -1570,8 +1634,8 @@ impl StoreItem for RestorePointHash { DBColumn::BeaconRestorePoint } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -1587,8 +1651,8 @@ impl StoreItem for TemporaryFlag { DBColumn::BeaconStateTemporary } - fn as_store_bytes(&self) -> Vec { - vec![] + fn as_store_bytes(&self) -> Result, Error> { + Ok(vec![]) } fn from_store_bytes(_: &[u8]) -> Result { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a44cef4138..d683432579 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -26,6 +26,7 @@ pub mod metrics; mod partial_beacon_state; pub mod reconstruct; mod state_cache; +mod state_diff; pub mod iter; @@ -94,7 +95,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati let column = I::db_column().into(); let key = key.as_bytes(); - self.put_bytes(column, key, &item.as_store_bytes()) + self.put_bytes(column, key, &item.as_store_bytes()?) .map_err(Into::into) } @@ -102,7 +103,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati let column = I::db_column().into(); let key = key.as_bytes(); - self.put_bytes_sync(column, key, &item.as_store_bytes()) + self.put_bytes_sync(column, key, &item.as_store_bytes()?) .map_err(Into::into) } @@ -151,7 +152,15 @@ pub enum DBColumn { /// For data related to the database itself. BeaconMeta, BeaconBlock, + /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). BeaconState, + /// For compact `BeaconStateDiff`s. + BeaconStateDiff, + /// For the mapping from state roots to their slots or summaries. + BeaconStateSummary, + /// For the list of temporary states stored during block import, + /// and then made non-temporary by the deletion of their state root from this column. + BeaconStateTemporary, /// For persisting in-memory state to the database. BeaconChain, OpPool, @@ -160,11 +169,6 @@ pub enum DBColumn { PubkeyCache, /// For the table mapping restore point numbers to state roots. BeaconRestorePoint, - /// For the mapping from state roots to their slots or summaries. - BeaconStateSummary, - /// For the list of temporary states stored during block import, - /// and then made non-temporary by the deletion of their state root from this column. - BeaconStateTemporary, BeaconBlockRoots, BeaconStateRoots, BeaconHistoricalRoots, @@ -179,14 +183,15 @@ impl Into<&'static str> for DBColumn { DBColumn::BeaconMeta => "bma", DBColumn::BeaconBlock => "blk", DBColumn::BeaconState => "ste", + DBColumn::BeaconStateDiff => "bsd", + DBColumn::BeaconStateSummary => "bss", + DBColumn::BeaconStateTemporary => "bst", DBColumn::BeaconChain => "bch", DBColumn::OpPool => "opo", DBColumn::Eth1Cache => "etc", DBColumn::ForkChoice => "frk", DBColumn::PubkeyCache => "pkc", DBColumn::BeaconRestorePoint => "brp", - DBColumn::BeaconStateSummary => "bss", - DBColumn::BeaconStateTemporary => "bst", DBColumn::BeaconBlockRoots => "bbr", DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconHistoricalRoots => "bhr", @@ -212,16 +217,16 @@ pub trait StoreItem: Sized { fn db_column() -> DBColumn; /// Serialize `self` as bytes. - fn as_store_bytes(&self) -> Vec; + fn as_store_bytes(&self) -> Result, Error>; /// De-serialize `self` from bytes. /// /// Return an instance of the type and the number of bytes that were read. fn from_store_bytes(bytes: &[u8]) -> Result; - fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp { + fn as_kv_store_op(&self, key: Hash256) -> Result { let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes()); - KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()) + Ok(KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()?)) } } @@ -243,8 +248,8 @@ mod tests { DBColumn::BeaconBlock } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 9053247186..c5f18e4bea 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -30,8 +30,8 @@ impl StoreItem for SchemaVersion { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.0.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.0.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -52,8 +52,8 @@ impl StoreItem for PruningCheckpoint { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.checkpoint.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.checkpoint.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -71,8 +71,8 @@ impl StoreItem for CompactionTimestamp { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.0.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.0.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { @@ -109,8 +109,8 @@ impl StoreItem for AnchorInfo { DBColumn::BeaconMeta } - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) } fn from_store_bytes(bytes: &[u8]) -> Result { diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 6b3445463c..326a111920 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -86,6 +86,33 @@ lazy_static! { "store_beacon_state_write_bytes_total", "Total number of beacon state bytes written to the DB" ); + /* + * Beacon state diffs + */ + pub static ref BEACON_STATE_DIFF_WRITE_BYTES: Result = try_create_int_counter( + "store_beacon_state_diff_write_bytes_total", + "Total number of bytes written for beacon state diffs" + ); + pub static ref BEACON_STATE_DIFF_WRITE_COUNT: Result = try_create_int_counter( + "store_beacon_state_diff_write_count_total", + "Total number of beacon state diffs written" + ); + pub static ref BEACON_STATE_DIFF_COMPRESSION_RATIO: Result = try_create_float_gauge( + "store_beacon_state_diff_compression_ratio", + "Compression ratio for beacon state diffs (higher is better)" + ); + pub static ref BEACON_STATE_DIFF_COMPUTE_TIME: Result = try_create_histogram( + "store_beacon_state_diff_compute_time", + "Time to calculate a beacon state diff" + ); + pub static ref BEACON_STATE_DIFF_ENCODE_TIME: Result = try_create_histogram( + "store_beacon_state_diff_encode_time", + "Time to encode a beacon state diff as SSZ" + ); + pub static ref BEACON_STATE_DIFF_COMPRESSION_TIME: Result = try_create_histogram( + "store_beacon_state_diff_compression_time", + "Time to compress beacon state SSZ using Flate2" + ); /* * Beacon Block */ diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index c87dfcf959..223df9f0ba 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -88,6 +88,16 @@ impl StateCache { block_root: Hash256, state: &BeaconState, ) -> Result { + if self + .finalized_state + .as_ref() + .map_or(false, |finalized_state| { + finalized_state.state_root == state_root + }) + { + // FIXME(sproul): this should technically be true + return Ok(false); + } if self.states.peek(&state_root).is_some() { return Ok(true); } @@ -136,6 +146,11 @@ impl StateCache { let state = self.get_by_state_root(state_root)?; Some((state_root, state)) } + + pub fn delete(&mut self, state_root: &Hash256) { + self.states.pop(state_root); + self.block_map.delete(state_root); + } } impl BlockMap { @@ -164,6 +179,16 @@ impl BlockMap { pruned_states } + + // FIXME(sproul): slow, make generic + fn delete(&mut self, state_root_to_delete: &Hash256) { + self.blocks.retain(|_, slot_map| { + slot_map + .slots + .retain(|_, state_root| state_root != state_root_to_delete); + !slot_map.slots.is_empty() + }); + } } #[cfg(test)] diff --git a/beacon_node/store/src/state_diff.rs b/beacon_node/store/src/state_diff.rs new file mode 100644 index 0000000000..11d4b0262b --- /dev/null +++ b/beacon_node/store/src/state_diff.rs @@ -0,0 +1,49 @@ +use crate::{metrics, DBColumn, Error, StoreItem}; +use flate2::bufread::{ZlibDecoder, ZlibEncoder}; +use ssz::{Decode, Encode}; +use std::io::Read; +use types::{beacon_state::BeaconStateDiff, EthSpec}; + +impl StoreItem for BeaconStateDiff { + fn db_column() -> DBColumn { + DBColumn::BeaconStateDiff + } + + fn as_store_bytes(&self) -> Result, Error> { + let encode_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_ENCODE_TIME); + let value = self.as_ssz_bytes(); + drop(encode_timer); + + // FIXME(sproul): try vec with capacity + let compression_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPRESSION_TIME); + let mut encoder = ZlibEncoder::new(&value[..], flate2::Compression::fast()); + let mut compressed_value = vec![]; + encoder + .read_to_end(&mut compressed_value) + .map_err(Error::FlateCompression)?; + drop(compression_timer); + + let compression_ratio = value.len() as f64 / compressed_value.len() as f64; + metrics::set_float_gauge( + &metrics::BEACON_STATE_DIFF_COMPRESSION_RATIO, + compression_ratio, + ); + + metrics::inc_counter_by( + &metrics::BEACON_STATE_DIFF_WRITE_BYTES, + compressed_value.len() as u64, + ); + metrics::inc_counter(&metrics::BEACON_STATE_DIFF_WRITE_COUNT); + + Ok(compressed_value) + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + let mut ssz_bytes = vec![]; + let mut decoder = ZlibDecoder::new(bytes); + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::FlateCompression)?; + Ok(Self::from_ssz_bytes(&ssz_bytes)?) + } +} diff --git a/consensus/ssz/src/decode/impls.rs b/consensus/ssz/src/decode/impls.rs index 0e6b390830..9dae5be63d 100644 --- a/consensus/ssz/src/decode/impls.rs +++ b/consensus/ssz/src/decode/impls.rs @@ -2,6 +2,8 @@ use super::*; use core::num::NonZeroUsize; use ethereum_types::{H160, H256, U128, U256}; use smallvec::SmallVec; +use std::collections::BTreeMap; +use std::iter::{self, FromIterator}; use std::sync::Arc; macro_rules! impl_decodable_for_uint { @@ -380,14 +382,14 @@ macro_rules! impl_for_vec { fn from_ssz_bytes(bytes: &[u8]) -> Result { if bytes.is_empty() { - Ok(vec![].into()) + Ok(Self::from_iter(iter::empty())) } else if T::is_ssz_fixed_len() { bytes .chunks(T::ssz_fixed_len()) - .map(|chunk| T::from_ssz_bytes(chunk)) + .map(T::from_ssz_bytes) .collect() } else { - decode_list_of_variable_length_items(bytes, $max_len).map(|vec| vec.into()) + decode_list_of_variable_length_items(bytes, $max_len) } } } @@ -404,17 +406,40 @@ impl_for_vec!(SmallVec<[T; 6]>, Some(6)); impl_for_vec!(SmallVec<[T; 7]>, Some(7)); impl_for_vec!(SmallVec<[T; 8]>, Some(8)); +impl Decode for BTreeMap +where + K: Decode + Ord, + V: Decode, +{ + fn is_ssz_fixed_len() -> bool { + false + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + if bytes.is_empty() { + Ok(Self::from_iter(iter::empty())) + } else if <(K, V)>::is_ssz_fixed_len() { + bytes + .chunks(<(K, V)>::ssz_fixed_len()) + .map(<(K, V)>::from_ssz_bytes) + .collect() + } else { + decode_list_of_variable_length_items(bytes, None) + } + } +} + /// Decodes `bytes` as if it were a list of variable-length items. /// -/// The `ssz::SszDecoder` can also perform this functionality, however it it significantly faster -/// as it is optimized to read same-typed items whilst `ssz::SszDecoder` supports reading items of -/// differing types. -pub fn decode_list_of_variable_length_items( +/// The `ssz::SszDecoder` can also perform this functionality, however this function is +/// significantly faster as it is optimized to read same-typed items whilst `ssz::SszDecoder` +/// supports reading items of differing types. +pub fn decode_list_of_variable_length_items>( bytes: &[u8], max_len: Option, -) -> Result, DecodeError> { +) -> Result { if bytes.is_empty() { - return Ok(vec![]); + return Ok(Container::from_iter(iter::empty())); } let first_offset = read_offset(bytes)?; @@ -433,35 +458,25 @@ pub fn decode_list_of_variable_length_items( ))); } - // Only initialize the vec with a capacity if a maximum length is provided. - // - // We assume that if a max length is provided then the application is able to handle an - // allocation of this size. - let mut values = if max_len.is_some() { - Vec::with_capacity(num_items) - } else { - vec![] - }; - let mut offset = first_offset; - for i in 1..=num_items { - let slice_option = if i == num_items { - bytes.get(offset..) - } else { - let start = offset; + (1..=num_items) + .map(|i| { + let slice_option = if i == num_items { + bytes.get(offset..) + } else { + let start = offset; - let next_offset = read_offset(&bytes[(i * BYTES_PER_LENGTH_OFFSET)..])?; - offset = sanitize_offset(next_offset, Some(offset), bytes.len(), Some(first_offset))?; + let next_offset = read_offset(&bytes[(i * BYTES_PER_LENGTH_OFFSET)..])?; + offset = + sanitize_offset(next_offset, Some(offset), bytes.len(), Some(first_offset))?; - bytes.get(start..offset) - }; + bytes.get(start..offset) + }; - let slice = slice_option.ok_or(DecodeError::OutOfBoundsByte { i: offset })?; - - values.push(T::from_ssz_bytes(slice)?); - } - - Ok(values) + let slice = slice_option.ok_or(DecodeError::OutOfBoundsByte { i: offset })?; + T::from_ssz_bytes(slice) + }) + .collect() } #[cfg(test)] diff --git a/consensus/ssz/src/encode/impls.rs b/consensus/ssz/src/encode/impls.rs index 5728685d01..24f23a5ae8 100644 --- a/consensus/ssz/src/encode/impls.rs +++ b/consensus/ssz/src/encode/impls.rs @@ -2,6 +2,7 @@ use super::*; use core::num::NonZeroUsize; use ethereum_types::{H160, H256, U128, U256}; use smallvec::SmallVec; +use std::collections::BTreeMap; use std::sync::Arc; macro_rules! impl_encodable_for_uint { @@ -220,6 +221,65 @@ impl Encode for Arc { } } +// Encode transparently through references. +impl<'a, T: Encode> Encode for &'a T { + fn is_ssz_fixed_len() -> bool { + T::is_ssz_fixed_len() + } + + fn ssz_fixed_len() -> usize { + T::ssz_fixed_len() + } + + fn ssz_append(&self, buf: &mut Vec) { + T::ssz_append(self, buf) + } + + fn ssz_bytes_len(&self) -> usize { + T::ssz_bytes_len(self) + } +} + +/// Compute the encoded length of a vector-like sequence of `T`. +pub fn sequence_ssz_bytes_len(iter: I) -> usize +where + I: Iterator + ExactSizeIterator, + T: Encode, +{ + // Compute length before doing any iteration. + let length = iter.len(); + if ::is_ssz_fixed_len() { + ::ssz_fixed_len() * length + } else { + let mut len = iter.map(|item| item.ssz_bytes_len()).sum(); + len += BYTES_PER_LENGTH_OFFSET * length; + len + } +} + +/// Encode a vector-like sequence of `T`. +pub fn sequence_ssz_append(iter: I, buf: &mut Vec) +where + I: Iterator + ExactSizeIterator, + T: Encode, +{ + if T::is_ssz_fixed_len() { + buf.reserve(T::ssz_fixed_len() * iter.len()); + + for item in iter { + item.ssz_append(buf); + } + } else { + let mut encoder = SszEncoder::container(buf, iter.len() * BYTES_PER_LENGTH_OFFSET); + + for item in iter { + encoder.append(&item); + } + + encoder.finalize(); + } +} + macro_rules! impl_for_vec { ($type: ty) => { impl Encode for $type { @@ -228,32 +288,11 @@ macro_rules! impl_for_vec { } fn ssz_bytes_len(&self) -> usize { - if ::is_ssz_fixed_len() { - ::ssz_fixed_len() * self.len() - } else { - let mut len = self.iter().map(|item| item.ssz_bytes_len()).sum(); - len += BYTES_PER_LENGTH_OFFSET * self.len(); - len - } + sequence_ssz_bytes_len(self.iter()) } fn ssz_append(&self, buf: &mut Vec) { - if T::is_ssz_fixed_len() { - buf.reserve(T::ssz_fixed_len() * self.len()); - - for item in self { - item.ssz_append(buf); - } - } else { - let mut encoder = - SszEncoder::container(buf, self.len() * BYTES_PER_LENGTH_OFFSET); - - for item in self { - encoder.append(item); - } - - encoder.finalize(); - } + sequence_ssz_append(self.iter(), buf) } } }; @@ -269,6 +308,24 @@ impl_for_vec!(SmallVec<[T; 6]>); impl_for_vec!(SmallVec<[T; 7]>); impl_for_vec!(SmallVec<[T; 8]>); +impl Encode for BTreeMap +where + K: Encode + Ord, + V: Encode, +{ + fn is_ssz_fixed_len() -> bool { + false + } + + fn ssz_bytes_len(&self) -> usize { + sequence_ssz_bytes_len(self.iter()) + } + + fn ssz_append(&self, buf: &mut Vec) { + sequence_ssz_append(self.iter(), buf) + } +} + impl Encode for bool { fn is_ssz_fixed_len() -> bool { true diff --git a/consensus/ssz/tests/tests.rs b/consensus/ssz/tests/tests.rs index 7bd6252ad0..e41fc15dd4 100644 --- a/consensus/ssz/tests/tests.rs +++ b/consensus/ssz/tests/tests.rs @@ -4,6 +4,8 @@ use ssz_derive::{Decode, Encode}; mod round_trip { use super::*; + use std::collections::BTreeMap; + use std::iter::FromIterator; fn round_trip(items: Vec) { for item in items { @@ -321,6 +323,52 @@ mod round_trip { round_trip(vec); } + + #[test] + fn btree_map_fixed() { + let data = vec![ + BTreeMap::new(), + BTreeMap::from_iter(vec![(0u8, 0u16), (1, 2), (2, 4), (4, 6)]), + ]; + round_trip(data); + } + + #[test] + fn btree_map_variable_value() { + let data = vec![ + BTreeMap::new(), + BTreeMap::from_iter(vec![ + ( + 0u64, + ThreeVariableLen { + a: 1, + b: vec![3, 5, 7], + c: vec![], + d: vec![0, 0], + }, + ), + ( + 1, + ThreeVariableLen { + a: 99, + b: vec![1], + c: vec![2, 3, 4, 5, 6, 7, 8, 9, 10], + d: vec![4, 5, 6, 7, 8], + }, + ), + ( + 2, + ThreeVariableLen { + a: 0, + b: vec![], + c: vec![], + d: vec![], + }, + ), + ]), + ]; + round_trip(data); + } } mod derive_macro { diff --git a/consensus/ssz_types/Cargo.toml b/consensus/ssz_types/Cargo.toml index 9c23ce92b5..5f5294b6c2 100644 --- a/consensus/ssz_types/Cargo.toml +++ b/consensus/ssz_types/Cargo.toml @@ -19,6 +19,7 @@ typenum = "1.12.0" arbitrary = { version = "1.0", features = ["derive"], optional = true } derivative = "2.1.1" smallvec = "1.8.0" +milhouse = { path = "../../../milhouse" } [dev-dependencies] serde_json = "1.0.58" diff --git a/consensus/ssz_types/src/variable_list.rs b/consensus/ssz_types/src/variable_list.rs index a3e8c5d550..1d062114f2 100644 --- a/consensus/ssz_types/src/variable_list.rs +++ b/consensus/ssz_types/src/variable_list.rs @@ -255,7 +255,8 @@ where }) .map(Into::into) } else { - ssz::decode_list_of_variable_length_items(bytes, Some(max_len)).map(|vec| vec.into()) + ssz::decode_list_of_variable_length_items(bytes, Some(max_len)) + .map(|vec: Vec<_>| vec.into()) } } } diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index 0d14429467..0a12226da9 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -71,7 +71,7 @@ impl ConsensusContext { return Ok(current_block_root); } - let current_block_root = block.tree_hash_root(); + let current_block_root = block.message().tree_hash_root(); self.current_block_root = Some(current_block_root); Ok(current_block_root) } diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 70666cc432..4b298d1c5d 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -26,6 +26,7 @@ pub use self::committee_cache::{ CommitteeCache, }; pub use clone_config::CloneConfig; +pub use diff::BeaconStateDiff; pub use eth_spec::*; pub use iter::BlockRootsIter; @@ -40,6 +41,7 @@ pub use { #[macro_use] mod committee_cache; mod clone_config; +mod diff; mod exit_cache; mod iter; mod pubkey_cache; @@ -1577,7 +1579,10 @@ impl BeaconState { || self.randao_mixes().has_pending_updates() || self.slashings().has_pending_updates() || self - .inactivity_scores() + .previous_epoch_attestations() + .map_or(false, VList::has_pending_updates) + || self + .current_epoch_attestations() .map_or(false, VList::has_pending_updates) || self .previous_epoch_participation() @@ -1585,6 +1590,9 @@ impl BeaconState { || self .current_epoch_participation() .map_or(false, VList::has_pending_updates) + || self + .inactivity_scores() + .map_or(false, VList::has_pending_updates) } // FIXME(sproul): automate this somehow @@ -1598,7 +1606,12 @@ impl BeaconState { self.randao_mixes_mut().apply_updates()?; self.slashings_mut().apply_updates()?; - // FIXME(sproul): phase0 fields + if let Ok(previous_epoch_attestations) = self.previous_epoch_attestations_mut() { + previous_epoch_attestations.apply_updates()?; + } + if let Ok(current_epoch_attestations) = self.current_epoch_attestations_mut() { + current_epoch_attestations.apply_updates()?; + } if let Ok(inactivity_scores) = self.inactivity_scores_mut() { inactivity_scores.apply_updates()?; } diff --git a/consensus/types/src/beacon_state/diff.rs b/consensus/types/src/beacon_state/diff.rs new file mode 100644 index 0000000000..e8bc8c236c --- /dev/null +++ b/consensus/types/src/beacon_state/diff.rs @@ -0,0 +1,255 @@ +use crate::{ + BeaconBlockHeader, BeaconState, BeaconStateError as Error, BitVector, Checkpoint, Eth1Data, + EthSpec, ExecutionPayloadHeader, Fork, Hash256, ParticipationFlags, PendingAttestation, Slot, + SyncCommittee, Validator, +}; +use milhouse::{CloneDiff, Diff, ListDiff, ResetListDiff, VectorDiff}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; + +/// `Option`-like type implementing SSZ encode/decode. +/// +/// Uses a succinct 1 byte union selector. +#[derive(Debug, PartialEq, Encode, Decode)] +#[ssz(enum_behaviour = "union")] +pub enum Maybe { + Nothing(u8), + Just(T), +} + +impl Maybe { + fn nothing() -> Self { + Self::Nothing(0) + } +} + +#[derive(Debug, PartialEq, Encode, Decode)] +pub struct BeaconStateDiff { + // Versioning + genesis_time: CloneDiff, + genesis_validators_root: CloneDiff, + slot: CloneDiff, + fork: CloneDiff, + + // History + latest_block_header: CloneDiff, + block_roots: VectorDiff, + state_roots: VectorDiff, + historical_roots: ListDiff, + + // Ethereum 1.0 chain data + eth1_data: CloneDiff, + eth1_data_votes: ResetListDiff, + eth1_deposit_index: CloneDiff, + + // Registry + validators: ListDiff, + balances: ListDiff, + + // Randomness + randao_mixes: VectorDiff, + + // Slashings + slashings: VectorDiff, + + // Attestations (genesis fork only) + // FIXME(sproul): do some clever diffing of prev against former current + previous_epoch_attestations: + Maybe, T::MaxPendingAttestations>>, + current_epoch_attestations: + Maybe, T::MaxPendingAttestations>>, + + // Participation (Altair and later) + previous_epoch_participation: Maybe>, + current_epoch_participation: Maybe>, + + // Finality + justification_bits: CloneDiff>, + previous_justified_checkpoint: CloneDiff, + current_justified_checkpoint: CloneDiff, + finalized_checkpoint: CloneDiff, + + // Inactivity + inactivity_scores: Maybe>, + + // Light-client sync committees + current_sync_committee: Maybe>>>, + next_sync_committee: Maybe>>>, + + // Execution + latest_execution_payload_header: Maybe>>, +} + +fn optional_field_diff< + T: EthSpec, + X, + D: Diff + Encode + Decode, +>( + old: &BeaconState, + new: &BeaconState, + field: impl Fn(&BeaconState) -> Result<&X, Error>, +) -> Result, Error> { + if let Ok(new_value) = field(new) { + let old_value = field(old)?; + Ok(Maybe::Just(D::compute_diff(old_value, new_value)?)) + } else { + Ok(Maybe::nothing()) + } +} + +fn apply_optional_diff + Encode + Decode>( + diff: Maybe, + field: Result<&mut X, Error>, +) -> Result<(), Error> { + if let Maybe::Just(diff) = diff { + diff.apply_diff(field?)?; + } + Ok(()) +} + +impl Diff for BeaconStateDiff { + type Target = BeaconState; + type Error = Error; + + // FIXME(sproul): proc macro + fn compute_diff(orig: &Self::Target, other: &Self::Target) -> Result { + // FIXME(sproul): consider cross-variant diffs + Ok(BeaconStateDiff { + genesis_time: <_>::compute_diff(&orig.genesis_time(), &other.genesis_time())?, + genesis_validators_root: <_>::compute_diff( + &orig.genesis_validators_root(), + &other.genesis_validators_root(), + )?, + slot: <_>::compute_diff(&orig.slot(), &other.slot())?, + fork: <_>::compute_diff(&orig.fork(), &other.fork())?, + latest_block_header: <_>::compute_diff( + orig.latest_block_header(), + other.latest_block_header(), + )?, + block_roots: <_>::compute_diff(orig.block_roots(), other.block_roots())?, + state_roots: <_>::compute_diff(orig.state_roots(), other.state_roots())?, + historical_roots: <_>::compute_diff(orig.historical_roots(), other.historical_roots())?, + eth1_data: <_>::compute_diff(orig.eth1_data(), other.eth1_data())?, + eth1_data_votes: <_>::compute_diff(orig.eth1_data_votes(), other.eth1_data_votes())?, + eth1_deposit_index: <_>::compute_diff( + &orig.eth1_deposit_index(), + &other.eth1_deposit_index(), + )?, + validators: <_>::compute_diff(orig.validators(), other.validators())?, + balances: <_>::compute_diff(orig.balances(), other.balances())?, + randao_mixes: <_>::compute_diff(orig.randao_mixes(), other.randao_mixes())?, + slashings: <_>::compute_diff(orig.slashings(), other.slashings())?, + previous_epoch_attestations: optional_field_diff( + orig, + other, + BeaconState::previous_epoch_attestations, + )?, + current_epoch_attestations: optional_field_diff( + orig, + other, + BeaconState::current_epoch_attestations, + )?, + previous_epoch_participation: optional_field_diff( + orig, + other, + BeaconState::previous_epoch_participation, + )?, + current_epoch_participation: optional_field_diff( + orig, + other, + BeaconState::current_epoch_participation, + )?, + justification_bits: <_>::compute_diff( + orig.justification_bits(), + other.justification_bits(), + )?, + previous_justified_checkpoint: <_>::compute_diff( + &orig.previous_justified_checkpoint(), + &other.previous_justified_checkpoint(), + )?, + current_justified_checkpoint: <_>::compute_diff( + &orig.current_justified_checkpoint(), + &other.current_justified_checkpoint(), + )?, + finalized_checkpoint: <_>::compute_diff( + &orig.finalized_checkpoint(), + &other.finalized_checkpoint(), + )?, + inactivity_scores: optional_field_diff(orig, other, BeaconState::inactivity_scores)?, + current_sync_committee: optional_field_diff( + orig, + other, + BeaconState::current_sync_committee, + )?, + next_sync_committee: optional_field_diff( + orig, + other, + BeaconState::next_sync_committee, + )?, + latest_execution_payload_header: optional_field_diff( + orig, + other, + BeaconState::latest_execution_payload_header, + )?, + }) + } + + fn apply_diff(self, target: &mut BeaconState) -> Result<(), Error> { + self.genesis_time.apply_diff(target.genesis_time_mut())?; + self.genesis_validators_root + .apply_diff(target.genesis_validators_root_mut())?; + self.slot.apply_diff(target.slot_mut())?; + self.fork.apply_diff(target.fork_mut())?; + self.latest_block_header + .apply_diff(target.latest_block_header_mut())?; + self.block_roots.apply_diff(target.block_roots_mut())?; + self.state_roots.apply_diff(target.state_roots_mut())?; + self.historical_roots + .apply_diff(target.historical_roots_mut())?; + self.eth1_data.apply_diff(target.eth1_data_mut())?; + self.eth1_data_votes + .apply_diff(target.eth1_data_votes_mut())?; + self.eth1_deposit_index + .apply_diff(target.eth1_deposit_index_mut())?; + self.validators.apply_diff(target.validators_mut())?; + self.balances.apply_diff(target.balances_mut())?; + self.randao_mixes.apply_diff(target.randao_mixes_mut())?; + self.slashings.apply_diff(target.slashings_mut())?; + apply_optional_diff( + self.previous_epoch_attestations, + target.previous_epoch_attestations_mut(), + )?; + apply_optional_diff( + self.current_epoch_attestations, + target.current_epoch_attestations_mut(), + )?; + apply_optional_diff( + self.previous_epoch_participation, + target.previous_epoch_participation_mut(), + )?; + apply_optional_diff( + self.current_epoch_participation, + target.current_epoch_participation_mut(), + )?; + self.justification_bits + .apply_diff(target.justification_bits_mut())?; + self.previous_justified_checkpoint + .apply_diff(target.previous_justified_checkpoint_mut())?; + self.current_justified_checkpoint + .apply_diff(target.current_justified_checkpoint_mut())?; + self.finalized_checkpoint + .apply_diff(target.finalized_checkpoint_mut())?; + apply_optional_diff(self.inactivity_scores, target.inactivity_scores_mut())?; + apply_optional_diff( + self.current_sync_committee, + target.current_sync_committee_mut(), + )?; + apply_optional_diff(self.next_sync_committee, target.next_sync_committee_mut())?; + apply_optional_diff( + self.latest_execution_payload_header, + target.latest_execution_payload_header_mut(), + )?; + Ok(()) + } +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index fa74f9d29c..572ce54c22 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -226,6 +226,13 @@ impl ChainSpec { } } + /// Return the name of the fork activated at `slot`, if any. + pub fn fork_activated_at_slot(&self, slot: Slot) -> Option { + let prev_slot_fork = self.fork_name_at_slot::(slot - 1); + let slot_fork = self.fork_name_at_slot::(slot); + (slot_fork != prev_slot_fork).then(|| slot_fork) + } + /// Returns the fork version for a named fork. pub fn fork_version_for_name(&self, fork_name: ForkName) -> [u8; 4] { match fork_name { diff --git a/testing/ef_tests/src/case_result.rs b/testing/ef_tests/src/case_result.rs index 4982bf94c1..935cb74215 100644 --- a/testing/ef_tests/src/case_result.rs +++ b/testing/ef_tests/src/case_result.rs @@ -39,6 +39,9 @@ pub fn compare_beacon_state_results_without_caches( if let (Ok(ref mut result), Some(ref mut expected)) = (result.as_mut(), expected.as_mut()) { result.drop_all_caches().unwrap(); expected.drop_all_caches().unwrap(); + + result.apply_pending_mutations().unwrap(); + expected.apply_pending_mutations().unwrap(); } compare_result_detailed(result, expected) diff --git a/testing/ef_tests/src/cases/common.rs b/testing/ef_tests/src/cases/common.rs index ade8711cdc..e77e561939 100644 --- a/testing/ef_tests/src/cases/common.rs +++ b/testing/ef_tests/src/cases/common.rs @@ -43,7 +43,7 @@ macro_rules! uint_wrapper { <$wrapped_type>::tree_hash_type() } - fn tree_hash_packed_encoding(&self) -> Vec { + fn tree_hash_packed_encoding(&self) -> tree_hash::PackedEncoding { self.x.tree_hash_packed_encoding() } diff --git a/testing/ef_tests/src/cases/epoch_processing.rs b/testing/ef_tests/src/cases/epoch_processing.rs index b187d46fed..c52089a039 100644 --- a/testing/ef_tests/src/cases/epoch_processing.rs +++ b/testing/ef_tests/src/cases/epoch_processing.rs @@ -137,16 +137,17 @@ impl EpochTransition for Slashings { validator_statuses.process_attestations(state)?; process_slashings( state, + None, validator_statuses.total_balances.current_epoch(), spec, )?; } BeaconState::Altair(_) | BeaconState::Merge(_) => { + let mut cache = altair::ParticipationCache::new(state, spec).unwrap(); process_slashings( state, - altair::ParticipationCache::new(state, spec) - .unwrap() - .current_epoch_total_active_balance(), + Some(cache.process_slashings_indices()), + cache.current_epoch_total_active_balance(), spec, )?; } diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index 195df7f382..d940c985d3 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -5,14 +5,17 @@ use crate::decode::{ssz_decode_file, ssz_decode_file_with, ssz_decode_state, yam use crate::testing_spec; use crate::type_name::TypeName; use serde_derive::Deserialize; -use state_processing::per_block_processing::{ - errors::BlockProcessingError, - process_block_header, process_execution_payload, - process_operations::{ - altair, base, process_attester_slashings, process_deposits, process_exits, - process_proposer_slashings, +use state_processing::{ + per_block_processing::{ + errors::BlockProcessingError, + process_block_header, process_execution_payload, + process_operations::{ + altair, base, process_attester_slashings, process_deposits, process_exits, + process_proposer_slashings, + }, + process_sync_aggregate, VerifyBlockRoot, VerifySignatures, }, - process_sync_aggregate, VerifyBlockRoot, VerifySignatures, + ConsensusContext, }; use std::fmt::Debug; use std::path::Path; @@ -183,7 +186,8 @@ impl Operation for BeaconBlock { spec: &ChainSpec, _: &Operations, ) -> Result<(), BlockProcessingError> { - process_block_header(state, self.to_ref(), VerifyBlockRoot::True, spec)?; + let mut ctxt = ConsensusContext::new(state.slot()); + process_block_header(state, self.to_ref(), VerifyBlockRoot::True, &mut ctxt, spec)?; Ok(()) } } diff --git a/testing/ef_tests/src/cases/sanity_blocks.rs b/testing/ef_tests/src/cases/sanity_blocks.rs index c155be877a..6c2890cd15 100644 --- a/testing/ef_tests/src/cases/sanity_blocks.rs +++ b/testing/ef_tests/src/cases/sanity_blocks.rs @@ -5,7 +5,7 @@ use crate::decode::{ssz_decode_file_with, ssz_decode_state, yaml_decode_file}; use serde_derive::Deserialize; use state_processing::{ per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, - VerifyBlockRoot, + ConsensusContext, VerifyBlockRoot, }; use types::{BeaconState, EthSpec, ForkName, RelativeEpoch, SignedBeaconBlock}; @@ -94,26 +94,28 @@ impl Case for SanityBlocks { .build_committee_cache(RelativeEpoch::Current, spec) .unwrap(); + let mut ctxt = ConsensusContext::new(indiv_state.slot()); per_block_processing( &mut indiv_state, signed_block, - None, BlockSignatureStrategy::VerifyIndividual, VerifyBlockRoot::True, + &mut ctxt, spec, )?; + let mut ctxt = ConsensusContext::new(indiv_state.slot()); per_block_processing( &mut bulk_state, signed_block, - None, BlockSignatureStrategy::VerifyBulk, VerifyBlockRoot::True, + &mut ctxt, spec, )?; - if block.state_root() == bulk_state.canonical_root() - && block.state_root() == indiv_state.canonical_root() + if block.state_root() == bulk_state.update_tree_hash_cache().unwrap() + && block.state_root() == indiv_state.update_tree_hash_cache().unwrap() { Ok(()) } else { diff --git a/testing/ef_tests/src/cases/ssz_static.rs b/testing/ef_tests/src/cases/ssz_static.rs index d0cc5f9eac..0da9c51899 100644 --- a/testing/ef_tests/src/cases/ssz_static.rs +++ b/testing/ef_tests/src/cases/ssz_static.rs @@ -42,7 +42,7 @@ fn load_from_dir(path: &Path) -> Result<(SszStaticRoots, Vec Case for TransitionTest { .map_err(|e| format!("Failed to advance: {:?}", e))?; // Apply block. + let mut ctxt = ConsensusContext::new(state.slot()); per_block_processing( &mut state, block, - None, BlockSignatureStrategy::VerifyBulk, VerifyBlockRoot::True, + &mut ctxt, spec, ) .map_err(|e| format!("Block processing failed: {:?}", e))?;