Merge branch 'electra-engine-api' of https://github.com/sigp/lighthouse into beacon-api-electra

This commit is contained in:
realbigsean
2024-07-08 18:25:56 -07:00
113 changed files with 1728 additions and 2663 deletions

View File

@@ -114,7 +114,7 @@ use std::collections::HashSet;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
@@ -1410,10 +1410,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}
let start_slot = head_state.slot();
let task_start = Instant::now();
let max_task_runtime = Duration::from_secs(self.spec.seconds_per_slot);
let head_state_slot = head_state.slot();
let mut state = head_state;
@@ -1423,18 +1419,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
while state.slot() < slot {
// Do not allow and forward state skip that takes longer than the maximum task duration.
//
// This is a protection against nodes doing too much work when they're not synced
// to a chain.
if task_start + max_task_runtime < Instant::now() {
return Err(Error::StateSkipTooLarge {
start_slot,
requested_slot: slot,
max_task_runtime,
});
}
// Note: supplying some `state_root` when it is known would be a cheap and easy
// optimization.
match per_slot_processing(&mut state, skip_state_root, &self.spec) {

View File

@@ -20,26 +20,12 @@ use types::{
Hash256, Slot,
};
/// Ensure this justified checkpoint has an epoch of 0 so that it is never
/// greater than the justified checkpoint and enshrined as the actual justified
/// checkpoint.
const JUNK_BEST_JUSTIFIED_CHECKPOINT: Checkpoint = Checkpoint {
epoch: Epoch::new(0),
root: Hash256::repeat_byte(0),
};
#[derive(Debug)]
pub enum Error {
UnableToReadSlot,
UnableToReadTime,
InvalidGenesisSnapshot(Slot),
AncestorUnknown { ancestor_slot: Slot },
UninitializedBestJustifiedBalances,
FailedToReadBlock(StoreError),
MissingBlock(Hash256),
FailedToReadState(StoreError),
MissingState(Hash256),
InvalidPersistedBytes(ssz::DecodeError),
BeaconStateError(BeaconStateError),
Arith(ArithError),
}
@@ -66,7 +52,6 @@ const MAX_BALANCE_CACHE_SIZE: usize = 4;
)]
pub(crate) struct CacheItem {
pub(crate) block_root: Hash256,
#[superstruct(only(V8))]
pub(crate) epoch: Epoch,
pub(crate) balances: Vec<u64>,
}
@@ -79,7 +64,6 @@ pub(crate) type CacheItem = CacheItemV8;
no_enum
)]
pub struct BalancesCache {
#[superstruct(only(V8))]
pub(crate) items: Vec<CacheItemV8>,
}
@@ -365,59 +349,15 @@ where
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV17;
/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
pub struct PersistedForkChoiceStore {
#[superstruct(only(V11, V17))]
pub balances_cache: BalancesCacheV8,
pub time: Slot,
pub finalized_checkpoint: Checkpoint,
pub justified_checkpoint: Checkpoint,
pub justified_balances: Vec<u64>,
#[superstruct(only(V11))]
pub best_justified_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub unrealized_justified_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub unrealized_finalized_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub proposer_boost_root: Hash256,
#[superstruct(only(V11, V17))]
pub equivocating_indices: BTreeSet<u64>,
}
impl From<PersistedForkChoiceStoreV11> for PersistedForkChoiceStore {
fn from(from: PersistedForkChoiceStoreV11) -> PersistedForkChoiceStore {
PersistedForkChoiceStore {
balances_cache: from.balances_cache,
time: from.time,
finalized_checkpoint: from.finalized_checkpoint,
justified_checkpoint: from.justified_checkpoint,
justified_balances: from.justified_balances,
unrealized_justified_checkpoint: from.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: from.unrealized_finalized_checkpoint,
proposer_boost_root: from.proposer_boost_root,
equivocating_indices: from.equivocating_indices,
}
}
}
impl From<PersistedForkChoiceStore> for PersistedForkChoiceStoreV11 {
fn from(from: PersistedForkChoiceStore) -> PersistedForkChoiceStoreV11 {
PersistedForkChoiceStoreV11 {
balances_cache: from.balances_cache,
time: from.time,
finalized_checkpoint: from.finalized_checkpoint,
justified_checkpoint: from.justified_checkpoint,
justified_balances: from.justified_balances,
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: from.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: from.unrealized_finalized_checkpoint,
proposer_boost_root: from.proposer_boost_root,
equivocating_indices: from.equivocating_indices,
}
}
}

