Merge branch 'unstable' into off-4844

This commit is contained in:
Diva M
2023-03-02 15:38:00 -05:00
86 changed files with 1224 additions and 316 deletions

View File

@@ -57,7 +57,7 @@ fork_choice = { path = "../../consensus/fork_choice" }
task_executor = { path = "../../common/task_executor" }
derivative = "2.1.1"
itertools = "0.10.0"
slasher = { path = "../../slasher" }
slasher = { path = "../../slasher", default-features = false }
eth2 = { path = "../../common/eth2" }
strum = { version = "0.24.0", features = ["derive"] }
logging = { path = "../../common/logging" }

View File

@@ -21,8 +21,6 @@ pub const ENGINE_CAPABILITIES_REFRESH_INTERVAL: u64 = 300;
pub enum CapellaReadiness {
/// The execution engine is capella-enabled (as far as we can tell)
Ready,
/// The EL can be reached and has the correct configuration, however it's not yet synced.
NotSynced,
/// We are connected to an execution engine which doesn't support the V2 engine api methods
V2MethodsNotSupported { error: String },
/// The transition configuration with the EL failed, there might be a problem with
@@ -44,11 +42,6 @@ impl fmt::Display for CapellaReadiness {
execution endpoint: {}",
error
),
CapellaReadiness::NotSynced => write!(
f,
"The execution endpoint is connected and configured, \
however it is not yet synced"
),
CapellaReadiness::NoExecutionEndpoint => write!(
f,
"The --execution-endpoint flag is not specified, this is a \
@@ -56,8 +49,7 @@ impl fmt::Display for CapellaReadiness {
),
CapellaReadiness::V2MethodsNotSupported { error } => write!(
f,
"The execution endpoint does not appear to support \
the required engine api methods for Capella: {}",
"Execution endpoint does not support Capella methods: {}",
error
),
}
@@ -115,12 +107,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
if all_good {
if !el.is_synced_for_notifier().await {
// The EL is not synced.
CapellaReadiness::NotSynced
} else {
CapellaReadiness::Ready
}
CapellaReadiness::Ready
} else {
CapellaReadiness::V2MethodsNotSupported {
error: missing_methods,

View File

@@ -2,9 +2,41 @@ use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use operation_pool::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
};
use slog::{debug, info, Logger};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
use types::{EthSpec, Hash256, Slot};
/// The slot clock isn't usually available before the database is initialized, so we construct a
/// temporary slot clock by reading the genesis state. It should always exist if the database is
/// initialized at a prior schema version, however we still handle the lack of genesis state
/// gracefully.
fn get_slot_clock<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let genesis_block = if let Some(block) = db.get_blinded_block(&Hash256::zero())? {
block
} else {
error!(log, "Missing genesis block");
return Ok(None);
};
let genesis_state =
if let Some(state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? {
state
} else {
error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root());
return Ok(None);
};
Ok(Some(T::SlotClock::new(
spec.genesis_slot,
Duration::from_secs(genesis_state.genesis_time()),
Duration::from_secs(spec.seconds_per_slot),
)))
}
pub fn upgrade_to_v14<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
@@ -41,17 +73,35 @@ pub fn downgrade_from_v14<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// We cannot downgrade from V14 once the Capella fork has been reached because there will
// be HistoricalSummaries stored in the database instead of HistoricalRoots and prior versions
// of Lighthouse can't handle that.
if let Some(capella_fork_epoch) = db.get_chain_spec().capella_fork_epoch {
let current_epoch = get_slot_clock::<T>(&db, &log)?
.and_then(|clock| clock.now())
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(Error::SlotClockUnavailableForMigration)?;
if current_epoch >= capella_fork_epoch {
error!(
log,
"Capella already active: v14+ is mandatory";
"current_epoch" => current_epoch,
"capella_fork_epoch" => capella_fork_epoch,
);
return Err(Error::UnableToDowngrade);
}
}
// Load a V14 op pool and transform it to V12.
let PersistedOperationPoolV14 {
let PersistedOperationPoolV14::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
} = if let Some(PersistedOperationPool::<T::EthSpec>::V14(op_pool)) =
db.get_item(&OP_POOL_DB_KEY)?
{
} = if let Some(op_pool) = db.get_item(&OP_POOL_DB_KEY)? {
op_pool
} else {
debug!(log, "Nothing to do, no operation pool stored");

View File

@@ -43,7 +43,7 @@ pub fn downgrade_from_v15<T: BeaconChainTypes>(
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Load a V15 op pool and transform it to V14.
let PersistedOperationPoolV15 {
let PersistedOperationPoolV15::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
@@ -51,9 +51,7 @@ pub fn downgrade_from_v15<T: BeaconChainTypes>(
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
} = if let Some(PersistedOperationPool::<T::EthSpec>::V15(op_pool)) =
db.get_item(&OP_POOL_DB_KEY)?
{
} = if let Some(op_pool) = db.get_item(&OP_POOL_DB_KEY)? {
op_pool
} else {
debug!(log, "Nothing to do, no operation pool stored");

View File

@@ -29,7 +29,7 @@ const TOTAL_LABEL: &str = "total";
/// The validator monitor collects per-epoch data about each monitored validator. Historical data
/// will be kept around for `HISTORIC_EPOCHS` before it is pruned.
pub const HISTORIC_EPOCHS: usize = 4;
pub const HISTORIC_EPOCHS: usize = 10;
/// Once the validator monitor reaches this number of validators it will stop
/// tracking their metrics/logging individually in an effort to reduce
@@ -45,7 +45,7 @@ pub enum Error {
/// Contains data pertaining to one validator for one epoch.
#[derive(Default)]
struct EpochSummary {
pub struct EpochSummary {
/*
* Attestations with a target in the current epoch.
*/
@@ -103,6 +103,12 @@ struct EpochSummary {
pub proposer_slashings: usize,
/// The number of attester slashings observed.
pub attester_slashings: usize,
/*
* Other validator info helpful for the UI.
*/
/// The total balance of the validator.
pub total_balance: Option<u64>,
}
impl EpochSummary {
@@ -176,18 +182,60 @@ impl EpochSummary {
pub fn register_attester_slashing(&mut self) {
self.attester_slashings += 1;
}
pub fn register_validator_total_balance(&mut self, total_balance: u64) {
self.total_balance = Some(total_balance)
}
}
type SummaryMap = HashMap<Epoch, EpochSummary>;
#[derive(Default)]
pub struct ValidatorMetrics {
pub attestation_hits: u64,
pub attestation_misses: u64,
pub attestation_head_hits: u64,
pub attestation_head_misses: u64,
pub attestation_target_hits: u64,
pub attestation_target_misses: u64,
}
impl ValidatorMetrics {
pub fn increment_hits(&mut self) {
self.attestation_hits += 1;
}
pub fn increment_misses(&mut self) {
self.attestation_misses += 1;
}
pub fn increment_target_hits(&mut self) {
self.attestation_target_hits += 1;
}
pub fn increment_target_misses(&mut self) {
self.attestation_target_misses += 1;
}
pub fn increment_head_hits(&mut self) {
self.attestation_head_hits += 1;
}
pub fn increment_head_misses(&mut self) {
self.attestation_head_misses += 1;
}
}
/// A validator that is being monitored by the `ValidatorMonitor`.
struct MonitoredValidator {
pub struct MonitoredValidator {
/// A human-readable identifier for the validator.
pub id: String,
/// The validator index in the state.
pub index: Option<u64>,
/// A history of the validator over time.
pub summaries: RwLock<SummaryMap>,
/// Validator metrics to be exposed over the HTTP API.
pub metrics: RwLock<ValidatorMetrics>,
}
impl MonitoredValidator {
@@ -198,6 +246,7 @@ impl MonitoredValidator {
.unwrap_or_else(|| pubkey.to_string()),
index,
summaries: <_>::default(),
metrics: <_>::default(),
}
}
@@ -252,6 +301,20 @@ impl MonitoredValidator {
fn touch_epoch_summary(&self, epoch: Epoch) {
self.with_epoch_summary(epoch, |_| {});
}
fn get_from_epoch_summary<F, U>(&self, epoch: Epoch, func: F) -> Option<U>
where
F: Fn(Option<&EpochSummary>) -> Option<U>,
{
let summaries = self.summaries.read();
func(summaries.get(&epoch))
}
pub fn get_total_balance(&self, epoch: Epoch) -> Option<u64> {
self.get_from_epoch_summary(epoch, |summary_opt| {
summary_opt.and_then(|summary| summary.total_balance)
})
}
}
/// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P
@@ -347,12 +410,20 @@ impl<T: EthSpec> ValidatorMonitor<T> {
if let Some(i) = monitored_validator.index {
monitored_validator.touch_epoch_summary(current_epoch);
let i = i as usize;
// Cache relevant validator info.
if let Some(balance) = state.balances().get(i) {
monitored_validator.with_epoch_summary(current_epoch, |summary| {
summary.register_validator_total_balance(*balance)
});
}
// Only log the per-validator metrics if it's enabled.
if !self.individual_tracking() {
continue;
}
let i = i as usize;
let id = &monitored_validator.id;
if let Some(balance) = state.balances().get(i) {
@@ -479,6 +550,25 @@ impl<T: EthSpec> ValidatorMonitor<T> {
continue;
}
// Store some metrics directly to be re-exposed on the HTTP API.
let mut validator_metrics = monitored_validator.metrics.write();
if previous_epoch_matched_any {
validator_metrics.increment_hits();
if previous_epoch_matched_target {
validator_metrics.increment_target_hits()
} else {
validator_metrics.increment_target_misses()
}
if previous_epoch_matched_head {
validator_metrics.increment_head_hits()
} else {
validator_metrics.increment_head_misses()
}
} else {
validator_metrics.increment_misses()
}
drop(validator_metrics);
// Indicates if any attestation made it on-chain.
//
// For Base states, this will be *any* attestation whatsoever. For Altair states,
@@ -717,6 +807,14 @@ impl<T: EthSpec> ValidatorMonitor<T> {
self.validators.values().map(|val| val.id.clone()).collect()
}
pub fn get_monitored_validator(&self, index: u64) -> Option<&MonitoredValidator> {
if let Some(pubkey) = self.indices.get(&index) {
self.validators.get(pubkey)
} else {
None
}
}
/// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`.
/// Otherwise, do nothing.
pub fn auto_register_local_validator(&mut self, validator_index: u64) {

View File

@@ -2,6 +2,7 @@
use beacon_chain::attestation_verification::Error as AttnError;
use beacon_chain::builder::BeaconChainBuilder;
use beacon_chain::schema_change::migrate_schema;
use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType,
};
@@ -24,6 +25,7 @@ use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::{
iter::{BlockRootsIterator, StateRootsIterator},
HotColdDB, LevelDB, StoreConfig,
@@ -78,6 +80,7 @@ fn get_harness(
let harness = TestHarness::builder(MinimalEthSpec)
.default_spec()
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.logger(store.logger().clone())
.fresh_disk_store(store)
.mock_execution_layer()
.build();
@@ -2543,6 +2546,91 @@ async fn revert_minority_fork_on_resume() {
assert_eq!(heads.len(), 1);
}
// This test checks whether the schema downgrade from the latest version to some minimum supported
// version is correct. This is the easiest schema test to write without historic versions of
// Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually
// as old downgrades are deprecated.
#[tokio::test]
async fn schema_downgrade_to_min_version() {
let num_blocks_produced = E::slots_per_epoch() * 4;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let spec = &harness.chain.spec.clone();
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let min_version = if harness.spec.capella_fork_epoch.is_some() {
// Can't downgrade beyond V14 once Capella is reached, for simplicity don't test that
// at all if Capella is enabled.
SchemaVersion(14)
} else {
SchemaVersion(11)
};
// Close the database to ensure everything is written to disk.
drop(store);
drop(harness);
// Re-open the store.
let store = get_store(&db_path);
// Downgrade.
let deposit_contract_deploy_block = 0;
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
deposit_contract_deploy_block,
CURRENT_SCHEMA_VERSION,
min_version,
store.logger().clone(),
spec,
)
.expect("schema downgrade to minimum version should work");
// Upgrade back.
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
deposit_contract_deploy_block,
min_version,
CURRENT_SCHEMA_VERSION,
store.logger().clone(),
spec,
)
.expect("schema upgrade from minimum version should work");
// Rescreate the harness.
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec())
.logger(store.logger().clone())
.resumed_disk_store(store.clone())
.mock_execution_layer()
.build();
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
// Check that downgrading beyond the minimum version fails (bound is *tight*).
let min_version_sub_1 = SchemaVersion(min_version.as_u64().checked_sub(1).unwrap());
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
deposit_contract_deploy_block,
CURRENT_SCHEMA_VERSION,
min_version_sub_1,
harness.logger().clone(),
spec,
)
.expect_err("should not downgrade below minimum version");
}
/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).