Merge branch 'unstable' into deneb-free-blobs

# Conflicts:
#	.github/workflows/docker.yml
#	.github/workflows/local-testnet.yml
#	.github/workflows/test-suite.yml
#	Cargo.lock
#	Cargo.toml
#	beacon_node/beacon_chain/src/beacon_chain.rs
#	beacon_node/beacon_chain/src/builder.rs
#	beacon_node/beacon_chain/src/test_utils.rs
#	beacon_node/execution_layer/src/engine_api/json_structures.rs
#	beacon_node/network/src/beacon_processor/mod.rs
#	beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
#	beacon_node/network/src/sync/backfill_sync/mod.rs
#	beacon_node/store/src/config.rs
#	beacon_node/store/src/hot_cold_store.rs
#	common/eth2_network_config/Cargo.toml
#	consensus/ssz/src/decode/impls.rs
#	consensus/ssz_derive/src/lib.rs
#	consensus/ssz_derive/tests/tests.rs
#	consensus/ssz_types/src/serde_utils/mod.rs
#	consensus/tree_hash/src/impls.rs
#	consensus/tree_hash/src/lib.rs
#	consensus/types/Cargo.toml
#	consensus/types/src/beacon_state.rs
#	consensus/types/src/chain_spec.rs
#	consensus/types/src/eth_spec.rs
#	consensus/types/src/fork_name.rs
#	lcli/Cargo.toml
#	lcli/src/main.rs
#	lcli/src/new_testnet.rs
#	scripts/local_testnet/el_bootnode.sh
#	scripts/local_testnet/genesis.json
#	scripts/local_testnet/geth.sh
#	scripts/local_testnet/setup.sh
#	scripts/local_testnet/start_local_testnet.sh
#	scripts/local_testnet/vars.env
#	scripts/tests/doppelganger_protection.sh
#	scripts/tests/genesis.json
#	scripts/tests/vars.env
#	testing/ef_tests/Cargo.toml
#	validator_client/src/block_service.rs
This commit is contained in:
Jimmy Chen
2023-05-30 11:26:33 +10:00
333 changed files with 5930 additions and 13386 deletions

View File

@@ -33,12 +33,12 @@ serde_derive = "1.0.116"
slog = { version = "2.5.2", features = ["max_level_trace"] }
sloggers = { version = "2.1.1", features = ["json"] }
slot_clock = { path = "../../common/slot_clock" }
eth2_hashing = "0.3.0"
eth2_ssz = "0.4.1"
eth2_ssz_types = "0.2.2"
eth2_ssz_derive = "0.3.1"
ethereum_hashing = "1.0.0-beta.2"
ethereum_ssz = "0.5.0"
ssz_types = "0.5.0"
ethereum_ssz_derive = "0.5.0"
state_processing = { path = "../../consensus/state_processing" }
tree_hash = "0.4.1"
tree_hash = "0.5.0"
types = { path = "../../consensus/types" }
tokio = "1.14.0"
tokio-stream = "0.1.3"

View File