View File

@@ -19,7 +19,7 @@ pub const ENGINE_CAPABILITIES_REFRESH_INTERVAL: u64 = 300;
pub enum ElectraReadiness {
/// The execution engine is electra-enabled (as far as we can tell)
Ready,
/// We are connected to an execution engine which doesn't support the V3 engine api methods
/// We are connected to an execution engine which doesn't support the V4 engine api methods
V4MethodsNotSupported { error: String },
/// The transition configuration with the EL failed, there might be a problem with
/// connectivity, authentication or a difference in configuration.

View File

@@ -28,7 +28,6 @@ use state_processing::{
state_advance::Error as StateAdvanceError,
BlockProcessingError, BlockReplayError, EpochProcessingError, SlotProcessingError,
};
use std::time::Duration;
use task_executor::ShutdownReason;
use tokio::task::JoinError;
use types::milhouse::Error as MilhouseError;
@@ -77,11 +76,6 @@ pub enum BeaconChainError {
ProposerSlashingValidationError(ProposerSlashingValidationError),
AttesterSlashingValidationError(AttesterSlashingValidationError),
BlsExecutionChangeValidationError(BlsExecutionChangeValidationError),
StateSkipTooLarge {
start_slot: Slot,
requested_slot: Slot,
max_task_runtime: Duration,
},
MissingFinalizedStateRoot(Slot),
/// Returned when an internal check fails, indicating corrupt data.
InvariantViolated(String),

View File

@@ -1,4 +1,4 @@
use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV11, PersistedForkChoiceStoreV17};
use crate::beacon_fork_choice_store::PersistedForkChoiceStoreV17;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error, StoreItem};
@@ -7,37 +7,12 @@ use superstruct::superstruct;
// If adding a new version you should update this type alias and fix the breakages.
pub type PersistedForkChoice = PersistedForkChoiceV17;
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
pub struct PersistedForkChoice {
pub fork_choice: fork_choice::PersistedForkChoice,
#[superstruct(only(V11))]
pub fork_choice_store: PersistedForkChoiceStoreV11,
#[superstruct(only(V17))]
pub fork_choice_store: PersistedForkChoiceStoreV17,
}
impl From<PersistedForkChoiceV11> for PersistedForkChoice {
fn from(from: PersistedForkChoiceV11) -> PersistedForkChoice {
PersistedForkChoice {
fork_choice: from.fork_choice,
fork_choice_store: from.fork_choice_store.into(),
}
}
}
impl From<PersistedForkChoice> for PersistedForkChoiceV11 {
fn from(from: PersistedForkChoice) -> PersistedForkChoiceV11 {
PersistedForkChoiceV11 {
fork_choice: from.fork_choice,
fork_choice_store: from.fork_choice_store.into(),
}
}
}
macro_rules! impl_store_item {
($type:ty) => {
impl StoreItem for $type {
@@ -56,5 +31,4 @@ macro_rules! impl_store_item {
};
}
impl_store_item!(PersistedForkChoiceV11);
impl_store_item!(PersistedForkChoiceV17);

View File

@@ -1,8 +1,6 @@
//! Utilities for managing database schema changes.
mod migration_schema_v17;
mod migration_schema_v18;
mod migration_schema_v19;
mod migration_schema_v20;
mod migration_schema_v21;
use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
@@ -53,32 +51,8 @@ pub fn migrate_schema<T: BeaconChainTypes>(
}
//
// Migrations from before SchemaVersion(16) are deprecated.
// Migrations from before SchemaVersion(19) are deprecated.
//
(SchemaVersion(16), SchemaVersion(17)) => {
let ops = migration_schema_v17::upgrade_to_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(16)) => {
let ops = migration_schema_v17::downgrade_from_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(18)) => {
let ops = migration_schema_v18::upgrade_to_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(18), SchemaVersion(17)) => {
let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(18), SchemaVersion(19)) => {
let ops = migration_schema_v19::upgrade_to_v19::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(19), SchemaVersion(18)) => {
let ops = migration_schema_v19::downgrade_from_v19::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(19), SchemaVersion(20)) => {
let ops = migration_schema_v20::upgrade_to_v20::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
@@ -87,6 +61,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v20::downgrade_from_v20::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(20), SchemaVersion(21)) => {
let ops = migration_schema_v21::upgrade_to_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(20)) => {
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -1,88 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::{PersistedForkChoiceV11, PersistedForkChoiceV17};
use proto_array::core::{SszContainerV16, SszContainerV17};
use slog::{debug, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV11,
) -> Result<PersistedForkChoiceV17, Error> {
let ssz_container_v16 = SszContainerV16::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v17: SszContainerV17 = ssz_container_v16.try_into().map_err(|e| {
Error::SchemaMigrationError(format!(
"Missing checkpoint during schema migration: {:?}",
e
))
})?;
fork_choice.fork_choice.proto_array_bytes = ssz_container_v17.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn downgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV17,
) -> Result<PersistedForkChoiceV11, Error> {
let ssz_container_v17 = SszContainerV17::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v16: SszContainerV16 = ssz_container_v17.into();
fork_choice.fork_choice.proto_array_bytes = ssz_container_v16.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn upgrade_to_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v11 = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v17 = upgrade_fork_choice(v11)?;
debug!(
log,
"Removing unused best_justified_checkpoint from fork choice store."
);
Ok(vec![v17.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}
pub fn downgrade_from_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v17 = db
.get_item::<PersistedForkChoiceV17>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v11 = downgrade_fork_choice(v17)?;
debug!(
log,
"Adding junk best_justified_checkpoint to fork choice store."
);
Ok(vec![v11.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}

View File

@@ -1,119 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use store::{
get_key_for_col, metadata::BLOB_INFO_KEY, DBColumn, Error, HotColdDB, KeyValueStoreOp,
};
use types::{Epoch, EthSpec, Hash256, Slot};
/// The slot clock isn't usually available before the database is initialized, so we construct a
/// temporary slot clock by reading the genesis state. It should always exist if the database is
/// initialized at a prior schema version, however we still handle the lack of genesis state
/// gracefully.
fn get_slot_clock<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else {
error!(log, "Missing genesis block");
return Ok(None);
};
let Some(genesis_state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root());
return Ok(None);
};
Ok(Some(T::SlotClock::new(
spec.genesis_slot,
Duration::from_secs(genesis_state.genesis_time()),
Duration::from_secs(spec.seconds_per_slot),
)))
}
fn get_current_epoch<T: BeaconChainTypes>(
db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: &Logger,
) -> Result<Epoch, Error> {
get_slot_clock::<T>(db, log)?
.and_then(|clock| clock.now())
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(Error::SlotClockUnavailableForMigration)
}
pub fn upgrade_to_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
db.heal_freezer_block_roots_at_split()?;
db.heal_freezer_block_roots_at_genesis()?;
info!(log, "Healed freezer block roots");
// No-op, even if Deneb has already occurred. The database is probably borked in this case, but
// *maybe* the fork recovery will revert the minority fork and succeed.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
warn!(
log,
"Attempting upgrade to v18 schema";
"info" => "this may not work as Deneb has already been activated"
);
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb once it is scheduled"
);
}
Ok(vec![])
}
pub fn downgrade_from_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// We cannot downgrade from V18 once the Deneb fork has been activated, because there will
// be blobs and blob metadata in the database that aren't understood by the V17 schema.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
error!(
log,
"Deneb already active: v18+ is mandatory";
"current_epoch" => current_epoch,
"deneb_fork_epoch" => deneb_fork_epoch,
);
return Err(Error::UnableToDowngrade);
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you will need to upgrade before Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you need to upgrade before Deneb",
);
}
let ops = vec![KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
BLOB_INFO_KEY.as_bytes(),
))];
Ok(ops)
}

