Broadcast address changes at Capella (#3919)

* Add first efforts at broadcast

* Tidy

* Move broadcast code to client

* Progress with broadcast impl

* Rename to address change

* Fix compile errors

* Use `while` loop

* Tidy

* Flip broadcast condition

* Switch to forgetting individual indices

* Always broadcast when the node starts

* Refactor into two functions

* Add testing

* Add another test

* Tidy, add more testing

* Tidy

* Add test, rename enum

* Rename enum again

* Tidy

* Break loop early

* Add V15 schema migration

* Bump schema version

* Progress with migration

* Update beacon_node/client/src/address_change_broadcast.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Fix typo in function name

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>
This commit is contained in:
Paul Hauner
2023-02-07 17:13:49 +11:00
committed by GitHub
parent 2073518f0f
commit e062a7cf76
21 changed files with 649 additions and 40 deletions

View File

@@ -19,6 +19,7 @@ serde = "1.0.116"
serde_derive = "1.0.116"
store = { path = "../store" }
bitvec = "1"
rand = "0.8.5"
[dev-dependencies]
beacon_chain = { path = "../beacon_chain" }

View File

@@ -1,11 +1,20 @@
use state_processing::SigVerifiedOp;
use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::sync::Arc;
use types::{
AbstractExecPayload, BeaconState, ChainSpec, EthSpec, SignedBeaconBlock,
SignedBlsToExecutionChange,
};
/// Indicates if a `BlsToExecutionChange` was received before or after the
/// Capella fork. This is used to know which messages we should broadcast at the
/// Capella fork epoch.
#[derive(Copy, Clone)]
pub enum ReceivedPreCapella {
Yes,
No,
}
/// Pool of BLS to execution changes that maintains a LIFO queue and an index by validator.
///
/// Using the LIFO queue for block production disincentivises spam on P2P at the Capella fork,
@@ -16,6 +25,9 @@ pub struct BlsToExecutionChanges<T: EthSpec> {
by_validator_index: HashMap<u64, Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
/// Last-in-first-out (LIFO) queue of verified messages.
queue: Vec<Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>>,
/// Contains a set of validator indices which need to have their changes
/// broadcast at the capella epoch.
received_pre_capella_indices: HashSet<u64>,
}
impl<T: EthSpec> BlsToExecutionChanges<T> {
@@ -31,16 +43,18 @@ impl<T: EthSpec> BlsToExecutionChanges<T> {
pub fn insert(
&mut self,
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
received_pre_capella: ReceivedPreCapella,
) -> bool {
let validator_index = verified_change.as_inner().message.validator_index;
// Wrap in an `Arc` once on insert.
let verified_change = Arc::new(verified_change);
match self
.by_validator_index
.entry(verified_change.as_inner().message.validator_index)
{
match self.by_validator_index.entry(validator_index) {
Entry::Vacant(entry) => {
self.queue.push(verified_change.clone());
entry.insert(verified_change);
if matches!(received_pre_capella, ReceivedPreCapella::Yes) {
self.received_pre_capella_indices.insert(validator_index);
}
true
}
Entry::Occupied(_) => false,
@@ -61,6 +75,24 @@ impl<T: EthSpec> BlsToExecutionChanges<T> {
self.queue.iter().rev()
}
/// Returns only those which are flagged for broadcasting at the Capella
/// fork. Uses FIFO ordering, although we expect this list to be shuffled by
/// the caller.
pub fn iter_received_pre_capella(
&self,
) -> impl Iterator<Item = &Arc<SigVerifiedOp<SignedBlsToExecutionChange, T>>> {
self.queue.iter().filter(|address_change| {
self.received_pre_capella_indices
.contains(&address_change.as_inner().message.validator_index)
})
}
/// Returns the set of indicies which should have their address changes
/// broadcast at the Capella fork.
pub fn iter_pre_capella_indices(&self) -> impl Iterator<Item = &u64> {
self.received_pre_capella_indices.iter()
}
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
///
/// The block check is necessary to avoid pruning too eagerly and losing the ability to include
@@ -102,4 +134,14 @@ impl<T: EthSpec> BlsToExecutionChanges<T> {
self.by_validator_index.remove(&validator_index);
}
}
/// Removes `broadcasted` validators from the set of validators that should
/// have their BLS changes broadcast at the Capella fork boundary.
pub fn register_indices_broadcasted_at_capella(&mut self, broadcasted: &HashSet<u64>) {
self.received_pre_capella_indices = self
.received_pre_capella_indices
.difference(broadcasted)
.copied()
.collect();
}
}

View File

@@ -9,12 +9,13 @@ mod persistence;
mod reward_cache;
mod sync_aggregate_id;
pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::AttMaxCover;
pub use attestation_storage::{AttestationRef, SplitAttestation};
pub use max_cover::MaxCover;
pub use persistence::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
PersistedOperationPoolV5,
PersistedOperationPoolV15, PersistedOperationPoolV5,
};
pub use reward_cache::RewardCache;
@@ -24,6 +25,8 @@ use crate::sync_aggregate_id::SyncAggregateId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
use parking_lot::{RwLock, RwLockWriteGuard};
use rand::seq::SliceRandom;
use rand::thread_rng;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
get_slashable_indices_modular, verify_exit, VerifySignatures,
@@ -533,10 +536,11 @@ impl<T: EthSpec> OperationPool<T> {
pub fn insert_bls_to_execution_change(
&self,
verified_change: SigVerifiedOp<SignedBlsToExecutionChange, T>,
received_pre_capella: ReceivedPreCapella,
) -> bool {
self.bls_to_execution_changes
.write()
.insert(verified_change)
.insert(verified_change, received_pre_capella)
}
/// Get a list of execution changes for inclusion in a block.
@@ -562,6 +566,42 @@ impl<T: EthSpec> OperationPool<T> {
)
}
/// Get a list of execution changes to be broadcast at the Capella fork.
///
/// The list that is returned will be shuffled to help provide a fair
/// broadcast of messages.
pub fn get_bls_to_execution_changes_received_pre_capella(
&self,
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Vec<SignedBlsToExecutionChange> {
let mut changes = filter_limit_operations(
self.bls_to_execution_changes
.read()
.iter_received_pre_capella(),
|address_change| {
address_change.signature_is_still_valid(&state.fork())
&& state
.get_validator(address_change.as_inner().message.validator_index as usize)
.map_or(false, |validator| {
!validator.has_eth1_withdrawal_credential(spec)
})
},
|address_change| address_change.as_inner().clone(),
usize::max_value(),
);
changes.shuffle(&mut thread_rng());
changes
}
/// Removes `broadcasted` validators from the set of validators that should
/// have their BLS changes broadcast at the Capella fork boundary.
pub fn register_indices_broadcasted_at_capella(&self, broadcasted: &HashSet<u64>) {
self.bls_to_execution_changes
.write()
.register_indices_broadcasted_at_capella(broadcasted);
}
/// Prune BLS to execution changes that have been applied to the state more than 1 block ago.
pub fn prune_bls_to_execution_changes<Payload: AbstractExecPayload<T>>(
&self,

View File

@@ -1,6 +1,6 @@
use crate::attestation_id::AttestationId;
use crate::attestation_storage::AttestationMap;
use crate::bls_to_execution_changes::BlsToExecutionChanges;
use crate::bls_to_execution_changes::{BlsToExecutionChanges, ReceivedPreCapella};
use crate::sync_aggregate_id::SyncAggregateId;
use crate::OpPoolError;
use crate::OperationPool;
@@ -9,6 +9,8 @@ use parking_lot::RwLock;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::SigVerifiedOp;
use std::collections::HashSet;
use std::mem;
use store::{DBColumn, Error as StoreError, StoreItem};
use types::*;
@@ -19,7 +21,7 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
#[superstruct(
variants(V5, V12, V14),
variants(V5, V12, V14, V15),
variant_attributes(
derive(Derivative, PartialEq, Debug, Encode, Decode),
derivative(Clone),
@@ -33,7 +35,7 @@ pub struct PersistedOperationPool<T: EthSpec> {
#[superstruct(only(V5))]
pub attestations_v5: Vec<(AttestationId, Vec<Attestation<T>>)>,
/// Attestations and their attesting indices.
#[superstruct(only(V12, V14))]
#[superstruct(only(V12, V14, V15))]
pub attestations: Vec<(Attestation<T>, Vec<u64>)>,
/// Mapping from sync contribution ID to sync contributions and aggregate.
pub sync_contributions: PersistedSyncContributions<T>,
@@ -41,23 +43,27 @@ pub struct PersistedOperationPool<T: EthSpec> {
#[superstruct(only(V5))]
pub attester_slashings_v5: Vec<(AttesterSlashing<T>, ForkVersion)>,
/// Attester slashings.
#[superstruct(only(V12, V14))]
#[superstruct(only(V12, V14, V15))]
pub attester_slashings: Vec<SigVerifiedOp<AttesterSlashing<T>, T>>,
/// [DEPRECATED] Proposer slashings.
#[superstruct(only(V5))]
pub proposer_slashings_v5: Vec<ProposerSlashing>,
/// Proposer slashings with fork information.
#[superstruct(only(V12, V14))]
#[superstruct(only(V12, V14, V15))]
pub proposer_slashings: Vec<SigVerifiedOp<ProposerSlashing, T>>,
/// [DEPRECATED] Voluntary exits.
#[superstruct(only(V5))]
pub voluntary_exits_v5: Vec<SignedVoluntaryExit>,
/// Voluntary exits with fork information.
#[superstruct(only(V12, V14))]
#[superstruct(only(V12, V14, V15))]
pub voluntary_exits: Vec<SigVerifiedOp<SignedVoluntaryExit, T>>,
/// BLS to Execution Changes
#[superstruct(only(V14))]
#[superstruct(only(V14, V15))]
pub bls_to_execution_changes: Vec<SigVerifiedOp<SignedBlsToExecutionChange, T>>,
/// Validator indices with BLS to Execution Changes to be broadcast at the
/// Capella fork.
#[superstruct(only(V15))]
pub capella_bls_change_broadcast_indices: Vec<u64>,
}
impl<T: EthSpec> PersistedOperationPool<T> {
@@ -110,18 +116,26 @@ impl<T: EthSpec> PersistedOperationPool<T> {
.map(|bls_to_execution_change| (**bls_to_execution_change).clone())
.collect();
PersistedOperationPool::V14(PersistedOperationPoolV14 {
let capella_bls_change_broadcast_indices = operation_pool
.bls_to_execution_changes
.read()
.iter_pre_capella_indices()
.copied()
.collect();
PersistedOperationPool::V15(PersistedOperationPoolV15 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
})
}
/// Reconstruct an `OperationPool`.
pub fn into_operation_pool(self) -> Result<OperationPool<T>, OpPoolError> {
pub fn into_operation_pool(mut self) -> Result<OperationPool<T>, OpPoolError> {
let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect());
let proposer_slashings = RwLock::new(
self.proposer_slashings()?
@@ -142,33 +156,43 @@ impl<T: EthSpec> PersistedOperationPool<T> {
PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => {
return Err(OpPoolError::IncorrectOpPoolVariant)
}
PersistedOperationPool::V14(ref pool) => {
PersistedOperationPool::V14(_) | PersistedOperationPool::V15(_) => {
let mut map = AttestationMap::default();
for (att, attesting_indices) in pool.attestations.clone() {
for (att, attesting_indices) in self.attestations()?.clone() {
map.insert(att, attesting_indices);
}
RwLock::new(map)
}
};
let bls_to_execution_changes = match self {
PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => {
return Err(OpPoolError::IncorrectOpPoolVariant)
let mut bls_to_execution_changes = BlsToExecutionChanges::default();
if let Ok(persisted_changes) = self.bls_to_execution_changes_mut() {
let persisted_changes = mem::take(persisted_changes);
let broadcast_indices =
if let Ok(indices) = self.capella_bls_change_broadcast_indices_mut() {
mem::take(indices).into_iter().collect()
} else {
HashSet::new()
};
for bls_to_execution_change in persisted_changes {
let received_pre_capella = if broadcast_indices
.contains(&bls_to_execution_change.as_inner().message.validator_index)
{
ReceivedPreCapella::Yes
} else {
ReceivedPreCapella::No
};
bls_to_execution_changes.insert(bls_to_execution_change, received_pre_capella);
}
PersistedOperationPool::V14(pool) => {
let mut bls_to_execution_changes = BlsToExecutionChanges::default();
for bls_to_execution_change in pool.bls_to_execution_changes {
bls_to_execution_changes.insert(bls_to_execution_change);
}
RwLock::new(bls_to_execution_changes)
}
};
}
let op_pool = OperationPool {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
bls_to_execution_changes: RwLock::new(bls_to_execution_changes),
reward_cache: Default::default(),
_phantom: Default::default(),
};
@@ -204,6 +228,20 @@ impl<T: EthSpec> StoreItem for PersistedOperationPoolV12<T> {
}
}
impl<T: EthSpec> StoreItem for PersistedOperationPoolV14<T> {
fn db_column() -> DBColumn {
DBColumn::OpPool
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
PersistedOperationPoolV14::from_ssz_bytes(bytes).map_err(Into::into)
}
}
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`.
impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
fn db_column() -> DBColumn {
@@ -216,8 +254,8 @@ impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
// Default deserialization to the latest variant.
PersistedOperationPoolV14::from_ssz_bytes(bytes)
.map(Self::V14)
PersistedOperationPoolV15::from_ssz_bytes(bytes)
.map(Self::V15)
.map_err(Into::into)
}
}