@@ -65,14 +65,15 @@ where
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;
let fork = chain.canonical_head.cached_head().head_fork();
let mut signature_sets = Vec::with_capacity(num_indexed * 3);
// Iterate, flattening to get only the `Ok` values.
for indexed in indexing_results.iter().flatten() {
let signed_aggregate = &indexed.signed_aggregate;
let indexed_attestation = &indexed.indexed_attestation;
let fork = chain
.spec
.fork_at_epoch(indexed_attestation.data.target.epoch);
signature_sets.push(
signed_aggregate_selection_proof_signature_set(
@@ -169,8 +170,6 @@ where
&metrics::ATTESTATION_PROCESSING_BATCH_UNAGG_SIGNATURE_SETUP_TIMES,
);
let fork = chain.canonical_head.cached_head().head_fork();
let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
@@ -181,6 +180,9 @@ where
// Iterate, flattening to get only the `Ok` values.
for partially_verified in partial_results.iter().flatten() {
let indexed_attestation = &partially_verified.indexed_attestation;
let fork = chain
.spec
.fork_at_epoch(indexed_attestation.data.target.epoch);
let signature_set = indexed_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),

View File

@@ -97,7 +97,8 @@ use state_processing::{
},
per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, ConsensusContext, SigVerifiedOp, VerifyBlockRoot, VerifyOperation,
BlockSignatureStrategy, ConsensusContext, SigVerifiedOp, StateProcessingStrategy,
VerifyBlockRoot, VerifyOperation,
};
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -464,6 +465,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub slasher: Option<Arc<Slasher<T::EthSpec>>>,
/// Provides monitoring of a set of explicitly defined validators.
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub data_availability_checker: DataAvailabilityChecker<T>,
pub kzg: Option<Arc<Kzg>>,
@@ -599,7 +602,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Persists `self.eth1_chain` and its caches to disk.
pub fn persist_eth1_cache(&self) -> Result<(), Error> {
let _timer = metrics::start_timer(&metrics::PERSIST_OP_POOL);
let _timer = metrics::start_timer(&metrics::PERSIST_ETH1_CACHE);
if let Some(eth1_chain) = self.eth1_chain.as_ref() {
self.store
@@ -4891,6 +4894,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&mut state,
&block,
signature_strategy,
StateProcessingStrategy::Accurate,
VerifyBlockRoot::True,
&mut ctxt,
&self.spec,
@@ -5805,6 +5809,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let shuffling_id = BlockShufflingIds {
current: head_block.current_epoch_shuffling_id.clone(),
next: head_block.next_epoch_shuffling_id.clone(),
previous: None,
block_root: head_block.root,
}
.id_for_epoch(shuffling_epoch)

View File

@@ -218,7 +218,6 @@ where
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances.effective_balances.clone(),
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
proposer_boost_root: self.proposer_boost_root,
@@ -355,24 +354,62 @@ where
}
}
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV17;
/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
#[superstruct(variants(V11), variant_attributes(derive(Encode, Decode)), no_enum)]
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
pub struct PersistedForkChoiceStore {
#[superstruct(only(V11))]
#[superstruct(only(V11, V17))]
pub balances_cache: BalancesCacheV8,
pub time: Slot,
pub finalized_checkpoint: Checkpoint,
pub justified_checkpoint: Checkpoint,
pub justified_balances: Vec<u64>,
#[superstruct(only(V11))]
pub best_justified_checkpoint: Checkpoint,
#[superstruct(only(V11))]
#[superstruct(only(V11, V17))]
pub unrealized_justified_checkpoint: Checkpoint,
#[superstruct(only(V11))]
#[superstruct(only(V11, V17))]
pub unrealized_finalized_checkpoint: Checkpoint,
#[superstruct(only(V11))]
#[superstruct(only(V11, V17))]
pub proposer_boost_root: Hash256,
#[superstruct(only(V11))]
#[superstruct(only(V11, V17))]
pub equivocating_indices: BTreeSet<u64>,
}
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV11;
impl Into<PersistedForkChoiceStore> for PersistedForkChoiceStoreV11 {
fn into(self) -> PersistedForkChoiceStore {
PersistedForkChoiceStore {
balances_cache: self.balances_cache,
time: self.time,
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
proposer_boost_root: self.proposer_boost_root,
equivocating_indices: self.equivocating_indices,
}
}
}
impl Into<PersistedForkChoiceStoreV11> for PersistedForkChoiceStore {
fn into(self) -> PersistedForkChoiceStoreV11 {
PersistedForkChoiceStoreV11 {
balances_cache: self.balances_cache,
time: self.time,
finalized_checkpoint: self.finalized_checkpoint,
justified_checkpoint: self.justified_checkpoint,
justified_balances: self.justified_balances,
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: self.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint,
proposer_boost_root: self.proposer_boost_root,
equivocating_indices: self.equivocating_indices,
}
}
}

View File

@@ -84,7 +84,7 @@ use state_processing::{
per_block_processing, per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, ConsensusContext, SlotProcessingError,
VerifyBlockRoot,
StateProcessingStrategy, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::fs;
@@ -1615,6 +1615,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
block.as_block(),
// Signatures were verified earlier in this function.
BlockSignatureStrategy::NoVerification,
StateProcessingStrategy::Accurate,
VerifyBlockRoot::True,
&mut consensus_context,
&chain.spec,

View File

@@ -8,7 +8,7 @@ use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_bound
use crate::head_tracker::HeadTracker;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::ValidatorMonitor;
@@ -710,6 +710,8 @@ where
)?;
}
let head_shuffling_ids = BlockShufflingIds::try_from_head(head_block_root, &head_state)?;
let mut head_snapshot = BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: Arc::new(head_block),
@@ -791,6 +793,29 @@ where
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
// Calculate the weak subjectivity point in which to backfill blocks to.
let genesis_backfill_slot = if self.chain_config.genesis_backfill {
Slot::new(0)
} else {
let backfill_epoch_range = (self.spec.min_validator_withdrawability_delay
+ self.spec.churn_limit_quotient)
.as_u64()
/ 2;
match slot_clock.now() {
Some(current_slot) => {
let genesis_backfill_epoch = current_slot
.epoch(TEthSpec::slots_per_epoch())
.saturating_sub(backfill_epoch_range);
genesis_backfill_epoch.start_slot(TEthSpec::slots_per_epoch())
}
None => {
// The slot clock cannot derive the current slot. We therefore assume we are
// at or prior to genesis and backfill should sync all the way to genesis.
Slot::new(0)
}
}
};
let beacon_chain = BeaconChain {
spec: self.spec.clone(),
config: self.chain_config,
@@ -845,7 +870,11 @@ where
DEFAULT_SNAPSHOT_CACHE_SIZE,
head_for_snapshot_cache,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(
shuffling_cache_size,
head_shuffling_ids,
log.clone(),
)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),
@@ -860,6 +889,7 @@ where
graffiti: self.graffiti,
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
//TODO(sean) should we move kzg solely to the da checker?
data_availability_checker: DataAvailabilityChecker::new(
slot_clock,
@@ -1036,7 +1066,7 @@ mod test {
use super::*;
use crate::test_utils::EphemeralHarnessType;
use crate::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;
use eth2_hashing::hash;
use ethereum_hashing::hash;
use genesis::{
generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH,
};

View File

@@ -31,7 +31,9 @@
//! the head block root. This is unacceptable for fast-responding functions like the networking
//! stack.
use crate::beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::BlockShufflingIds;
use crate::{
beacon_chain::{
BeaconForkChoice, BeaconStore, OverrideForkchoiceUpdate,
@@ -850,6 +852,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
});
match BlockShufflingIds::try_from_head(
new_snapshot.beacon_block_root,
&new_snapshot.beacon_state,
) {
Ok(head_shuffling_ids) => {
self.shuffling_cache
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
.map(|mut shuffling_cache| {
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "shuffling_cache",
"task" => "update head shuffling decision root"
);
});
}
Err(e) => {
error!(
self.log,
"Failed to get head shuffling ids";
"error" => ?e,
"head_block_root" => ?new_snapshot.beacon_block_root
);
}
}
observe_head_block_delays(
&mut self.block_times_cache.write(),
&new_head_proto_block,

View File

@@ -73,6 +73,9 @@ pub struct ChainConfig {
pub optimistic_finalized_sync: bool,
/// The size of the shuffling cache,
pub shuffling_cache_size: usize,
/// If using a weak-subjectivity sync, whether we should download blocks all the way back to
/// genesis.
pub genesis_backfill: bool,
/// Whether to send payload attributes every slot, regardless of connected proposers.
///
/// This is useful for block builders and testing.
@@ -106,6 +109,7 @@ impl Default for ChainConfig {
// This value isn't actually read except in tests.
optimistic_finalized_sync: true,
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
enable_backfill_rate_limiting: true,
}

View File

@@ -1,7 +1,7 @@
use crate::metrics;
use eth1::{Config as Eth1Config, Eth1Block, Service as HttpService};
use eth2::lighthouse::Eth1SyncStatusData;
use eth2_hashing::hash;
use ethereum_hashing::hash;
use int_to_bytes::int_to_bytes32;
use slog::{debug, error, trace, Logger};
use ssz::{Decode, Encode};

View File

@@ -5,7 +5,7 @@ use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{
per_block_processing, per_block_processing::BlockSignatureStrategy, ConsensusContext,
VerifyBlockRoot,
StateProcessingStrategy, VerifyBlockRoot,
};
use std::sync::Arc;
use std::time::Duration;
@@ -177,6 +177,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&mut state,
&block,
BlockSignatureStrategy::NoVerification,
StateProcessingStrategy::Accurate,
VerifyBlockRoot::True,
&mut ctxt,
spec,

View File

@@ -193,13 +193,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
oldest_block_parent: expected_block_root,
..anchor_info
};
let backfill_complete = new_anchor.block_backfill_complete();
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
self.store
.compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?;
// If backfill has completed and the chain is configured to reconstruct historic states,
// send a message to the background migrator instructing it to begin reconstruction.
if backfill_complete && self.config.reconstruct_historic_states {
// This can only happen if we have backfilled all the way to genesis.
if backfill_complete
&& self.genesis_backfill_slot == Slot::new(0)
&& self.config.reconstruct_historic_states
{
self.store_migrator.process_reconstruction();
}

View File

@@ -876,6 +876,14 @@ lazy_static! {
"beacon_sync_committee_message_gossip_verification_seconds",
"Full runtime of sync contribution gossip verification"
);
pub static ref SYNC_MESSAGE_EQUIVOCATIONS: Result<IntCounter> = try_create_int_counter(
"sync_message_equivocations_total",
"Number of sync messages with the same validator index for different blocks"
);
pub static ref SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD: Result<IntCounter> = try_create_int_counter(
"sync_message_equivocations_to_head_total",
"Number of sync message which conflict with a previous message but elect the head"
);
/*
* Sync Committee Contribution Verification

View File

@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::marker::PhantomData;
use types::slot_data::SlotData;
use types::{Epoch, EthSpec, Slot, Unsigned};
use types::{Epoch, EthSpec, Hash256, Slot, Unsigned};
/// The maximum capacity of the `AutoPruningEpochContainer`.
///
@@ -39,10 +39,10 @@ pub const MAX_CACHED_EPOCHS: u64 = 3;
pub type ObservedAttesters<E> = AutoPruningEpochContainer<EpochBitfield, E>;
pub type ObservedSyncContributors<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncContributorSlotHashSet<E>, E>;
AutoPruningSlotContainer<SlotSubcommitteeIndex, Hash256, SyncContributorSlotHashSet<E>, E>;
pub type ObservedAggregators<E> = AutoPruningEpochContainer<EpochHashSet, E>;
pub type ObservedSyncAggregators<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncAggregatorSlotHashSet, E>;
AutoPruningSlotContainer<SlotSubcommitteeIndex, (), SyncAggregatorSlotHashSet, E>;
#[derive(Debug, PartialEq)]
pub enum Error {
@@ -62,7 +62,7 @@ pub enum Error {
}
/// Implemented on an item in an `AutoPruningContainer`.
pub trait Item {
pub trait Item<T> {
/// Instantiate `Self` with the given `capacity`.
fn with_capacity(capacity: usize) -> Self;
@@ -75,11 +75,11 @@ pub trait Item {
/// Returns the number of validators that have been observed by `self`.
fn validator_count(&self) -> usize;
/// Store `validator_index` in `self`.
fn insert(&mut self, validator_index: usize) -> bool;
/// Store `validator_index` and `value` in `self`.
fn insert(&mut self, validator_index: usize, value: T) -> bool;
/// Returns `true` if `validator_index` has been stored in `self`.
fn contains(&self, validator_index: usize) -> bool;
/// Returns `Some(T)` if there is an entry for `validator_index`.
fn get(&self, validator_index: usize) -> Option<T>;
}
/// Stores a `BitVec` that represents which validator indices have attested or sent sync committee
@@ -88,7 +88,7 @@ pub struct EpochBitfield {
bitfield: BitVec,
}
impl Item for EpochBitfield {
impl Item<()> for EpochBitfield {
fn with_capacity(capacity: usize) -> Self {
Self {
bitfield: BitVec::with_capacity(capacity),
@@ -108,7 +108,7 @@ impl Item for EpochBitfield {
self.bitfield.iter().filter(|bit| **bit).count()
}
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
self.bitfield
.get_mut(validator_index)
.map(|mut bit| {
@@ -129,8 +129,11 @@ impl Item for EpochBitfield {
})
}
fn contains(&self, validator_index: usize) -> bool {
self.bitfield.get(validator_index).map_or(false, |bit| *bit)
fn get(&self, validator_index: usize) -> Option<()> {
self.bitfield
.get(validator_index)
.map_or(false, |bit| *bit)
.then_some(())
}
}
@@ -140,7 +143,7 @@ pub struct EpochHashSet {
set: HashSet<usize>,
}
impl Item for EpochHashSet {
impl Item<()> for EpochHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
@@ -163,27 +166,27 @@ impl Item for EpochHashSet {
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<()> {
self.set.contains(&validator_index).then_some(())
}
}
/// Stores a `HashSet` of which validator indices have created a sync aggregate during a
/// slot.
pub struct SyncContributorSlotHashSet<E> {
set: HashSet<usize>,
map: HashMap<usize, Hash256>,
phantom: PhantomData<E>,
}
impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
impl<E: EthSpec> Item<Hash256> for SyncContributorSlotHashSet<E> {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
map: HashMap::with_capacity(capacity),
phantom: PhantomData,
}
}
@@ -194,22 +197,24 @@ impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
}
fn len(&self) -> usize {
self.set.len()
self.map.len()
}
fn validator_count(&self) -> usize {
self.set.len()
self.map.len()
}
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
!self.set.insert(validator_index)
fn insert(&mut self, validator_index: usize, beacon_block_root: Hash256) -> bool {
self.map
.insert(validator_index, beacon_block_root)
.is_some()
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<Hash256> {
self.map.get(&validator_index).copied()
}
}
@@ -219,7 +224,7 @@ pub struct SyncAggregatorSlotHashSet {
set: HashSet<usize>,
}
impl Item for SyncAggregatorSlotHashSet {
impl Item<()> for SyncAggregatorSlotHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
@@ -241,13 +246,13 @@ impl Item for SyncAggregatorSlotHashSet {
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<()> {
self.set.contains(&validator_index).then_some(())
}
}
@@ -275,7 +280,7 @@ impl<T, E: EthSpec> Default for AutoPruningEpochContainer<T, E> {
}
}
impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
impl<T: Item<()>, E: EthSpec> AutoPruningEpochContainer<T, E> {
/// Observe that `validator_index` has produced attestation `a`. Returns `Ok(true)` if `a` has
/// previously been observed for `validator_index`.
///
@@ -293,7 +298,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
self.prune(epoch);
if let Some(item) = self.items.get_mut(&epoch) {
Ok(item.insert(validator_index))
Ok(item.insert(validator_index, ()))
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch.
@@ -309,7 +314,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
let initial_capacity = sum.checked_div(count).unwrap_or_else(T::default_capacity);
let mut item = T::with_capacity(initial_capacity);
item.insert(validator_index);
item.insert(validator_index, ());
self.items.insert(epoch, item);
Ok(false)
@@ -333,7 +338,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
let exists = self
.items
.get(&epoch)
.map_or(false, |item| item.contains(validator_index));
.map_or(false, |item| item.get(validator_index).is_some());
Ok(exists)
}
@@ -392,7 +397,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
pub fn index_seen_at_epoch(&self, index: usize, epoch: Epoch) -> bool {
self.items
.get(&epoch)
.map(|item| item.contains(index))
.map(|item| item.get(index).is_some())
.unwrap_or(false)
}
}
@@ -405,23 +410,63 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
/// sync contributions with an epoch prior to `data.slot - 3` will be cleared from the cache.
///
/// `V` should be set to a `SyncAggregatorSlotHashSet` or a `SyncContributorSlotHashSet`.
pub struct AutoPruningSlotContainer<K: SlotData + Eq + Hash, V, E: EthSpec> {
pub struct AutoPruningSlotContainer<K: SlotData + Eq + Hash, S, V, E: EthSpec> {
lowest_permissible_slot: Slot,
items: HashMap<K, V>,
_phantom: PhantomData<E>,
_phantom_e: PhantomData<E>,
_phantom_s: PhantomData<S>,
}
impl<K: SlotData + Eq + Hash, V, E: EthSpec> Default for AutoPruningSlotContainer<K, V, E> {
impl<K: SlotData + Eq + Hash, S, V, E: EthSpec> Default for AutoPruningSlotContainer<K, S, V, E> {
fn default() -> Self {
Self {
lowest_permissible_slot: Slot::new(0),
items: HashMap::new(),
_phantom: PhantomData,
_phantom_e: PhantomData,
_phantom_s: PhantomData,
}
}
}
impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V, E> {
impl<K: SlotData + Eq + Hash + Copy, S, V: Item<S>, E: EthSpec>
AutoPruningSlotContainer<K, S, V, E>
{
/// Observes the given `value` for the given `validator_index`.
///
/// The `override_observation` function is supplied `previous_observation`
/// and `value`. If it returns `true`, then any existing observation will be
/// overridden.
///
/// This function returns `Some` if:
/// - An observation already existed for the validator, AND,
/// - The `override_observation` function returned `false`.
///
/// Alternatively, it returns `None` if:
/// - An observation did not already exist for the given validator, OR,
/// - The `override_observation` function returned `true`.
pub fn observe_validator_with_override<F>(
&mut self,
key: K,
validator_index: usize,
value: S,
override_observation: F,
) -> Result<Option<S>, Error>
where
F: Fn(&S, &S) -> bool,
{
if let Some(prev_observation) = self.observation_for_validator(key, validator_index)? {
if override_observation(&prev_observation, &value) {
self.observe_validator(key, validator_index, value)?;
Ok(None)
} else {
Ok(Some(prev_observation))
}
} else {
self.observe_validator(key, validator_index, value)?;
Ok(None)
}
}
/// Observe that `validator_index` has produced a sync committee message. Returns `Ok(true)` if
/// the sync committee message has previously been observed for `validator_index`.
///
@@ -429,14 +474,19 @@ impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
pub fn observe_validator(&mut self, key: K, validator_index: usize) -> Result<bool, Error> {
pub fn observe_validator(
&mut self,
key: K,
validator_index: usize,
value: S,
) -> Result<bool, Error> {
let slot = key.get_slot();
self.sanitize_request(slot, validator_index)?;
self.prune(slot);
if let Some(item) = self.items.get_mut(&key) {
Ok(item.insert(validator_index))
Ok(item.insert(validator_index, value))
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier slot.
@@ -452,32 +502,45 @@ impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V
let initial_capacity = sum.checked_div(count).unwrap_or_else(V::default_capacity);
let mut item = V::with_capacity(initial_capacity);
item.insert(validator_index);
item.insert(validator_index, value);
self.items.insert(key, item);
Ok(false)
}
}
/// Returns `Ok(true)` if the `validator_index` has already produced a conflicting sync committee message.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
// Identical to `Self::observation_for_validator` but discards the
// observation, simply returning `true` if the validator has been observed
// at all.
pub fn validator_has_been_observed(
&self,
key: K,
validator_index: usize,
) -> Result<bool, Error> {
self.observation_for_validator(key, validator_index)
.map(|observation| observation.is_some())
}
/// Returns `Ok(Some)` if the `validator_index` has already produced a
/// conflicting sync committee message.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
pub fn observation_for_validator(
&self,
key: K,
validator_index: usize,
) -> Result<Option<S>, Error> {
self.sanitize_request(key.get_slot(), validator_index)?;
let exists = self
let observation = self
.items
.get(&key)
.map_or(false, |item| item.contains(validator_index));
.and_then(|item| item.get(validator_index));
Ok(exists)
Ok(observation)
}
/// Returns the number of validators that have been observed at the given `slot`. Returns
@@ -561,6 +624,116 @@ mod tests {
type E = types::MainnetEthSpec;
#[test]
fn value_storage() {
type Container = AutoPruningSlotContainer<Slot, Hash256, SyncContributorSlotHashSet<E>, E>;
let mut store: Container = <_>::default();
let key = Slot::new(0);
let validator_index = 0;
let value = Hash256::zero();
// Assert there is no entry.
assert!(store
.observation_for_validator(key, validator_index)
.unwrap()
.is_none());
assert!(!store
.validator_has_been_observed(key, validator_index)
.unwrap());
// Add an entry.
assert!(!store
.observe_validator(key, validator_index, value)
.unwrap());
// Assert there is a correct entry.
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
assert!(store
.validator_has_been_observed(key, validator_index)
.unwrap());
let alternate_value = Hash256::from_low_u64_be(1);
// Assert that override false does not override.
assert_eq!(
store
.observe_validator_with_override(key, validator_index, alternate_value, |_, _| {
false
})
.unwrap(),
Some(value)
);
// Assert that override true overrides and acts as if there was never an
// entry there.
assert_eq!(
store
.observe_validator_with_override(key, validator_index, alternate_value, |_, _| {
true
})
.unwrap(),
None
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(alternate_value)
);
// Reset the store.
let mut store: Container = <_>::default();
// Asset that a new entry with override = false is inserted
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
None
);
assert_eq!(
store
.observe_validator_with_override(key, validator_index, value, |_, _| { false })
.unwrap(),
None,
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
// Reset the store.
let mut store: Container = <_>::default();
// Asset that a new entry with override = true is inserted
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
None
);
assert_eq!(
store
.observe_validator_with_override(key, validator_index, value, |_, _| { true })
.unwrap(),
None,
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
}
macro_rules! test_suite_epoch {
($mod_name: ident, $type: ident) => {
#[cfg(test)]
@@ -722,7 +895,7 @@ mod tests {
test_suite_epoch!(observed_aggregators, ObservedAggregators);
macro_rules! test_suite_slot {
($mod_name: ident, $type: ident) => {
($mod_name: ident, $type: ident, $value: expr) => {
#[cfg(test)]
mod $mod_name {
use super::*;
@@ -737,7 +910,7 @@ mod tests {
"should indicate an unknown item is unknown"
);
assert_eq!(
store.observe_validator(key, i),
store.observe_validator(key, i, $value),
Ok(false),
"should observe new item"
);
@@ -750,7 +923,7 @@ mod tests {
"should indicate a known item is known"
);
assert_eq!(
store.observe_validator(key, i),
store.observe_validator(key, i, $value),
Ok(true),
"should acknowledge an existing item"
);
@@ -997,6 +1170,10 @@ mod tests {
}
};
}
test_suite_slot!(observed_sync_contributors, ObservedSyncContributors);
test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators);
test_suite_slot!(
observed_sync_contributors,
ObservedSyncContributors,
Hash256::zero()
);
test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators, ());
}

View File

@@ -1,17 +1,41 @@
use crate::beacon_fork_choice_store::PersistedForkChoiceStoreV11;
use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV11, PersistedForkChoiceStoreV17};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error, StoreItem};
use superstruct::superstruct;
// If adding a new version you should update this type alias and fix the breakages.
pub type PersistedForkChoice = PersistedForkChoiceV11;
pub type PersistedForkChoice = PersistedForkChoiceV17;
#[superstruct(variants(V11), variant_attributes(derive(Encode, Decode)), no_enum)]
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
pub struct PersistedForkChoice {
pub fork_choice: fork_choice::PersistedForkChoice,
#[superstruct(only(V11))]
pub fork_choice_store: PersistedForkChoiceStoreV11,
#[superstruct(only(V17))]
pub fork_choice_store: PersistedForkChoiceStoreV17,
}
impl Into<PersistedForkChoice> for PersistedForkChoiceV11 {
fn into(self) -> PersistedForkChoice {
PersistedForkChoice {
fork_choice: self.fork_choice,
fork_choice_store: self.fork_choice_store.into(),
}
}
}
impl Into<PersistedForkChoiceV11> for PersistedForkChoice {
fn into(self) -> PersistedForkChoiceV11 {
PersistedForkChoiceV11 {
fork_choice: self.fork_choice,
fork_choice_store: self.fork_choice_store.into(),
}
}
}
macro_rules! impl_store_item {
@@ -33,3 +57,4 @@ macro_rules! impl_store_item {
}
impl_store_item!(PersistedForkChoiceV11);
impl_store_item!(PersistedForkChoiceV17);

View File

@@ -4,6 +4,7 @@ mod migration_schema_v13;
mod migration_schema_v14;
mod migration_schema_v15;
mod migration_schema_v16;
mod migration_schema_v17;
use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
use crate::eth1_chain::SszEth1;
@@ -141,6 +142,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v16::downgrade_from_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(16), SchemaVersion(17)) => {
let ops = migration_schema_v17::upgrade_to_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(16)) => {
let ops = migration_schema_v17::downgrade_from_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,88 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::{PersistedForkChoiceV11, PersistedForkChoiceV17};
use proto_array::core::{SszContainerV16, SszContainerV17};
use slog::{debug, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV11,
) -> Result<PersistedForkChoiceV17, Error> {
let ssz_container_v16 = SszContainerV16::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v17: SszContainerV17 = ssz_container_v16.try_into().map_err(|e| {
Error::SchemaMigrationError(format!(
"Missing checkpoint during schema migration: {:?}",
e
))
})?;
fork_choice.fork_choice.proto_array_bytes = ssz_container_v17.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn downgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV17,
) -> Result<PersistedForkChoiceV11, Error> {
let ssz_container_v17 = SszContainerV17::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v16: SszContainerV16 = ssz_container_v17.into();
fork_choice.fork_choice.proto_array_bytes = ssz_container_v16.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn upgrade_to_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v11 = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v17 = upgrade_fork_choice(v11)?;
debug!(
log,
"Removing unused best_justified_checkpoint from fork choice store."
);
Ok(vec![v17.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}
pub fn downgrade_from_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v17 = db
.get_item::<PersistedForkChoiceV17>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v11 = downgrade_fork_choice(v17)?;
debug!(
log,
"Adding junk best_justified_checkpoint to fork choice store."
);
Ok(vec![v11.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}

View File

@@ -1,10 +1,18 @@
use crate::{metrics, BeaconChainError};
use lru::LruCache;
use oneshot_broadcast::{oneshot, Receiver, Sender};
use std::collections::HashMap;
use std::sync::Arc;
use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256};
/// The size of the LRU cache that stores committee caches for quicker verification.
use itertools::Itertools;
use slog::{debug, Logger};
use oneshot_broadcast::{oneshot, Receiver, Sender};
use types::{
beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256,
RelativeEpoch,
};
use crate::{metrics, BeaconChainError};
/// The size of the cache that stores committee caches for quicker verification.
///
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
@@ -45,18 +53,24 @@ impl CacheItem {
}
}
/// Provides an LRU cache for `CommitteeCache`.
/// Provides a cache for `CommitteeCache`.
///
/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like
/// a find/replace error.
pub struct ShufflingCache {
cache: LruCache<AttestationShufflingId, CacheItem>,
cache: HashMap<AttestationShufflingId, CacheItem>,
cache_size: usize,
head_shuffling_ids: BlockShufflingIds,
logger: Logger,
}
impl ShufflingCache {
pub fn new(cache_size: usize) -> Self {
pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds, logger: Logger) -> Self {
Self {
cache: LruCache::new(cache_size),
cache: HashMap::new(),
cache_size,
head_shuffling_ids,
logger,
}
}
@@ -76,7 +90,7 @@ impl ShufflingCache {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
let ready = CacheItem::Committee(committee);
self.cache.put(key.clone(), ready.clone());
self.insert_cache_item(key.clone(), ready.clone());
Some(ready)
}
// The promise has not yet been resolved. Return the promise so the caller can await
@@ -93,13 +107,12 @@ impl ShufflingCache {
// It's worth noting that this is the only place where we removed unresolved
// promises from the cache. This means unresolved promises will only be removed if
// we try to access them again. This is OK, since the promises don't consume much
// memory and the nature of the LRU cache means that future, relevant entries will
// still be added to the cache. We expect that *all* promises should be resolved,
// unless there is a programming or database error.
// memory. We expect that *all* promises should be resolved, unless there is a
// programming or database error.
Err(oneshot_broadcast::Error::SenderDropped) => {
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS);
metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
self.cache.pop(key);
self.cache.remove(key);
None
}
},
@@ -112,13 +125,13 @@ impl ShufflingCache {
}
pub fn contains(&self, key: &AttestationShufflingId) -> bool {
self.cache.contains(key)
self.cache.contains_key(key)
}
pub fn insert_committee_cache<T: ToArcCommitteeCache>(
pub fn insert_committee_cache<C: ToArcCommitteeCache>(
&mut self,
key: AttestationShufflingId,
committee_cache: &T,
committee_cache: &C,
) {
if self
.cache
@@ -127,13 +140,55 @@ impl ShufflingCache {
// worth two in the promise-bush!
.map_or(true, CacheItem::is_promise)
{
self.cache.put(
self.insert_cache_item(
key,
CacheItem::Committee(committee_cache.to_arc_committee_cache()),
);
}
}
/// Prunes the cache first before inserting a new cache item.
fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) {
self.prune_cache();
self.cache.insert(key, cache_item);
}
/// Prunes the `cache` to keep the size below the `cache_size` limit, based on the following
/// preferences:
/// - Entries from more recent epochs are preferred over older ones.
/// - Entries with shuffling ids matching the head's previous, current, and future epochs must
/// not be pruned.
fn prune_cache(&mut self) {
let target_cache_size = self.cache_size.saturating_sub(1);
if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) {
let shuffling_ids_to_prune = self
.cache
.keys()
.sorted_by_key(|key| key.shuffling_epoch)
.filter(|shuffling_id| {
Some(shuffling_id)
!= self
.head_shuffling_ids
.id_for_epoch(shuffling_id.shuffling_epoch)
.as_ref()
.as_ref()
})
.take(prune_count)
.cloned()
.collect::<Vec<_>>();
for shuffling_id in shuffling_ids_to_prune.iter() {
debug!(
self.logger,
"Removing old shuffling from cache";
"shuffling_epoch" => shuffling_id.shuffling_epoch,
"shuffling_decision_block" => ?shuffling_id.shuffling_decision_block
);
self.cache.remove(shuffling_id);
}
}
}
pub fn create_promise(
&mut self,
key: AttestationShufflingId,
@@ -148,9 +203,17 @@ impl ShufflingCache {
}
let (sender, receiver) = oneshot();
self.cache.put(key, CacheItem::Promise(receiver));
self.insert_cache_item(key, CacheItem::Promise(receiver));
Ok(sender)
}
/// Inform the cache that the shuffling decision roots for the head has changed.
///
/// The shufflings for the head's previous, current, and future epochs will never be ejected from
/// the cache during `Self::insert_cache_item`.
pub fn update_head_shuffling_ids(&mut self, head_shuffling_ids: BlockShufflingIds) {
self.head_shuffling_ids = head_shuffling_ids;
}
}
/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
@@ -170,26 +233,29 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> {
}
}
impl Default for ShufflingCache {
fn default() -> Self {
Self::new(DEFAULT_CACHE_SIZE)
}
}
/// Contains the shuffling IDs for a beacon block.
#[derive(Clone)]
pub struct BlockShufflingIds {
pub current: AttestationShufflingId,
pub next: AttestationShufflingId,
pub previous: Option<AttestationShufflingId>,
pub block_root: Hash256,
}
impl BlockShufflingIds {
/// Returns the shuffling ID for the given epoch.
///
/// Returns `None` if `epoch` is prior to `self.current.shuffling_epoch`.
/// Returns `None` if `epoch` is prior to `self.previous?.shuffling_epoch` or
/// `self.current.shuffling_epoch` (if `previous` is `None`).
pub fn id_for_epoch(&self, epoch: Epoch) -> Option<AttestationShufflingId> {
if epoch == self.current.shuffling_epoch {
Some(self.current.clone())
} else if self
.previous
.as_ref()
.map_or(false, |id| id.shuffling_epoch == epoch)
{
self.previous.clone()
} else if epoch == self.next.shuffling_epoch {
Some(self.next.clone())
} else if epoch > self.next.shuffling_epoch {
@@ -201,18 +267,57 @@ impl BlockShufflingIds {
None
}
}
pub fn try_from_head<T: EthSpec>(
head_block_root: Hash256,
head_state: &BeaconState<T>,
) -> Result<Self, String> {
let get_shuffling_id = |relative_epoch| {
AttestationShufflingId::new(head_block_root, head_state, relative_epoch).map_err(|e| {
format!(
"Unable to get attester shuffling decision slot for the epoch {:?}: {:?}",
relative_epoch, e
)
})
};
Ok(Self {
current: get_shuffling_id(RelativeEpoch::Current)?,
next: get_shuffling_id(RelativeEpoch::Next)?,
previous: Some(get_shuffling_id(RelativeEpoch::Previous)?),
block_root: head_block_root,
})
}
}
// Disable tests in debug since the beacon chain harness is slow unless in release.
#[cfg(not(debug_assertions))]
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::EphemeralHarnessType;
use task_executor::test_utils::null_logger;
use types::*;
type BeaconChainHarness =
crate::test_utils::BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>>;
use crate::test_utils::EphemeralHarnessType;
use super::*;
type E = MinimalEthSpec;
type TestBeaconChainType = EphemeralHarnessType<E>;
type BeaconChainHarness = crate::test_utils::BeaconChainHarness<TestBeaconChainType>;
const TEST_CACHE_SIZE: usize = 5;
// Creates a new shuffling cache for testing
fn new_shuffling_cache() -> ShufflingCache {
let current_epoch = 8;
let head_shuffling_ids = BlockShufflingIds {
current: shuffling_id(current_epoch),
next: shuffling_id(current_epoch + 1),
previous: Some(shuffling_id(current_epoch - 1)),
block_root: Hash256::from_low_u64_le(0),
};
let logger = null_logger().unwrap();
ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids, logger)
}
/// Returns two different committee caches for testing.
fn committee_caches() -> (Arc<CommitteeCache>, Arc<CommitteeCache>) {
@@ -249,7 +354,7 @@ mod test {
fn resolved_promise() {
let (committee_a, _) = committee_caches();
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::default();
let mut cache = new_shuffling_cache();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -276,7 +381,7 @@ mod test {
#[test]
fn unresolved_promise() {
let id_a = shuffling_id(1);
let mut cache = ShufflingCache::default();
let mut cache = new_shuffling_cache();
// Create a promise.
let sender = cache.create_promise(id_a.clone()).unwrap();
@@ -301,7 +406,7 @@ mod test {
fn two_promises() {
let (committee_a, committee_b) = committee_caches();
let (id_a, id_b) = (shuffling_id(1), shuffling_id(2));
let mut cache = ShufflingCache::default();
let mut cache = new_shuffling_cache();
// Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap();
@@ -355,7 +460,7 @@ mod test {
#[test]
fn too_many_promises() {
let mut cache = ShufflingCache::default();
let mut cache = new_shuffling_cache();
for i in 0..MAX_CONCURRENT_PROMISES {
cache.create_promise(shuffling_id(i as u64)).unwrap();
@@ -375,4 +480,105 @@ mod test {
"the cache should have two entries"
);
}
#[test]
fn should_insert_committee_cache() {
let mut cache = new_shuffling_cache();
let id_a = shuffling_id(1);
let committee_cache_a = Arc::new(CommitteeCache::default());
cache.insert_committee_cache(id_a.clone(), &committee_cache_a);
assert!(
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a),
"should insert committee cache"
);
}
#[test]
fn should_prune_committee_cache_with_lowest_epoch() {
let mut cache = new_shuffling_cache();
let shuffling_id_and_committee_caches = (0..(TEST_CACHE_SIZE + 1))
.map(|i| (shuffling_id(i as u64), Arc::new(CommitteeCache::default())))
.collect::<Vec<_>>();
for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() {
cache.insert_committee_cache(shuffling_id.clone(), committee_cache);
}
for i in 1..(TEST_CACHE_SIZE + 1) {
assert!(
cache.contains(&shuffling_id_and_committee_caches.get(i).unwrap().0),
"should contain recent epoch shuffling ids"
);
}
assert!(
!cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0),
"should not contain oldest epoch shuffling id"
);
assert_eq!(
cache.cache.len(),
cache.cache_size,
"should limit cache size"
);
}
#[test]
fn should_retain_head_state_shufflings() {
let mut cache = new_shuffling_cache();
let current_epoch = 10;
let committee_cache = Arc::new(CommitteeCache::default());
// Insert a few entries for next the epoch with different decision roots.
for i in 0..TEST_CACHE_SIZE {
let shuffling_id = AttestationShufflingId {
shuffling_epoch: (current_epoch + 1).into(),
shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64),
};
cache.insert_committee_cache(shuffling_id, &committee_cache);
}
// Now, update the head shuffling ids
let head_shuffling_ids = BlockShufflingIds {
current: shuffling_id(current_epoch),
next: shuffling_id(current_epoch + 1),
previous: Some(shuffling_id(current_epoch - 1)),
block_root: Hash256::from_low_u64_le(42),
};
cache.update_head_shuffling_ids(head_shuffling_ids.clone());
// Insert head state shuffling ids. Should not be overridden by other shuffling ids.
cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache);
cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache);
cache.insert_committee_cache(
head_shuffling_ids.previous.clone().unwrap(),
&committee_cache,
);
// Insert a few entries for older epochs.
for i in 0..TEST_CACHE_SIZE {
let shuffling_id = AttestationShufflingId {
shuffling_epoch: Epoch::from(i),
shuffling_decision_block: Hash256::from_low_u64_be(i as u64),
};
cache.insert_committee_cache(shuffling_id, &committee_cache);
}
assert!(
cache.contains(&head_shuffling_ids.current),
"should retain head shuffling id for the current epoch."
);
assert!(
cache.contains(&head_shuffling_ids.next),
"should retain head shuffling id for the next epoch."
);
assert!(
cache.contains(&head_shuffling_ids.previous.unwrap()),
"should retain head shuffling id for previous epoch."
);
assert_eq!(
cache.cache.len(),
cache.cache_size,
"should limit cache size"
);
}
}

View File

@@ -153,7 +153,21 @@ pub enum Error {
/// It's unclear if this sync message is valid, however we have already observed a
/// signature from this validator for this slot and should not observe
/// another.
PriorSyncCommitteeMessageKnown { validator_index: u64, slot: Slot },
PriorSyncCommitteeMessageKnown {
validator_index: u64,
slot: Slot,
prev_root: Hash256,
new_root: Hash256,
},
/// We have already observed a contribution for the aggregator and refuse to
/// process another.
///
/// ## Peer scoring
///
/// It's unclear if this sync message is valid, however we have already observed a
/// signature from this validator for this slot and should not observe
/// another.
PriorSyncContributionMessageKnown { validator_index: u64, slot: Slot },
/// The sync committee message was received on an invalid sync committee message subnet.
///
/// ## Peer scoring
@@ -378,10 +392,10 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
if chain
.observed_sync_aggregators
.write()
.observe_validator(observed_key, aggregator_index as usize)
.observe_validator(observed_key, aggregator_index as usize, ())
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
return Err(Error::PriorSyncContributionMessageKnown {
validator_index: aggregator_index,
slot: contribution.slot,
});
@@ -450,19 +464,40 @@ impl VerifiedSyncCommitteeMessage {
// The sync committee message is the first valid message received for the participating validator
// for the slot, sync_message.slot.
let validator_index = sync_message.validator_index;
if chain
let head_root = chain.canonical_head.cached_head().head_block_root();
let new_root = sync_message.beacon_block_root;
let should_override_prev = |prev_root: &Hash256, new_root: &Hash256| {
let roots_differ = new_root != prev_root;
let new_elects_head = new_root == &head_root;
if roots_differ {
// Track sync committee messages that differ from each other.
metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS);
if new_elects_head {
// Track sync committee messages that swap from an old block to a new block.
metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD);
}
}
roots_differ && new_elects_head
};
if let Some(prev_root) = chain
.observed_sync_contributors
.read()
.validator_has_been_observed(
.observation_for_validator(
SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()),
validator_index as usize,
)
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
});
if !should_override_prev(&prev_root, &new_root) {
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
prev_root,
new_root,
});
}
}
// The aggregate signature of the sync committee message is valid.
@@ -474,18 +509,22 @@ impl VerifiedSyncCommitteeMessage {
// It's important to double check that the sync committee message still hasn't been observed, since
// there can be a race-condition if we receive two sync committee messages at the same time and
// process them in different threads.
if chain
if let Some(prev_root) = chain
.observed_sync_contributors
.write()
.observe_validator(
.observe_validator_with_override(
SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()),
validator_index as usize,
sync_message.beacon_block_root,
should_override_prev,
)
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
prev_root,
new_root,
});
}

View File

@@ -1,4 +1,5 @@
use crate::blob_verification::{AsBlock, BlockWrapper};
use crate::observed_operations::ObservationOutcome;
pub use crate::persisted_beacon_chain::PersistedBeaconChain;
pub use crate::{
beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
@@ -29,6 +30,7 @@ pub use genesis::{interop_genesis_state_with_eth1, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use kzg::{Kzg, TrustedSetup};
use merkle_proof::MerkleTree;
use operation_pool::ReceivedPreCapella;
use parking_lot::Mutex;
use parking_lot::RwLockWriteGuard;
use rand::rngs::StdRng;
@@ -43,7 +45,7 @@ use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::per_block_processing::compute_timestamp_at_slot;
use state_processing::{
state_advance::{complete_state_advance, partial_state_advance},
StateRootStrategy,
StateProcessingStrategy,
};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
@@ -67,7 +69,7 @@ const FORK_NAME_ENV_VAR: &str = "FORK_NAME";
//
// You should mutate the `ChainSpec` prior to initialising the harness if you would like to use
// a different value.
pub const DEFAULT_TARGET_AGGREGATORS: u64 = u64::max_value();
pub const DEFAULT_TARGET_AGGREGATORS: u64 = u64::MAX;
pub type BaseHarnessType<TEthSpec, THotStore, TColdStore> =
Witness<TestingSlotClock, CachingEth1Backend<TEthSpec>, TEthSpec, THotStore, TColdStore>;
@@ -88,7 +90,7 @@ pub type AddBlocksResult<E> = (
BeaconState<E>,
);
/// Deprecated: Indicates how the `BeaconChainHarness` should produce blocks.
/// Indicates how the `BeaconChainHarness` should produce blocks.
#[derive(Clone, Copy, Debug)]
pub enum BlockStrategy {
/// Produce blocks upon the canonical head (normal case).
@@ -104,7 +106,7 @@ pub enum BlockStrategy {
},
}
/// Deprecated: Indicates how the `BeaconChainHarness` should produce attestations.
/// Indicates how the `BeaconChainHarness` should produce attestations.
#[derive(Clone, Debug)]
pub enum AttestationStrategy {
/// All validators attest to whichever block the `BeaconChainHarness` has produced.
@@ -744,7 +746,7 @@ where
pub fn get_hot_state(&self, state_hash: BeaconStateHash) -> Option<BeaconState<E>> {
self.chain
.store
.load_hot_state(&state_hash.into(), StateRootStrategy::Accurate)
.load_hot_state(&state_hash.into(), StateProcessingStrategy::Accurate)
.unwrap()
}
@@ -767,6 +769,7 @@ where
state.get_block_root(slot).unwrap() == state.get_block_root(slot - 1).unwrap()
}
/// Returns a newly created block, signed by the proposer for the given slot.
pub async fn make_block(
&self,
mut state: BeaconState<E>,
@@ -1003,31 +1006,31 @@ where
head_block_root: SignedBeaconBlockHash,
attestation_slot: Slot,
) -> Vec<CommitteeAttestations<E>> {
self.make_unaggregated_attestations_with_limit(
let fork = self
.spec
.fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch()));
self.make_unaggregated_attestations_with_opts(
attesting_validators,
state,
state_root,
head_block_root,
attestation_slot,
None,
MakeAttestationOptions { limit: None, fork },
)
.0
}
pub fn make_unaggregated_attestations_with_limit(
pub fn make_unaggregated_attestations_with_opts(
&self,
attesting_validators: &[usize],
state: &BeaconState<E>,
state_root: Hash256,
head_block_root: SignedBeaconBlockHash,
attestation_slot: Slot,
limit: Option<usize>,
opts: MakeAttestationOptions,
) -> (Vec<CommitteeAttestations<E>>, Vec<usize>) {
let MakeAttestationOptions { limit, fork } = opts;
let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap();
let fork = self
.spec
.fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch()));
let attesters = Mutex::new(vec![]);
let attestations = state
@@ -1160,8 +1163,6 @@ where
.collect()
}
/// Deprecated: Use make_unaggregated_attestations() instead.
///
/// A list of attestations for each committee for the given slot.
///
/// The first layer of the Vec is organised per committee. For example, if the return value is
@@ -1219,16 +1220,35 @@ where
slot: Slot,
limit: Option<usize>,
) -> (HarnessAttestations<E>, Vec<usize>) {
let (unaggregated_attestations, attesters) = self
.make_unaggregated_attestations_with_limit(
attesting_validators,
state,
state_root,
block_hash,
slot,
limit,
);
let fork = self.spec.fork_at_epoch(slot.epoch(E::slots_per_epoch()));
self.make_attestations_with_opts(
attesting_validators,
state,
state_root,
block_hash,
slot,
MakeAttestationOptions { limit, fork },
)
}
pub fn make_attestations_with_opts(
&self,
attesting_validators: &[usize],
state: &BeaconState<E>,
state_root: Hash256,
block_hash: SignedBeaconBlockHash,
slot: Slot,
opts: MakeAttestationOptions,
) -> (HarnessAttestations<E>, Vec<usize>) {
let MakeAttestationOptions { fork, .. } = opts;
let (unaggregated_attestations, attesters) = self.make_unaggregated_attestations_with_opts(
attesting_validators,
state,
state_root,
block_hash,
slot,
opts,
);
let aggregated_attestations: Vec<Option<SignedAggregateAndProof<E>>> =
unaggregated_attestations
@@ -1560,6 +1580,26 @@ where
.sign(sk, &fork, genesis_validators_root, &self.chain.spec)
}
pub fn add_bls_to_execution_change(
&self,
validator_index: u64,
address: Address,
) -> Result<(), String> {
let signed_bls_change = self.make_bls_to_execution_change(validator_index, address);
if let ObservationOutcome::New(verified_bls_change) = self
.chain
.verify_bls_to_execution_change_for_gossip(signed_bls_change)
.expect("should verify BLS to execution change for gossip")
{
self.chain
.import_bls_to_execution_change(verified_bls_change, ReceivedPreCapella::No)
.then_some(())
.ok_or("should import BLS to execution change to the op pool".to_string())
} else {
Err("should observe new BLS to execution change".to_string())
}
}
pub fn make_bls_to_execution_change(
&self,
validator_index: u64,
@@ -2077,9 +2117,6 @@ where
.collect()
}
/// Deprecated: Do not modify the slot clock manually; rely on add_attested_blocks_at_slots()
/// instead
///
/// Advance the slot of the `BeaconChain`.
///
/// Does not produce blocks or attestations.
@@ -2140,8 +2177,6 @@ where
.await
}
/// Deprecated: Use add_attested_blocks_at_slots() instead
///
/// Extend the `BeaconChain` with some blocks and attestations. Returns the root of the
/// last-produced block (the head of the chain).
///
@@ -2296,6 +2331,13 @@ impl<T: BeaconChainTypes> fmt::Debug for BeaconChainHarness<T> {
}
}
pub struct MakeAttestationOptions {
/// Produce exactly `limit` attestations.
pub limit: Option<usize>,
/// Fork to use for signing attestations.
pub fork: Fork,
}
pub fn build_log(level: slog::Level, enabled: bool) -> Logger {
let decorator = TermDecorator::new().build();
let drain = FullFormat::new(decorator).build().fuse();

View File

@@ -199,6 +199,7 @@ pub struct ValidatorMetrics {
pub attestation_head_misses: u64,
pub attestation_target_hits: u64,
pub attestation_target_misses: u64,
pub latest_attestation_inclusion_distance: u64,
}
impl ValidatorMetrics {
@@ -225,6 +226,10 @@ impl ValidatorMetrics {
pub fn increment_head_misses(&mut self) {
self.attestation_head_misses += 1;
}
pub fn set_latest_inclusion_distance(&mut self, distance: u64) {
self.latest_attestation_inclusion_distance = distance;
}
}
/// A validator that is being monitored by the `ValidatorMonitor`.
@@ -568,7 +573,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
} else {
validator_metrics.increment_misses()
}
drop(validator_metrics);
// Indicates if any attestation made it on-chain.
//
@@ -693,8 +697,10 @@ impl<T: EthSpec> ValidatorMonitor<T> {
&[id],
inclusion_delay as i64,
);
validator_metrics.set_latest_inclusion_distance(inclusion_delay);
}
}
drop(validator_metrics);
// Indicates the number of sync committee signatures that made it into
// a sync aggregate in the current_epoch (state.epoch - 1).

View File

@@ -1,5 +1,9 @@
#![cfg(not(debug_assertions))]
use beacon_chain::attestation_verification::{
batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations, Error,
};
use beacon_chain::test_utils::{MakeAttestationOptions, HARNESS_GENESIS_TIME};
use beacon_chain::{
attestation_verification::Error as AttnError,
test_utils::{
@@ -7,6 +11,7 @@ use beacon_chain::{
},
BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped,
};
use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use lazy_static::lazy_static;
use state_processing::{
@@ -14,9 +19,9 @@ use state_processing::{
};
use tree_hash::TreeHash;
use types::{
test_utils::generate_deterministic_keypair, AggregateSignature, Attestation, BeaconStateError,
BitList, Epoch, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof,
SignedAggregateAndProof, Slot, SubnetId, Unsigned,
test_utils::generate_deterministic_keypair, Address, AggregateSignature, Attestation,
BeaconStateError, BitList, ChainSpec, Epoch, EthSpec, ForkName, Hash256, Keypair,
MainnetEthSpec, SecretKey, SelectionProof, SignedAggregateAndProof, Slot, SubnetId, Unsigned,
};
pub type E = MainnetEthSpec;
@@ -25,6 +30,8 @@ pub type E = MainnetEthSpec;
/// have committees where _some_ validators are aggregators but not _all_.
pub const VALIDATOR_COUNT: usize = 256;
pub const CAPELLA_FORK_EPOCH: usize = 1;
lazy_static! {
/// A cached set of keys.
static ref KEYPAIRS: Vec<Keypair> = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
@@ -50,6 +57,50 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessTyp
harness
}
/// Returns a beacon chain harness with Capella fork enabled at epoch 1, and
/// all genesis validators start with BLS withdrawal credentials.
fn get_harness_capella_spec(
validator_count: usize,
) -> (BeaconChainHarness<EphemeralHarnessType<E>>, ChainSpec) {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
spec.capella_fork_epoch = Some(Epoch::new(CAPELLA_FORK_EPOCH as u64));
let validator_keypairs = KEYPAIRS[0..validator_count].to_vec();
let genesis_state = interop_genesis_state(
&validator_keypairs,
HARNESS_GENESIS_TIME,
Hash256::from_slice(DEFAULT_ETH1_BLOCK_HASH),
None,
&spec,
)
.unwrap();
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.keypairs(validator_keypairs)
.withdrawal_keypairs(
KEYPAIRS[0..validator_count]
.iter()
.cloned()
.map(Some)
.collect(),
)
.genesis_state_ephemeral_store(genesis_state)
.mock_execution_layer()
.build();
harness
.execution_block_generator()
.move_to_terminal_block()
.unwrap();
harness.advance_slot();
(harness, spec)
}
/// Returns an attestation that is valid for some slot in the given `chain`.
///
/// Also returns some info about who created it.
@@ -998,6 +1049,100 @@ async fn attestation_that_skips_epochs() {
.expect("should gossip verify attestation that skips slots");
}
/// Ensures that an attestation can be processed when a validator receives proposer reward
/// in an epoch _and_ is scheduled for a withdrawal. This is a regression test for a scenario where
/// inconsistent state lookup could cause withdrawal root mismatch.
#[tokio::test]
async fn attestation_validator_receive_proposer_reward_and_withdrawals() {
let (harness, _) = get_harness_capella_spec(VALIDATOR_COUNT);
// Advance to a Capella block. Make sure the blocks have attestations.
let two_thirds = (VALIDATOR_COUNT / 3) * 2;
let attesters = (0..two_thirds).collect();
harness
.extend_chain(
// To trigger the bug we need the proposer attestation reward to be signed at a block
// that isn't the first in the epoch.
MainnetEthSpec::slots_per_epoch() as usize + 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(attesters),
)
.await;
// Add BLS change for the block proposer at slot 33. This sets up a withdrawal for the block proposer.
let proposer_index = harness
.chain
.block_at_slot(harness.get_current_slot(), WhenSlotSkipped::None)
.expect("should not error getting block at slot")
.expect("should find block at slot")
.message()
.proposer_index();
harness
.add_bls_to_execution_change(proposer_index, Address::from_low_u64_be(proposer_index))
.unwrap();
// Apply two blocks: one to process the BLS change, and another to process the withdrawal.
harness.advance_slot();
harness
.extend_chain(
2,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
let earlier_slot = harness.get_current_slot();
let earlier_block = harness
.chain
.block_at_slot(earlier_slot, WhenSlotSkipped::None)
.expect("should not error getting block at slot")
.expect("should find block at slot");
// Extend the chain out a few epochs so we have some chain depth to play with.
harness.advance_slot();
harness
.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * 2,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
let current_slot = harness.get_current_slot();
let mut state = harness
.chain
.get_state(&earlier_block.state_root(), Some(earlier_slot))
.expect("should not error getting state")
.expect("should find state");
while state.slot() < current_slot {
per_slot_processing(&mut state, None, &harness.spec).expect("should process slot");
}
let state_root = state.update_tree_hash_cache().unwrap();
// Get an attestation pointed to an old block (where we do not have its shuffling cached).
// Verifying the attestation triggers an inconsistent state replay.
let remaining_attesters = (two_thirds..VALIDATOR_COUNT).collect();
let (attestation, subnet_id) = harness
.get_unaggregated_attestations(
&AttestationStrategy::SomeValidators(remaining_attesters),
&state,
state_root,
earlier_block.canonical_root(),
current_slot,
)
.first()
.expect("should have at least one committee")
.first()
.cloned()
.expect("should have at least one attestation in committee");
harness
.chain
.verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id))
.expect("should gossip verify attestation without checking withdrawals root");
}
#[tokio::test]
async fn attestation_to_finalized_block() {
let harness = get_harness(VALIDATOR_COUNT);
@@ -1189,3 +1334,198 @@ async fn verify_attestation_for_gossip_doppelganger_detection() {
.validator_has_been_observed(epoch, index)
.expect("should check if gossip aggregator was observed"));
}
#[tokio::test]
async fn attestation_verification_use_head_state_fork() {
let (harness, spec) = get_harness_capella_spec(VALIDATOR_COUNT);
// Advance to last block of the pre-Capella fork epoch. Capella is at slot 32.
harness
.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * CAPELLA_FORK_EPOCH - 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
// Assert our head is a block at slot 31 in the pre-Capella fork epoch.
let pre_capella_slot = harness.get_current_slot();
let pre_capella_block = harness
.chain
.block_at_slot(pre_capella_slot, WhenSlotSkipped::Prev)
.expect("should not error getting block at slot")
.expect("should find block at slot");
assert_eq!(pre_capella_block.fork_name(&spec).unwrap(), ForkName::Merge);
// Advance slot clock to Capella fork.
harness.advance_slot();
let first_capella_slot = harness.get_current_slot();
assert_eq!(
spec.fork_name_at_slot::<E>(first_capella_slot),
ForkName::Capella
);
let (state, state_root) = harness.get_current_state_and_root();
// Scenario 1: other node signed attestation using the Capella fork epoch.
{
let attesters = (0..VALIDATOR_COUNT / 2).collect::<Vec<_>>();
let capella_fork = spec.fork_for_name(ForkName::Capella).unwrap();
let committee_attestations = harness
.make_unaggregated_attestations_with_opts(
attesters.as_slice(),
&state,
state_root,
pre_capella_block.canonical_root().into(),
first_capella_slot,
MakeAttestationOptions {
fork: capella_fork,
limit: None,
},
)
.0
.first()
.cloned()
.expect("should have at least one committee");
let attestations_and_subnets = committee_attestations
.iter()
.map(|(attestation, subnet_id)| (attestation, Some(*subnet_id)));
assert!(
batch_verify_unaggregated_attestations(attestations_and_subnets, &harness.chain).is_ok(),
"should accept attestations with `data.slot` >= first capella slot signed using the Capella fork"
);
}
// Scenario 2: other node forgot to update their node and signed attestations using bellatrix fork
{
let attesters = (VALIDATOR_COUNT / 2..VALIDATOR_COUNT).collect::<Vec<_>>();
let merge_fork = spec.fork_for_name(ForkName::Merge).unwrap();
let committee_attestations = harness
.make_unaggregated_attestations_with_opts(
attesters.as_slice(),
&state,
state_root,
pre_capella_block.canonical_root().into(),
first_capella_slot,
MakeAttestationOptions {
fork: merge_fork,
limit: None,
},
)
.0
.first()
.cloned()
.expect("should have at least one committee");
let attestations_and_subnets = committee_attestations
.iter()
.map(|(attestation, subnet_id)| (attestation, Some(*subnet_id)));
let results =
batch_verify_unaggregated_attestations(attestations_and_subnets, &harness.chain)
.expect("should return attestation results");
let error = results
.into_iter()
.collect::<Result<Vec<_>, _>>()
.err()
.expect("should return an error");
assert!(
matches!(error, Error::InvalidSignature),
"should reject attestations with `data.slot` >= first capella slot signed using the pre-Capella fork"
);
}
}
#[tokio::test]
async fn aggregated_attestation_verification_use_head_state_fork() {
let (harness, spec) = get_harness_capella_spec(VALIDATOR_COUNT);
// Advance to last block of the pre-Capella fork epoch. Capella is at slot 32.
harness
.extend_chain(
MainnetEthSpec::slots_per_epoch() as usize * CAPELLA_FORK_EPOCH - 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
// Assert our head is a block at slot 31 in the pre-Capella fork epoch.
let pre_capella_slot = harness.get_current_slot();
let pre_capella_block = harness
.chain
.block_at_slot(pre_capella_slot, WhenSlotSkipped::Prev)
.expect("should not error getting block at slot")
.expect("should find block at slot");
assert_eq!(pre_capella_block.fork_name(&spec).unwrap(), ForkName::Merge);
// Advance slot clock to Capella fork.
harness.advance_slot();
let first_capella_slot = harness.get_current_slot();
assert_eq!(
spec.fork_name_at_slot::<E>(first_capella_slot),
ForkName::Capella
);
let (state, state_root) = harness.get_current_state_and_root();
// Scenario 1: other node signed attestation using the Capella fork epoch.
{
let attesters = (0..VALIDATOR_COUNT / 2).collect::<Vec<_>>();
let capella_fork = spec.fork_for_name(ForkName::Capella).unwrap();
let aggregates = harness
.make_attestations_with_opts(
attesters.as_slice(),
&state,
state_root,
pre_capella_block.canonical_root().into(),
first_capella_slot,
MakeAttestationOptions {
fork: capella_fork,
limit: None,
},
)
.0
.into_iter()
.map(|(_, aggregate)| aggregate.expect("should have signed aggregate and proof"))
.collect::<Vec<_>>();
assert!(
batch_verify_aggregated_attestations(aggregates.iter(), &harness.chain).is_ok(),
"should accept aggregates with `data.slot` >= first capella slot signed using the Capella fork"
);
}
// Scenario 2: other node forgot to update their node and signed attestations using bellatrix fork
{
let attesters = (VALIDATOR_COUNT / 2..VALIDATOR_COUNT).collect::<Vec<_>>();
let merge_fork = spec.fork_for_name(ForkName::Merge).unwrap();
let aggregates = harness
.make_attestations_with_opts(
attesters.as_slice(),
&state,
state_root,
pre_capella_block.canonical_root().into(),
first_capella_slot,
MakeAttestationOptions {
fork: merge_fork,
limit: None,
},
)
.0
.into_iter()
.map(|(_, aggregate)| aggregate.expect("should have signed aggregate and proof"))
.collect::<Vec<_>>();
let results = batch_verify_aggregated_attestations(aggregates.iter(), &harness.chain)
.expect("should return attestation results");
let error = results
.into_iter()
.collect::<Result<Vec<_>, _>>()
.err()
.expect("should return an error");
assert!(
matches!(error, Error::InvalidSignature),
"should reject aggregates with `data.slot` >= first capella slot signed using the pre-Capella fork"
);
}
}

View File

@@ -13,7 +13,8 @@ use slasher::{Config as SlasherConfig, Slasher};
use state_processing::{
common::get_indexed_attestation,
per_block_processing::{per_block_processing, BlockSignatureStrategy},
per_slot_processing, BlockProcessingError, ConsensusContext, VerifyBlockRoot,
per_slot_processing, BlockProcessingError, ConsensusContext, StateProcessingStrategy,
VerifyBlockRoot,
};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -1191,6 +1192,7 @@ async fn add_base_block_to_altair_chain() {
&mut state,
&base_block,
BlockSignatureStrategy::NoVerification,
StateProcessingStrategy::Accurate,
VerifyBlockRoot::True,
&mut ctxt,
&harness.chain.spec,
@@ -1329,6 +1331,7 @@ async fn add_altair_block_to_base_chain() {
&mut state,
&altair_block,
BlockSignatureStrategy::NoVerification,
StateProcessingStrategy::Accurate,
VerifyBlockRoot::True,
&mut ctxt,
&harness.chain.spec,

View File

@@ -916,6 +916,9 @@ async fn invalid_after_optimistic_sync() {
.await,
);
// EL status should still be online, no errors.
assert!(!rig.execution_layer().is_offline_or_erroring().await);
// Running fork choice is necessary since a block has been invalidated.
rig.recompute_head().await;

View File

@@ -5,12 +5,16 @@ use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType, Relativ
use int_to_bytes::int_to_bytes32;
use lazy_static::lazy_static;
use safe_arith::SafeArith;
use state_processing::{
per_block_processing::{altair::sync_committee::process_sync_aggregate, VerifySignatures},
state_advance::complete_state_advance,
};
use store::{SignedContributionAndProof, SyncCommitteeMessage};
use tree_hash::TreeHash;
use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
use types::{
AggregateSignature, Epoch, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, Slot,
SyncSelectionProof, SyncSubnetId, Unsigned,
SyncContributionData, SyncSelectionProof, SyncSubnetId, Unsigned,
};
pub type E = MainnetEthSpec;
@@ -47,10 +51,29 @@ fn get_valid_sync_committee_message(
relative_sync_committee: RelativeSyncCommittee,
message_index: usize,
) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) {
let head_state = harness.chain.head_beacon_state_cloned();
let head_block_root = harness.chain.head_snapshot().beacon_block_root;
get_valid_sync_committee_message_for_block(
harness,
slot,
relative_sync_committee,
message_index,
head_block_root,
)
}
/// Returns a sync message that is valid for some slot in the given `chain`.
///
/// Also returns some info about who created it.
fn get_valid_sync_committee_message_for_block(
harness: &BeaconChainHarness<EphemeralHarnessType<E>>,
slot: Slot,
relative_sync_committee: RelativeSyncCommittee,
message_index: usize,
block_root: Hash256,
) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) {
let head_state = harness.chain.head_beacon_state_cloned();
let (signature, _) = harness
.make_sync_committee_messages(&head_state, head_block_root, slot, relative_sync_committee)
.make_sync_committee_messages(&head_state, block_root, slot, relative_sync_committee)
.get(0)
.expect("sync messages should exist")
.get(message_index)
@@ -119,7 +142,7 @@ fn get_non_aggregator(
subcommittee.iter().find_map(|pubkey| {
let validator_index = harness
.chain
.validator_index(&pubkey)
.validator_index(pubkey)
.expect("should get validator index")
.expect("pubkey should exist in beacon chain");
@@ -376,7 +399,7 @@ async fn aggregated_gossip_verification() {
SyncCommitteeError::AggregatorNotInCommittee {
aggregator_index
}
if aggregator_index == valid_aggregate.message.aggregator_index as u64
if aggregator_index == valid_aggregate.message.aggregator_index
);
/*
@@ -472,7 +495,7 @@ async fn aggregated_gossip_verification() {
assert_invalid!(
"sync contribution created with incorrect sync committee",
next_valid_contribution.clone(),
next_valid_contribution,
SyncCommitteeError::InvalidSignature | SyncCommitteeError::AggregatorNotInCommittee { .. }
);
}
@@ -496,6 +519,30 @@ async fn unaggregated_gossip_verification() {
let (valid_sync_committee_message, expected_validator_index, validator_sk, subnet_id) =
get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current, 0);
let parent_root = harness.chain.head_snapshot().beacon_block.parent_root();
let (valid_sync_committee_message_to_parent, _, _, _) =
get_valid_sync_committee_message_for_block(
&harness,
current_slot,
RelativeSyncCommittee::Current,
0,
parent_root,
);
assert_eq!(
valid_sync_committee_message.slot, valid_sync_committee_message_to_parent.slot,
"test pre-condition: same slot"
);
assert_eq!(
valid_sync_committee_message.validator_index,
valid_sync_committee_message_to_parent.validator_index,
"test pre-condition: same validator index"
);
assert!(
valid_sync_committee_message.beacon_block_root
!= valid_sync_committee_message_to_parent.beacon_block_root,
"test pre-condition: differing roots"
);
macro_rules! assert_invalid {
($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat_param) |+ $( if $guard: expr )?) => {
@@ -602,28 +649,130 @@ async fn unaggregated_gossip_verification() {
SyncCommitteeError::InvalidSignature
);
let head_root = valid_sync_committee_message.beacon_block_root;
let parent_root = valid_sync_committee_message_to_parent.beacon_block_root;
let verifed_message_to_parent = harness
.chain
.verify_sync_committee_message_for_gossip(
valid_sync_committee_message_to_parent.clone(),
subnet_id,
)
.expect("valid sync message to parent should be verified");
// Add the aggregate to the pool.
harness
.chain
.verify_sync_committee_message_for_gossip(valid_sync_committee_message.clone(), subnet_id)
.expect("valid sync message should be verified");
.add_to_naive_sync_aggregation_pool(verifed_message_to_parent)
.unwrap();
/*
* The following test ensures that:
*
* There has been no other valid sync committee message for the declared slot for the
* validator referenced by sync_committee_message.validator_index.
* A sync committee message from the same validator to the same block will
* be rejected.
*/
assert_invalid!(
"sync message that has already been seen",
valid_sync_committee_message,
"sync message to parent block that has already been seen",
valid_sync_committee_message_to_parent.clone(),
subnet_id,
SyncCommitteeError::PriorSyncCommitteeMessageKnown {
validator_index,
slot,
prev_root,
new_root
}
if validator_index == expected_validator_index as u64 && slot == current_slot
if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == parent_root && new_root == parent_root
);
let verified_message_to_head = harness
.chain
.verify_sync_committee_message_for_gossip(valid_sync_committee_message.clone(), subnet_id)
.expect("valid sync message to the head should be verified");
// Add the aggregate to the pool.
harness
.chain
.add_to_naive_sync_aggregation_pool(verified_message_to_head)
.unwrap();
/*
* The following test ensures that:
*
* A sync committee message from the same validator to the same block will
* be rejected.
*/
assert_invalid!(
"sync message to the head that has already been seen",
valid_sync_committee_message.clone(),
subnet_id,
SyncCommitteeError::PriorSyncCommitteeMessageKnown {
validator_index,
slot,
prev_root,
new_root
}
if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == head_root && new_root == head_root
);
/*
* The following test ensures that:
*
* A sync committee message from the same validator to a non-head block will
* be rejected.
*/
assert_invalid!(
"sync message to parent after message to head has already been seen",
valid_sync_committee_message_to_parent.clone(),
subnet_id,
SyncCommitteeError::PriorSyncCommitteeMessageKnown {
validator_index,
slot,
prev_root,
new_root
}
if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == head_root && new_root == parent_root
);
// Ensure that the sync aggregates in the op pool for both the parent block and head block are valid.
let chain = &harness.chain;
let check_sync_aggregate = |root: Hash256| async move {
// Generate an aggregate sync message from the naive aggregation pool.
let aggregate = chain
.get_aggregated_sync_committee_contribution(&SyncContributionData {
// It's a test pre-condition that both sync messages have the same slot.
slot: valid_sync_committee_message.slot,
beacon_block_root: root,
subcommittee_index: subnet_id.into(),
})
.unwrap()
.unwrap();
// Insert the aggregate into the op pool.
chain.op_pool.insert_sync_contribution(aggregate).unwrap();
// Load the block and state for the given root.
let block = chain.get_block(&root).await.unwrap().unwrap();
let mut state = chain.get_state(&block.state_root(), None).unwrap().unwrap();
// Advance the state to simulate a pre-state for block production.
let slot = valid_sync_committee_message.slot + 1;
complete_state_advance(&mut state, Some(block.state_root()), slot, &chain.spec).unwrap();
// Get an aggregate that would be included in a block.
let aggregate_for_inclusion = chain.op_pool.get_sync_aggregate(&state).unwrap().unwrap();
// Validate the retrieved aggregate against the state.
process_sync_aggregate(
&mut state,
&aggregate_for_inclusion,
0,
VerifySignatures::True,
&chain.spec,
)
.unwrap();
};
check_sync_aggregate(valid_sync_committee_message.beacon_block_root).await;
check_sync_aggregate(valid_sync_committee_message_to_parent.beacon_block_root).await;
/*
* The following test ensures that:
*
@@ -649,7 +798,7 @@ async fn unaggregated_gossip_verification() {
assert_invalid!(
"sync message on incorrect subnet",
next_valid_sync_committee_message.clone(),
next_valid_sync_committee_message,
next_subnet_id,
SyncCommitteeError::InvalidSubnetId {
received,