View File

@@ -1,65 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::{debug, info, Logger};
use std::sync::Arc;
use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp};
pub fn upgrade_to_v19<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut hot_delete_ops = vec![];
let mut blob_keys = vec![];
let column = DBColumn::BeaconBlob;
debug!(log, "Migrating from v18 to v19");
// Iterate through the blobs on disk.
for res in db.hot_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
let key_col = get_key_for_col(column.as_str(), &key);
hot_delete_ops.push(KeyValueStoreOp::DeleteKey(key_col));
blob_keys.push(key);
}
let num_blobs = blob_keys.len();
debug!(log, "Collected {} blob lists to migrate", num_blobs);
let batch_size = 500;
let mut batch = Vec::with_capacity(batch_size);
for key in blob_keys {
let next_blob = db.hot_db.get_bytes(column.as_str(), &key)?;
if let Some(next_blob) = next_blob {
let key_col = get_key_for_col(column.as_str(), &key);
batch.push(KeyValueStoreOp::PutKeyValue(key_col, next_blob));
if batch.len() >= batch_size {
db.blobs_db.do_atomically(batch.clone())?;
batch.clear();
}
}
}
// Process the remaining batch if it's not empty
if !batch.is_empty() {
db.blobs_db.do_atomically(batch)?;
}
debug!(log, "Wrote {} blobs to the blobs db", num_blobs);
// Delete all the blobs
info!(log, "Upgrading to v19 schema");
Ok(hot_delete_ops)
}
pub fn downgrade_from_v19<T: BeaconChainTypes>(
_db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// No-op
info!(
log,
"Downgrading to v18 schema";
);
Ok(vec![])
}

