From f0fa5a1618da1a4bedb7b1dbfa41e2d646835d40 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 18 Jul 2022 17:52:22 +1000 Subject: [PATCH] Implement DB changes --- beacon_node/beacon_chain/src/schema_change.rs | 11 +++ .../src/schema_change/migration_schema_v11.rs | 64 +++++++++++++ .../operation_pool/src/attestation_id.rs | 37 +------- beacon_node/operation_pool/src/lib.rs | 6 +- beacon_node/operation_pool/src/persistence.rs | 91 +++++++++++-------- beacon_node/store/src/metadata.rs | 2 +- 6 files changed, 135 insertions(+), 76 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v11.rs diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index a48f1d3756..1b9a9f5067 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,4 +1,5 @@ //! Utilities for managing database schema changes. +mod migration_schema_v11; mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; @@ -130,6 +131,16 @@ pub fn migrate_schema( migration_schema_v9::downgrade_from_v9::(db.clone(), log)?; db.store_schema_version(to) } + // Upgrade from v9 to v11 to store richer metadata in the attestation op pool. + (SchemaVersion(9), SchemaVersion(11)) => { + let ops = migration_schema_v11::upgrade_to_v11::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + // Downgrade from v11 to v9 to drop richer metadata from the attestation op pool. + (SchemaVersion(11), SchemaVersion(9)) => { + let ops = migration_schema_v11::downgrade_from_v11::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v11.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v11.rs new file mode 100644 index 0000000000..4eb1609f90 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v11.rs @@ -0,0 +1,64 @@ +use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY}; +use operation_pool::{PersistedOperationPool, PersistedOperationPoolV11, PersistedOperationPoolV5}; +use slog::{debug, Logger}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v11( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V5 op pool and transform it to V11. + let v5 = if let Some(op_pool) = + db.get_item::>(&OP_POOL_DB_KEY)? + { + op_pool + } else { + return Ok(vec![]); + }; + + debug!( + log, + "Dropping attestations from pool"; + "count" => v5.attestations_v5.len(), + ); + + // FIXME(sproul): work out whether it's worth trying to carry across the attestations + let v11 = PersistedOperationPool::V11(PersistedOperationPoolV11 { + attestations: vec![], + sync_contributions: v5.sync_contributions, + attester_slashings: v5.attester_slashings, + proposer_slashings: v5.proposer_slashings, + voluntary_exits: v5.voluntary_exits, + }); + Ok(vec![v11.as_kv_store_op(OP_POOL_DB_KEY)]) +} + +pub fn downgrade_from_v11( + db: Arc>, + log: Logger, +) -> Result, Error> { + // Load a V11 op pool and transform it to V5. + let v11 = if let Some(PersistedOperationPool::V11(op_pool)) = + db.get_item::>(&OP_POOL_DB_KEY)? + { + op_pool + } else { + return Ok(vec![]); + }; + + debug!( + log, + "Dropping attestations from pool"; + "count" => v11.attestations.len(), + ); + + let v5 = PersistedOperationPoolV5 { + attestations_v5: vec![], + sync_contributions: v11.sync_contributions, + attester_slashings: v11.attester_slashings, + proposer_slashings: v11.proposer_slashings, + voluntary_exits: v11.voluntary_exits, + }; + Ok(vec![v5.as_kv_store_op(OP_POOL_DB_KEY)]) +} diff --git a/beacon_node/operation_pool/src/attestation_id.rs b/beacon_node/operation_pool/src/attestation_id.rs index f496ecb3a3..06920d8e6b 100644 --- a/beacon_node/operation_pool/src/attestation_id.rs +++ b/beacon_node/operation_pool/src/attestation_id.rs @@ -1,45 +1,12 @@ use serde_derive::{Deserialize, Serialize}; -use ssz::ssz_encode; use ssz_derive::{Decode, Encode}; -use types::{AttestationData, ChainSpec, Domain, Epoch, Fork, Hash256}; /// Serialized `AttestationData` augmented with a domain to encode the fork info. +/// +/// [DEPRECATED] To be removed once all nodes have updated to schema v11. #[derive( PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize, )] pub struct AttestationId { v: Vec, } - -/// Number of domain bytes that the end of an attestation ID is padded with. -const DOMAIN_BYTES_LEN: usize = std::mem::size_of::(); - -impl AttestationId { - pub fn from_data( - attestation: &AttestationData, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Self { - let mut bytes = ssz_encode(attestation); - let epoch = attestation.target.epoch; - bytes.extend_from_slice( - AttestationId::compute_domain_bytes(epoch, fork, genesis_validators_root, spec) - .as_bytes(), - ); - AttestationId { v: bytes } - } - - pub fn compute_domain_bytes( - epoch: Epoch, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, - ) -> Hash256 { - spec.get_domain(epoch, Domain::BeaconAttester, fork, genesis_validators_root) - } - - pub fn domain_bytes_match(&self, domain_bytes: &Hash256) -> bool { - &self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes.as_bytes() - } -} diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 86bda5cb8f..8824d17cb5 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -11,7 +11,9 @@ mod sync_aggregate_id; pub use attestation::AttMaxCover; pub use attestation_storage::{AttestationRef, SplitAttestation}; pub use max_cover::MaxCover; -pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair}; +pub use persistence::{ + PersistedOperationPool, PersistedOperationPoolV11, PersistedOperationPoolV5, +}; pub use reward_cache::RewardCache; use crate::attestation_storage::{AttestationMap, CheckpointKey}; @@ -665,8 +667,6 @@ mod release_tests { num_committees: usize, ) -> (BeaconChainHarness>, ChainSpec) { let mut spec = test_spec::(); - - // FIXME(sproul): make this modular? spec.altair_fork_epoch = Some(Epoch::new(0)); let num_validators = diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 6e3cfcfba3..d8114ca5db 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -18,7 +18,7 @@ type PersistedSyncContributions = Vec<(SyncAggregateId, Vec = Vec<(SyncAggregateId, Vec { - /// Mapping from attestation ID to attestation mappings. - // We could save space by not storing the attestation ID, but it might - // be difficult to make that roundtrip due to eager aggregation. - attestations: Vec<(AttestationId, Vec>)>, + /// [DEPRECATED] Mapping from attestation ID to attestation mappings. + #[superstruct(only(V5))] + pub attestations_v5: Vec<(AttestationId, Vec>)>, + /// Attestations and their attesting indices. + #[superstruct(only(V11))] + pub attestations: Vec<(Attestation, Vec)>, /// Mapping from sync contribution ID to sync contributions and aggregate. - #[superstruct(only(Altair))] - sync_contributions: PersistedSyncContributions, + pub sync_contributions: PersistedSyncContributions, /// Attester slashings. - attester_slashings: Vec<(AttesterSlashing, ForkVersion)>, + pub attester_slashings: Vec<(AttesterSlashing, ForkVersion)>, /// Proposer slashings. - proposer_slashings: Vec, + pub proposer_slashings: Vec, /// Voluntary exits. - voluntary_exits: Vec, + pub voluntary_exits: Vec, } impl PersistedOperationPool { /// Convert an `OperationPool` into serializable form. pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { - /* FIXME(sproul): fix persistence let attestations = operation_pool .attestations .read() .iter() - .map(|(att_id, att)| (att_id.clone(), att.clone())) + .map(|att| { + ( + att.clone_as_attestation(), + att.indexed.attesting_indices.clone(), + ) + }) .collect(); - */ - let attestations = vec![]; let sync_contributions = operation_pool .sync_contributions @@ -87,7 +90,7 @@ impl PersistedOperationPool { .map(|(_, exit)| exit.clone()) .collect(); - PersistedOperationPool::Altair(PersistedOperationPoolAltair { + PersistedOperationPool::V11(PersistedOperationPoolV11 { attestations, sync_contributions, attester_slashings, @@ -96,12 +99,8 @@ impl PersistedOperationPool { }) } - /// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches - /// `PersistedOperationPool::Base`. + /// Reconstruct an `OperationPool`. pub fn into_operation_pool(self) -> Result, OpPoolError> { - // FIXME(sproul): fix load - // let attestations = RwLock::new(self.attestations().iter().cloned().collect()); - let attestations = RwLock::new(AttestationMap::default()); let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect()); let proposer_slashings = RwLock::new( self.proposer_slashings() @@ -117,27 +116,45 @@ impl PersistedOperationPool { .map(|exit| (exit.message.validator_index, exit)) .collect(), ); - let op_pool = match self { - PersistedOperationPool::Altair(_) => { - let sync_contributions = - RwLock::new(self.sync_contributions()?.iter().cloned().collect()); - - OperationPool { - attestations, - sync_contributions, - attester_slashings, - proposer_slashings, - voluntary_exits, - reward_cache: Default::default(), - _phantom: Default::default(), + let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect()); + let attestations = match self { + PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant), + PersistedOperationPool::V11(pool) => { + let mut map = AttestationMap::default(); + for (att, attesting_indices) in pool.attestations { + map.insert(att, attesting_indices); } + RwLock::new(map) } }; + let op_pool = OperationPool { + attestations, + sync_contributions, + attester_slashings, + proposer_slashings, + voluntary_exits, + reward_cache: Default::default(), + _phantom: Default::default(), + }; Ok(op_pool) } } -/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`. +impl StoreItem for PersistedOperationPoolV5 { + fn db_column() -> DBColumn { + DBColumn::OpPool + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into) + } +} + +/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V11`. impl StoreItem for PersistedOperationPool { fn db_column() -> DBColumn { DBColumn::OpPool @@ -148,9 +165,9 @@ impl StoreItem for PersistedOperationPool { } fn from_store_bytes(bytes: &[u8]) -> Result { - // Default deserialization to the Altair variant. - PersistedOperationPoolAltair::from_ssz_bytes(bytes) - .map(Self::Altair) + // Default deserialization to the latest variant. + PersistedOperationPoolV11::from_ssz_bytes(bytes) + .map(Self::V11) .map_err(Into::into) } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 5551f1f44d..d72dbcd23d 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(11); // All the keys that get stored under the `BeaconMeta` column. //