mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-18 21:38:31 +00:00
Implement DB changes
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
//! Utilities for managing database schema changes.
|
//! Utilities for managing database schema changes.
|
||||||
|
mod migration_schema_v11;
|
||||||
mod migration_schema_v6;
|
mod migration_schema_v6;
|
||||||
mod migration_schema_v7;
|
mod migration_schema_v7;
|
||||||
mod migration_schema_v8;
|
mod migration_schema_v8;
|
||||||
@@ -130,6 +131,16 @@ pub fn migrate_schema<T: BeaconChainTypes>(
|
|||||||
migration_schema_v9::downgrade_from_v9::<T>(db.clone(), log)?;
|
migration_schema_v9::downgrade_from_v9::<T>(db.clone(), log)?;
|
||||||
db.store_schema_version(to)
|
db.store_schema_version(to)
|
||||||
}
|
}
|
||||||
|
// Upgrade from v9 to v11 to store richer metadata in the attestation op pool.
|
||||||
|
(SchemaVersion(9), SchemaVersion(11)) => {
|
||||||
|
let ops = migration_schema_v11::upgrade_to_v11::<T>(db.clone(), log)?;
|
||||||
|
db.store_schema_version_atomically(to, ops)
|
||||||
|
}
|
||||||
|
// Downgrade from v11 to v9 to drop richer metadata from the attestation op pool.
|
||||||
|
(SchemaVersion(11), SchemaVersion(9)) => {
|
||||||
|
let ops = migration_schema_v11::downgrade_from_v11::<T>(db.clone(), log)?;
|
||||||
|
db.store_schema_version_atomically(to, ops)
|
||||||
|
}
|
||||||
// Anything else is an error.
|
// Anything else is an error.
|
||||||
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
|
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
|
||||||
target_version: to,
|
target_version: to,
|
||||||
|
|||||||
@@ -0,0 +1,64 @@
|
|||||||
|
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
|
||||||
|
use operation_pool::{PersistedOperationPool, PersistedOperationPoolV11, PersistedOperationPoolV5};
|
||||||
|
use slog::{debug, Logger};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
|
||||||
|
|
||||||
|
pub fn upgrade_to_v11<T: BeaconChainTypes>(
|
||||||
|
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||||
|
log: Logger,
|
||||||
|
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||||
|
// Load a V5 op pool and transform it to V11.
|
||||||
|
let v5 = if let Some(op_pool) =
|
||||||
|
db.get_item::<PersistedOperationPoolV5<T::EthSpec>>(&OP_POOL_DB_KEY)?
|
||||||
|
{
|
||||||
|
op_pool
|
||||||
|
} else {
|
||||||
|
return Ok(vec![]);
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
log,
|
||||||
|
"Dropping attestations from pool";
|
||||||
|
"count" => v5.attestations_v5.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// FIXME(sproul): work out whether it's worth trying to carry across the attestations
|
||||||
|
let v11 = PersistedOperationPool::V11(PersistedOperationPoolV11 {
|
||||||
|
attestations: vec![],
|
||||||
|
sync_contributions: v5.sync_contributions,
|
||||||
|
attester_slashings: v5.attester_slashings,
|
||||||
|
proposer_slashings: v5.proposer_slashings,
|
||||||
|
voluntary_exits: v5.voluntary_exits,
|
||||||
|
});
|
||||||
|
Ok(vec![v11.as_kv_store_op(OP_POOL_DB_KEY)])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn downgrade_from_v11<T: BeaconChainTypes>(
|
||||||
|
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||||
|
log: Logger,
|
||||||
|
) -> Result<Vec<KeyValueStoreOp>, Error> {
|
||||||
|
// Load a V11 op pool and transform it to V5.
|
||||||
|
let v11 = if let Some(PersistedOperationPool::V11(op_pool)) =
|
||||||
|
db.get_item::<PersistedOperationPool<T::EthSpec>>(&OP_POOL_DB_KEY)?
|
||||||
|
{
|
||||||
|
op_pool
|
||||||
|
} else {
|
||||||
|
return Ok(vec![]);
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
log,
|
||||||
|
"Dropping attestations from pool";
|
||||||
|
"count" => v11.attestations.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let v5 = PersistedOperationPoolV5 {
|
||||||
|
attestations_v5: vec![],
|
||||||
|
sync_contributions: v11.sync_contributions,
|
||||||
|
attester_slashings: v11.attester_slashings,
|
||||||
|
proposer_slashings: v11.proposer_slashings,
|
||||||
|
voluntary_exits: v11.voluntary_exits,
|
||||||
|
};
|
||||||
|
Ok(vec![v5.as_kv_store_op(OP_POOL_DB_KEY)])
|
||||||
|
}
|
||||||
@@ -1,45 +1,12 @@
|
|||||||
use serde_derive::{Deserialize, Serialize};
|
use serde_derive::{Deserialize, Serialize};
|
||||||
use ssz::ssz_encode;
|
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use types::{AttestationData, ChainSpec, Domain, Epoch, Fork, Hash256};
|
|
||||||
|
|
||||||
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
|
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
|
||||||
|
///
|
||||||
|
/// [DEPRECATED] To be removed once all nodes have updated to schema v11.
|
||||||
#[derive(
|
#[derive(
|
||||||
PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize,
|
PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize,
|
||||||
)]
|
)]
|
||||||
pub struct AttestationId {
|
pub struct AttestationId {
|
||||||
v: Vec<u8>,
|
v: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of domain bytes that the end of an attestation ID is padded with.
|
|
||||||
const DOMAIN_BYTES_LEN: usize = std::mem::size_of::<Hash256>();
|
|
||||||
|
|
||||||
impl AttestationId {
|
|
||||||
pub fn from_data(
|
|
||||||
attestation: &AttestationData,
|
|
||||||
fork: &Fork,
|
|
||||||
genesis_validators_root: Hash256,
|
|
||||||
spec: &ChainSpec,
|
|
||||||
) -> Self {
|
|
||||||
let mut bytes = ssz_encode(attestation);
|
|
||||||
let epoch = attestation.target.epoch;
|
|
||||||
bytes.extend_from_slice(
|
|
||||||
AttestationId::compute_domain_bytes(epoch, fork, genesis_validators_root, spec)
|
|
||||||
.as_bytes(),
|
|
||||||
);
|
|
||||||
AttestationId { v: bytes }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn compute_domain_bytes(
|
|
||||||
epoch: Epoch,
|
|
||||||
fork: &Fork,
|
|
||||||
genesis_validators_root: Hash256,
|
|
||||||
spec: &ChainSpec,
|
|
||||||
) -> Hash256 {
|
|
||||||
spec.get_domain(epoch, Domain::BeaconAttester, fork, genesis_validators_root)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn domain_bytes_match(&self, domain_bytes: &Hash256) -> bool {
|
|
||||||
&self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes.as_bytes()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -11,7 +11,9 @@ mod sync_aggregate_id;
|
|||||||
pub use attestation::AttMaxCover;
|
pub use attestation::AttMaxCover;
|
||||||
pub use attestation_storage::{AttestationRef, SplitAttestation};
|
pub use attestation_storage::{AttestationRef, SplitAttestation};
|
||||||
pub use max_cover::MaxCover;
|
pub use max_cover::MaxCover;
|
||||||
pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair};
|
pub use persistence::{
|
||||||
|
PersistedOperationPool, PersistedOperationPoolV11, PersistedOperationPoolV5,
|
||||||
|
};
|
||||||
pub use reward_cache::RewardCache;
|
pub use reward_cache::RewardCache;
|
||||||
|
|
||||||
use crate::attestation_storage::{AttestationMap, CheckpointKey};
|
use crate::attestation_storage::{AttestationMap, CheckpointKey};
|
||||||
@@ -665,8 +667,6 @@ mod release_tests {
|
|||||||
num_committees: usize,
|
num_committees: usize,
|
||||||
) -> (BeaconChainHarness<EphemeralHarnessType<E>>, ChainSpec) {
|
) -> (BeaconChainHarness<EphemeralHarnessType<E>>, ChainSpec) {
|
||||||
let mut spec = test_spec::<E>();
|
let mut spec = test_spec::<E>();
|
||||||
|
|
||||||
// FIXME(sproul): make this modular?
|
|
||||||
spec.altair_fork_epoch = Some(Epoch::new(0));
|
spec.altair_fork_epoch = Some(Epoch::new(0));
|
||||||
|
|
||||||
let num_validators =
|
let num_validators =
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
|
|||||||
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
|
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
|
||||||
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
|
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
|
||||||
#[superstruct(
|
#[superstruct(
|
||||||
variants(Altair),
|
variants(V5, V11),
|
||||||
variant_attributes(
|
variant_attributes(
|
||||||
derive(Derivative, PartialEq, Debug, Serialize, Deserialize, Encode, Decode),
|
derive(Derivative, PartialEq, Debug, Serialize, Deserialize, Encode, Decode),
|
||||||
serde(bound = "T: EthSpec", deny_unknown_fields),
|
serde(bound = "T: EthSpec", deny_unknown_fields),
|
||||||
@@ -31,33 +31,36 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
|
|||||||
#[serde(bound = "T: EthSpec")]
|
#[serde(bound = "T: EthSpec")]
|
||||||
#[ssz(enum_behaviour = "transparent")]
|
#[ssz(enum_behaviour = "transparent")]
|
||||||
pub struct PersistedOperationPool<T: EthSpec> {
|
pub struct PersistedOperationPool<T: EthSpec> {
|
||||||
/// Mapping from attestation ID to attestation mappings.
|
/// [DEPRECATED] Mapping from attestation ID to attestation mappings.
|
||||||
// We could save space by not storing the attestation ID, but it might
|
#[superstruct(only(V5))]
|
||||||
// be difficult to make that roundtrip due to eager aggregation.
|
pub attestations_v5: Vec<(AttestationId, Vec<Attestation<T>>)>,
|
||||||
attestations: Vec<(AttestationId, Vec<Attestation<T>>)>,
|
/// Attestations and their attesting indices.
|
||||||
|
#[superstruct(only(V11))]
|
||||||
|
pub attestations: Vec<(Attestation<T>, Vec<u64>)>,
|
||||||
/// Mapping from sync contribution ID to sync contributions and aggregate.
|
/// Mapping from sync contribution ID to sync contributions and aggregate.
|
||||||
#[superstruct(only(Altair))]
|
pub sync_contributions: PersistedSyncContributions<T>,
|
||||||
sync_contributions: PersistedSyncContributions<T>,
|
|
||||||
/// Attester slashings.
|
/// Attester slashings.
|
||||||
attester_slashings: Vec<(AttesterSlashing<T>, ForkVersion)>,
|
pub attester_slashings: Vec<(AttesterSlashing<T>, ForkVersion)>,
|
||||||
/// Proposer slashings.
|
/// Proposer slashings.
|
||||||
proposer_slashings: Vec<ProposerSlashing>,
|
pub proposer_slashings: Vec<ProposerSlashing>,
|
||||||
/// Voluntary exits.
|
/// Voluntary exits.
|
||||||
voluntary_exits: Vec<SignedVoluntaryExit>,
|
pub voluntary_exits: Vec<SignedVoluntaryExit>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: EthSpec> PersistedOperationPool<T> {
|
impl<T: EthSpec> PersistedOperationPool<T> {
|
||||||
/// Convert an `OperationPool` into serializable form.
|
/// Convert an `OperationPool` into serializable form.
|
||||||
pub fn from_operation_pool(operation_pool: &OperationPool<T>) -> Self {
|
pub fn from_operation_pool(operation_pool: &OperationPool<T>) -> Self {
|
||||||
/* FIXME(sproul): fix persistence
|
|
||||||
let attestations = operation_pool
|
let attestations = operation_pool
|
||||||
.attestations
|
.attestations
|
||||||
.read()
|
.read()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(att_id, att)| (att_id.clone(), att.clone()))
|
.map(|att| {
|
||||||
|
(
|
||||||
|
att.clone_as_attestation(),
|
||||||
|
att.indexed.attesting_indices.clone(),
|
||||||
|
)
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
*/
|
|
||||||
let attestations = vec![];
|
|
||||||
|
|
||||||
let sync_contributions = operation_pool
|
let sync_contributions = operation_pool
|
||||||
.sync_contributions
|
.sync_contributions
|
||||||
@@ -87,7 +90,7 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
|||||||
.map(|(_, exit)| exit.clone())
|
.map(|(_, exit)| exit.clone())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
PersistedOperationPool::Altair(PersistedOperationPoolAltair {
|
PersistedOperationPool::V11(PersistedOperationPoolV11 {
|
||||||
attestations,
|
attestations,
|
||||||
sync_contributions,
|
sync_contributions,
|
||||||
attester_slashings,
|
attester_slashings,
|
||||||
@@ -96,12 +99,8 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches
|
/// Reconstruct an `OperationPool`.
|
||||||
/// `PersistedOperationPool::Base`.
|
|
||||||
pub fn into_operation_pool(self) -> Result<OperationPool<T>, OpPoolError> {
|
pub fn into_operation_pool(self) -> Result<OperationPool<T>, OpPoolError> {
|
||||||
// FIXME(sproul): fix load
|
|
||||||
// let attestations = RwLock::new(self.attestations().iter().cloned().collect());
|
|
||||||
let attestations = RwLock::new(AttestationMap::default());
|
|
||||||
let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect());
|
let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect());
|
||||||
let proposer_slashings = RwLock::new(
|
let proposer_slashings = RwLock::new(
|
||||||
self.proposer_slashings()
|
self.proposer_slashings()
|
||||||
@@ -117,27 +116,45 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
|||||||
.map(|exit| (exit.message.validator_index, exit))
|
.map(|exit| (exit.message.validator_index, exit))
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
let op_pool = match self {
|
let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect());
|
||||||
PersistedOperationPool::Altair(_) => {
|
let attestations = match self {
|
||||||
let sync_contributions =
|
PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant),
|
||||||
RwLock::new(self.sync_contributions()?.iter().cloned().collect());
|
PersistedOperationPool::V11(pool) => {
|
||||||
|
let mut map = AttestationMap::default();
|
||||||
OperationPool {
|
for (att, attesting_indices) in pool.attestations {
|
||||||
attestations,
|
map.insert(att, attesting_indices);
|
||||||
sync_contributions,
|
|
||||||
attester_slashings,
|
|
||||||
proposer_slashings,
|
|
||||||
voluntary_exits,
|
|
||||||
reward_cache: Default::default(),
|
|
||||||
_phantom: Default::default(),
|
|
||||||
}
|
}
|
||||||
|
RwLock::new(map)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let op_pool = OperationPool {
|
||||||
|
attestations,
|
||||||
|
sync_contributions,
|
||||||
|
attester_slashings,
|
||||||
|
proposer_slashings,
|
||||||
|
voluntary_exits,
|
||||||
|
reward_cache: Default::default(),
|
||||||
|
_phantom: Default::default(),
|
||||||
|
};
|
||||||
Ok(op_pool)
|
Ok(op_pool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`.
|
impl<T: EthSpec> StoreItem for PersistedOperationPoolV5<T> {
|
||||||
|
fn db_column() -> DBColumn {
|
||||||
|
DBColumn::OpPool
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_store_bytes(&self) -> Vec<u8> {
|
||||||
|
self.as_ssz_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
|
||||||
|
PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V11`.
|
||||||
impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
|
impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
|
||||||
fn db_column() -> DBColumn {
|
fn db_column() -> DBColumn {
|
||||||
DBColumn::OpPool
|
DBColumn::OpPool
|
||||||
@@ -148,9 +165,9 @@ impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
|
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
|
||||||
// Default deserialization to the Altair variant.
|
// Default deserialization to the latest variant.
|
||||||
PersistedOperationPoolAltair::from_ssz_bytes(bytes)
|
PersistedOperationPoolV11::from_ssz_bytes(bytes)
|
||||||
.map(Self::Altair)
|
.map(Self::V11)
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
|||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use types::{Checkpoint, Hash256, Slot};
|
use types::{Checkpoint, Hash256, Slot};
|
||||||
|
|
||||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9);
|
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(11);
|
||||||
|
|
||||||
// All the keys that get stored under the `BeaconMeta` column.
|
// All the keys that get stored under the `BeaconMeta` column.
|
||||||
//
|
//
|
||||||
|
|||||||
Reference in New Issue
Block a user