View File

@@ -11,6 +11,8 @@ pub fn upgrade_to_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v19 to v20");
// Load a V15 op pool and transform it to V20.
let Some(PersistedOperationPoolV15::<T::EthSpec> {
attestations_v15,
@@ -52,6 +54,8 @@ pub fn downgrade_from_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Downgrading from v20 to v19");
// Load a V20 op pool and transform it to V15.
let Some(PersistedOperationPoolV20::<T::EthSpec> {
attestations,

View File

@@ -0,0 +1,83 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{
get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use types::{Hash256, PublicKey};
const LOG_EVERY: usize = 200_000;
pub fn upgrade_to_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v20 to v21");
let mut ops = vec![];
// Iterate through all pubkeys and decompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let pubkey = PublicKey::from_ssz_bytes(&value)?;
let decompressed = DatabasePubkey::from_pubkey(&pubkey);
ops.push(decompressed.as_kv_store_op(key));
if i > 0 && i % LOG_EVERY == 0 {
info!(
log,
"Public key decompression in progress";
"keys_decompressed" => i
);
}
}
info!(log, "Public key decompression complete");
Ok(ops)
}
pub fn downgrade_from_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Downgrading from v21 to v20");
let mut ops = vec![];
// Iterate through all pubkeys and recompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let decompressed = DatabasePubkey::from_ssz_bytes(&value)?;
let (_, pubkey_bytes) = decompressed.as_pubkey().map_err(|e| Error::DBError {
message: format!("{e:?}"),
})?;
let db_key = get_key_for_col(DBColumn::PubkeyCache.into(), key.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
pubkey_bytes.as_ssz_bytes(),
));
if i > 0 && i % LOG_EVERY == 0 {
info!(
log,
"Public key compression in progress";
"keys_compressed" => i
);
}
}
info!(log, "Public key compression complete");
Ok(ops)
}

View File

@@ -1383,17 +1383,43 @@ impl<E: EthSpec> ValidatorMonitor<E> {
});
if self.individual_tracking() {
info!(
self.log,
"Attestation included in aggregate";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %data.slot,
"src" => src,
"validator" => %id,
);
let is_first_inclusion_aggregate = validator
.get_from_epoch_summary(epoch, |summary_opt| {
if let Some(summary) = summary_opt {
Some(summary.attestation_aggregate_inclusions == 0)
} else {
// No data for this validator: no inclusion.
Some(true)
}
})
.unwrap_or(true);
if is_first_inclusion_aggregate {
info!(
self.log,
"Attestation included in aggregate";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %data.slot,
"src" => src,
"validator" => %id,
);
} else {
// Downgrade to Debug for second and onwards of logging to reduce verbosity
debug!(
self.log,
"Attestation included in aggregate";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %data.slot,
"src" => src,
"validator" => %id,
)
};
}
validator.with_epoch_summary(epoch, |summary| {
@@ -1434,7 +1460,6 @@ impl<E: EthSpec> ValidatorMonitor<E> {
&["block", label],
);
});
if self.individual_tracking() {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS,
@@ -1442,16 +1467,41 @@ impl<E: EthSpec> ValidatorMonitor<E> {
delay.as_u64() as i64,
);
info!(
self.log,
"Attestation included in block";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"inclusion_lag" => format!("{} slot(s)", delay),
"epoch" => %epoch,
"slot" => %data.slot,
"validator" => %id,
);
let is_first_inclusion_block = validator
.get_from_epoch_summary(epoch, |summary_opt| {
if let Some(summary) = summary_opt {
Some(summary.attestation_block_inclusions == 0)
} else {
// No data for this validator: no inclusion.
Some(true)
}
})
.unwrap_or(true);
if is_first_inclusion_block {
info!(
self.log,
"Attestation included in block";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"inclusion_lag" => format!("{} slot(s)", delay),
"epoch" => %epoch,
"slot" => %data.slot,
"validator" => %id,
);
} else {
// Downgrade to Debug for second and onwards of logging to reduce verbosity
debug!(
self.log,
"Attestation included in block";
"head" => ?data.beacon_block_root,
"index" => %data.index,
"inclusion_lag" => format!("{} slot(s)", delay),
"epoch" => %epoch,
"slot" => %data.slot,
"validator" => %id,
);
}
}
validator.with_epoch_summary(epoch, |summary| {

View File

@@ -1,6 +1,9 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN;
use smallvec::SmallVec;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashMap;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
@@ -49,14 +52,13 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
let mut pubkey_bytes = vec![];
for validator_index in 0.. {
if let Some(DatabasePubkey(pubkey)) =
if let Some(db_pubkey) =
store.get_item(&DatabasePubkey::key_for_index(validator_index))?
{
pubkeys.push((&pubkey).try_into().map_err(|e| {
BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
})?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, validator_index);
let (pk, pk_bytes) = DatabasePubkey::as_pubkey(&db_pubkey)?;
pubkeys.push(pk);
indices.insert(pk_bytes, validator_index);
pubkey_bytes.push(pk_bytes);
} else {
break;
}
@@ -104,29 +106,29 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.indices.reserve(validator_keys.len());
let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey in validator_keys {
for pubkey_bytes in validator_keys {
let i = self.pubkeys.len();
if self.indices.contains_key(&pubkey) {
if self.indices.contains_key(&pubkey_bytes) {
return Err(BeaconChainError::DuplicateValidatorPublicKey);
}
let pubkey = (&pubkey_bytes)
.try_into()
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
DatabasePubkey::from_pubkey(&pubkey)
.as_kv_store_op(DatabasePubkey::key_for_index(i)),
));
self.pubkeys.push(
(&pubkey)
.try_into()
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
);
self.pubkey_bytes.push(pubkey);
self.indices.insert(pubkey, i);
self.pubkeys.push(pubkey);
self.pubkey_bytes.push(pubkey_bytes);
self.indices.insert(pubkey_bytes, i);
}
Ok(store_ops)
@@ -166,7 +168,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
/// Wrapper for a public key stored in the database.
///
/// Keyed by the validator index as `Hash256::from_low_u64_be(index)`.
struct DatabasePubkey(PublicKeyBytes);
#[derive(Encode, Decode)]
pub struct DatabasePubkey {
pubkey: SmallVec<[u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN]>,
}
impl StoreItem for DatabasePubkey {
fn db_column() -> DBColumn {
@@ -174,11 +179,11 @@ impl StoreItem for DatabasePubkey {
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?))
Ok(Self::from_ssz_bytes(bytes)?)
}
}
@@ -186,6 +191,19 @@ impl DatabasePubkey {
fn key_for_index(index: usize) -> Hash256 {
Hash256::from_low_u64_be(index as u64)
}
pub fn from_pubkey(pubkey: &PublicKey) -> Self {
Self {
pubkey: pubkey.serialize_uncompressed().into(),
}
}
pub fn as_pubkey(&self) -> Result<(PublicKey, PublicKeyBytes), BeaconChainError> {
let pubkey = PublicKey::deserialize_uncompressed(&self.pubkey)
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?;
let pubkey_bytes = pubkey.compress();
Ok((pubkey, pubkey_bytes))
}
}
#[cfg(test)]

View File

@@ -3050,13 +3050,7 @@ async fn schema_downgrade_to_min_version() {
)
.await;
let min_version = if harness.spec.deneb_fork_epoch.is_some() {
// Can't downgrade beyond V18 once Deneb is reached, for simplicity don't test that
// at all if Deneb is enabled.
SchemaVersion(18)
} else {
SchemaVersion(16)
};
let min_version = SchemaVersion(19);
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();