mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 05:14:35 +00:00
Refactor op pool for speed and correctness (#3312)
## Proposed Changes
This PR has two aims: to speed up attestation packing in the op pool, and to fix bugs in the verification of attester slashings, proposer slashings and voluntary exits. The changes are bundled into a single database schema upgrade (v12).
Attestation packing is sped up by removing several inefficiencies:
- No more recalculation of `attesting_indices` during packing.
- No (unnecessary) examination of the `ParticipationFlags`: a bitfield suffices. See `RewardCache`.
- No re-checking of attestation validity during packing: the `AttestationMap` provides attestations which are "correct by construction" (I have checked this using Hydra).
- No SSZ re-serialization for the clunky `AttestationId` type (it can be removed in a future release).
So far the speed-up seems to be roughly 2-10x, from 500ms down to 50-100ms.
Verification of attester slashings, proposer slashings and voluntary exits is fixed by:
- Tracking the `ForkVersion`s that were used to verify each message inside the `SigVerifiedOp`. This allows us to quickly re-verify that they match the head state's opinion of what the `ForkVersion` should be at the epoch(s) relevant to the message.
- Storing the `SigVerifiedOp` on disk rather than the raw operation. This allows us to continue track the fork versions after a reboot.
This is mostly contained in this commit 52bb1840ae.
## Additional Info
The schema upgrade uses the justified state to re-verify attestations and compute `attesting_indices` for them. It will drop any attestations that fail to verify, by the logic that attestations are most valuable in the few slots after they're observed, and are probably stale and useless by the time a node restarts. Exits and proposer slashings and similarly re-verified to obtain `SigVerifiedOp`s.
This PR contains a runtime killswitch `--paranoid-block-proposal` which opts out of all the optimisations in favour of closely verifying every included message. Although I'm quite sure that the optimisations are correct this flag could be useful in the event of an unforeseen emergency.
Finally, you might notice that the `RewardCache` appears quite useless in its current form because it is only updated on the hot-path immediately before proposal. My hope is that in future we can shift calls to `RewardCache::update` into the background, e.g. while performing the state advance. It is also forward-looking to `tree-states` compatibility, where iterating and indexing `state.{previous,current}_epoch_participation` is expensive and needs to be minimised.
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
use crate::attestation_storage::AttestationRef;
|
||||
use crate::max_cover::MaxCover;
|
||||
use crate::reward_cache::RewardCache;
|
||||
use state_processing::common::{
|
||||
altair, base, get_attestation_participation_flag_indices, get_attesting_indices,
|
||||
};
|
||||
@@ -12,34 +14,35 @@ use types::{
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AttMaxCover<'a, T: EthSpec> {
|
||||
/// Underlying attestation.
|
||||
pub att: &'a Attestation<T>,
|
||||
pub att: AttestationRef<'a, T>,
|
||||
/// Mapping of validator indices and their rewards.
|
||||
pub fresh_validators_rewards: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
pub fn new(
|
||||
att: &'a Attestation<T>,
|
||||
att: AttestationRef<'a, T>,
|
||||
state: &BeaconState<T>,
|
||||
reward_cache: &'a RewardCache,
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Option<Self> {
|
||||
if let BeaconState::Base(ref base_state) = state {
|
||||
Self::new_for_base(att, state, base_state, total_active_balance, spec)
|
||||
} else {
|
||||
Self::new_for_altair(att, state, total_active_balance, spec)
|
||||
Self::new_for_altair(att, state, reward_cache, total_active_balance, spec)
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialise an attestation cover object for base/phase0 hard fork.
|
||||
pub fn new_for_base(
|
||||
att: &'a Attestation<T>,
|
||||
att: AttestationRef<'a, T>,
|
||||
state: &BeaconState<T>,
|
||||
base_state: &BeaconStateBase<T>,
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Option<Self> {
|
||||
let fresh_validators = earliest_attestation_validators(att, state, base_state);
|
||||
let fresh_validators = earliest_attestation_validators(&att, state, base_state);
|
||||
let committee = state
|
||||
.get_beacon_committee(att.data.slot, att.data.index)
|
||||
.ok()?;
|
||||
@@ -67,45 +70,41 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
|
||||
/// Initialise an attestation cover object for Altair or later.
|
||||
pub fn new_for_altair(
|
||||
att: &'a Attestation<T>,
|
||||
att: AttestationRef<'a, T>,
|
||||
state: &BeaconState<T>,
|
||||
reward_cache: &'a RewardCache,
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Option<Self> {
|
||||
let committee = state
|
||||
.get_beacon_committee(att.data.slot, att.data.index)
|
||||
.ok()?;
|
||||
let attesting_indices =
|
||||
get_attesting_indices::<T>(committee.committee, &att.aggregation_bits).ok()?;
|
||||
let att_data = att.attestation_data();
|
||||
|
||||
let participation_list = if att.data.target.epoch == state.current_epoch() {
|
||||
state.current_epoch_participation().ok()?
|
||||
} else if att.data.target.epoch == state.previous_epoch() {
|
||||
state.previous_epoch_participation().ok()?
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?;
|
||||
let inclusion_delay = state.slot().as_u64().checked_sub(att_data.slot.as_u64())?;
|
||||
let att_participation_flags =
|
||||
get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec)
|
||||
get_attestation_participation_flag_indices(state, &att_data, inclusion_delay, spec)
|
||||
.ok()?;
|
||||
let base_reward_per_increment =
|
||||
altair::BaseRewardPerIncrement::new(total_active_balance, spec).ok()?;
|
||||
|
||||
let fresh_validators_rewards = attesting_indices
|
||||
let fresh_validators_rewards = att
|
||||
.indexed
|
||||
.attesting_indices
|
||||
.iter()
|
||||
.filter_map(|&index| {
|
||||
if reward_cache
|
||||
.has_attested_in_epoch(index, att_data.target.epoch)
|
||||
.ok()?
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut proposer_reward_numerator = 0;
|
||||
let participation = participation_list.get(index)?;
|
||||
|
||||
let base_reward =
|
||||
altair::get_base_reward(state, index, base_reward_per_increment, spec).ok()?;
|
||||
altair::get_base_reward(state, index as usize, base_reward_per_increment, spec)
|
||||
.ok()?;
|
||||
|
||||
for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() {
|
||||
if att_participation_flags.contains(&flag_index)
|
||||
&& !participation.has_flag(flag_index).ok()?
|
||||
{
|
||||
if att_participation_flags.contains(&flag_index) {
|
||||
proposer_reward_numerator += base_reward.checked_mul(*weight)?;
|
||||
}
|
||||
}
|
||||
@@ -113,7 +112,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
let proposer_reward = proposer_reward_numerator
|
||||
.checked_div(WEIGHT_DENOMINATOR.checked_mul(spec.proposer_reward_quotient)?)?;
|
||||
|
||||
Some((index as u64, proposer_reward)).filter(|_| proposer_reward != 0)
|
||||
Some((index, proposer_reward)).filter(|_| proposer_reward != 0)
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -126,10 +125,15 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
|
||||
impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
|
||||
type Object = Attestation<T>;
|
||||
type Intermediate = AttestationRef<'a, T>;
|
||||
type Set = HashMap<u64, u64>;
|
||||
|
||||
fn object(&self) -> &Attestation<T> {
|
||||
self.att
|
||||
fn intermediate(&self) -> &AttestationRef<'a, T> {
|
||||
&self.att
|
||||
}
|
||||
|
||||
fn convert_to_object(att_ref: &AttestationRef<'a, T>) -> Attestation<T> {
|
||||
att_ref.clone_as_attestation()
|
||||
}
|
||||
|
||||
fn covering_set(&self) -> &HashMap<u64, u64> {
|
||||
@@ -148,7 +152,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
|
||||
/// of slashable voting, which is rare.
|
||||
fn update_covering_set(
|
||||
&mut self,
|
||||
best_att: &Attestation<T>,
|
||||
best_att: &AttestationRef<'a, T>,
|
||||
covered_validators: &HashMap<u64, u64>,
|
||||
) {
|
||||
if self.att.data.slot == best_att.data.slot && self.att.data.index == best_att.data.index {
|
||||
@@ -172,16 +176,16 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
|
||||
///
|
||||
/// This isn't optimal, but with the Altair fork this code is obsolete and not worth upgrading.
|
||||
pub fn earliest_attestation_validators<T: EthSpec>(
|
||||
attestation: &Attestation<T>,
|
||||
attestation: &AttestationRef<T>,
|
||||
state: &BeaconState<T>,
|
||||
base_state: &BeaconStateBase<T>,
|
||||
) -> BitList<T::MaxValidatorsPerCommittee> {
|
||||
// Bitfield of validators whose attestations are new/fresh.
|
||||
let mut new_validators = attestation.aggregation_bits.clone();
|
||||
let mut new_validators = attestation.indexed.aggregation_bits.clone();
|
||||
|
||||
let state_attestations = if attestation.data.target.epoch == state.current_epoch() {
|
||||
let state_attestations = if attestation.checkpoint.target_epoch == state.current_epoch() {
|
||||
&base_state.current_epoch_attestations
|
||||
} else if attestation.data.target.epoch == state.previous_epoch() {
|
||||
} else if attestation.checkpoint.target_epoch == state.previous_epoch() {
|
||||
&base_state.previous_epoch_attestations
|
||||
} else {
|
||||
return BitList::with_capacity(0).unwrap();
|
||||
|
||||
@@ -1,45 +1,12 @@
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use ssz::ssz_encode;
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{AttestationData, ChainSpec, Domain, Epoch, Fork, Hash256};
|
||||
|
||||
/// Serialized `AttestationData` augmented with a domain to encode the fork info.
|
||||
///
|
||||
/// [DEPRECATED] To be removed once all nodes have updated to schema v12.
|
||||
#[derive(
|
||||
PartialEq, Eq, Clone, Hash, Debug, PartialOrd, Ord, Encode, Decode, Serialize, Deserialize,
|
||||
)]
|
||||
pub struct AttestationId {
|
||||
v: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Number of domain bytes that the end of an attestation ID is padded with.
|
||||
const DOMAIN_BYTES_LEN: usize = std::mem::size_of::<Hash256>();
|
||||
|
||||
impl AttestationId {
|
||||
pub fn from_data(
|
||||
attestation: &AttestationData,
|
||||
fork: &Fork,
|
||||
genesis_validators_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Self {
|
||||
let mut bytes = ssz_encode(attestation);
|
||||
let epoch = attestation.target.epoch;
|
||||
bytes.extend_from_slice(
|
||||
AttestationId::compute_domain_bytes(epoch, fork, genesis_validators_root, spec)
|
||||
.as_bytes(),
|
||||
);
|
||||
AttestationId { v: bytes }
|
||||
}
|
||||
|
||||
pub fn compute_domain_bytes(
|
||||
epoch: Epoch,
|
||||
fork: &Fork,
|
||||
genesis_validators_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Hash256 {
|
||||
spec.get_domain(epoch, Domain::BeaconAttester, fork, genesis_validators_root)
|
||||
}
|
||||
|
||||
pub fn domain_bytes_match(&self, domain_bytes: &Hash256) -> bool {
|
||||
&self.v[self.v.len() - DOMAIN_BYTES_LEN..] == domain_bytes.as_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
245
beacon_node/operation_pool/src/attestation_storage.rs
Normal file
245
beacon_node/operation_pool/src/attestation_storage.rs
Normal file
@@ -0,0 +1,245 @@
|
||||
use crate::AttestationStats;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use types::{
|
||||
AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch,
|
||||
EthSpec, Hash256, Slot,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
pub struct CheckpointKey {
|
||||
pub source: Checkpoint,
|
||||
pub target_epoch: Epoch,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub struct CompactAttestationData {
|
||||
pub slot: Slot,
|
||||
pub index: u64,
|
||||
pub beacon_block_root: Hash256,
|
||||
pub target_root: Hash256,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct CompactIndexedAttestation<T: EthSpec> {
|
||||
pub attesting_indices: Vec<u64>,
|
||||
pub aggregation_bits: BitList<T::MaxValidatorsPerCommittee>,
|
||||
pub signature: AggregateSignature,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SplitAttestation<T: EthSpec> {
|
||||
pub checkpoint: CheckpointKey,
|
||||
pub data: CompactAttestationData,
|
||||
pub indexed: CompactIndexedAttestation<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AttestationRef<'a, T: EthSpec> {
|
||||
pub checkpoint: &'a CheckpointKey,
|
||||
pub data: &'a CompactAttestationData,
|
||||
pub indexed: &'a CompactIndexedAttestation<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
pub struct AttestationMap<T: EthSpec> {
|
||||
checkpoint_map: HashMap<CheckpointKey, AttestationDataMap<T>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
pub struct AttestationDataMap<T: EthSpec> {
|
||||
attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> SplitAttestation<T> {
|
||||
pub fn new(attestation: Attestation<T>, attesting_indices: Vec<u64>) -> Self {
|
||||
let checkpoint = CheckpointKey {
|
||||
source: attestation.data.source,
|
||||
target_epoch: attestation.data.target.epoch,
|
||||
};
|
||||
let data = CompactAttestationData {
|
||||
slot: attestation.data.slot,
|
||||
index: attestation.data.index,
|
||||
beacon_block_root: attestation.data.beacon_block_root,
|
||||
target_root: attestation.data.target.root,
|
||||
};
|
||||
let indexed = CompactIndexedAttestation {
|
||||
attesting_indices,
|
||||
aggregation_bits: attestation.aggregation_bits,
|
||||
signature: attestation.signature,
|
||||
};
|
||||
Self {
|
||||
checkpoint,
|
||||
data,
|
||||
indexed,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_ref(&self) -> AttestationRef<T> {
|
||||
AttestationRef {
|
||||
checkpoint: &self.checkpoint,
|
||||
data: &self.data,
|
||||
indexed: &self.indexed,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec> AttestationRef<'a, T> {
|
||||
pub fn attestation_data(&self) -> AttestationData {
|
||||
AttestationData {
|
||||
slot: self.data.slot,
|
||||
index: self.data.index,
|
||||
beacon_block_root: self.data.beacon_block_root,
|
||||
source: self.checkpoint.source,
|
||||
target: Checkpoint {
|
||||
epoch: self.checkpoint.target_epoch,
|
||||
root: self.data.target_root,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clone_as_attestation(&self) -> Attestation<T> {
|
||||
Attestation {
|
||||
aggregation_bits: self.indexed.aggregation_bits.clone(),
|
||||
data: self.attestation_data(),
|
||||
signature: self.indexed.signature.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckpointKey {
|
||||
/// Return two checkpoint keys: `(previous, current)` for the previous and current epochs of
|
||||
/// the `state`.
|
||||
pub fn keys_for_state<T: EthSpec>(state: &BeaconState<T>) -> (Self, Self) {
|
||||
(
|
||||
CheckpointKey {
|
||||
source: state.previous_justified_checkpoint(),
|
||||
target_epoch: state.previous_epoch(),
|
||||
},
|
||||
CheckpointKey {
|
||||
source: state.current_justified_checkpoint(),
|
||||
target_epoch: state.current_epoch(),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> CompactIndexedAttestation<T> {
|
||||
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
|
||||
self.aggregation_bits
|
||||
.intersection(&other.aggregation_bits)
|
||||
.is_zero()
|
||||
}
|
||||
|
||||
pub fn aggregate(&mut self, other: &Self) {
|
||||
self.attesting_indices = self
|
||||
.attesting_indices
|
||||
.drain(..)
|
||||
.merge(other.attesting_indices.iter().copied())
|
||||
.dedup()
|
||||
.collect();
|
||||
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
|
||||
self.signature.add_assign_aggregate(&other.signature);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> AttestationMap<T> {
|
||||
pub fn insert(&mut self, attestation: Attestation<T>, attesting_indices: Vec<u64>) {
|
||||
let SplitAttestation {
|
||||
checkpoint,
|
||||
data,
|
||||
indexed,
|
||||
} = SplitAttestation::new(attestation, attesting_indices);
|
||||
|
||||
let attestation_map = self
|
||||
.checkpoint_map
|
||||
.entry(checkpoint)
|
||||
.or_insert_with(AttestationDataMap::default);
|
||||
let attestations = attestation_map
|
||||
.attestations
|
||||
.entry(data)
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
// Greedily aggregate the attestation with all existing attestations.
|
||||
// NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
|
||||
// aggregation.
|
||||
let mut aggregated = false;
|
||||
for existing_attestation in attestations.iter_mut() {
|
||||
if existing_attestation.signers_disjoint_from(&indexed) {
|
||||
existing_attestation.aggregate(&indexed);
|
||||
aggregated = true;
|
||||
} else if *existing_attestation == indexed {
|
||||
aggregated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if !aggregated {
|
||||
attestations.push(indexed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate all attestations matching the given `checkpoint_key`.
|
||||
pub fn get_attestations<'a>(
|
||||
&'a self,
|
||||
checkpoint_key: &'a CheckpointKey,
|
||||
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
|
||||
self.checkpoint_map
|
||||
.get(checkpoint_key)
|
||||
.into_iter()
|
||||
.flat_map(|attestation_map| attestation_map.iter(checkpoint_key))
|
||||
}
|
||||
|
||||
/// Iterate all attestations in the map.
|
||||
pub fn iter(&self) -> impl Iterator<Item = AttestationRef<T>> {
|
||||
self.checkpoint_map
|
||||
.iter()
|
||||
.flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key))
|
||||
}
|
||||
|
||||
/// Prune attestations that are from before the previous epoch.
|
||||
pub fn prune(&mut self, current_epoch: Epoch) {
|
||||
self.checkpoint_map
|
||||
.retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1);
|
||||
}
|
||||
|
||||
/// Statistics about all attestations stored in the map.
|
||||
pub fn stats(&self) -> AttestationStats {
|
||||
self.checkpoint_map
|
||||
.values()
|
||||
.map(AttestationDataMap::stats)
|
||||
.fold(AttestationStats::default(), |mut acc, new| {
|
||||
acc.num_attestations += new.num_attestations;
|
||||
acc.num_attestation_data += new.num_attestation_data;
|
||||
acc.max_aggregates_per_data =
|
||||
std::cmp::max(acc.max_aggregates_per_data, new.max_aggregates_per_data);
|
||||
acc
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: EthSpec> AttestationDataMap<T> {
|
||||
pub fn iter<'a>(
|
||||
&'a self,
|
||||
checkpoint_key: &'a CheckpointKey,
|
||||
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
|
||||
self.attestations.iter().flat_map(|(data, vec_indexed)| {
|
||||
vec_indexed.iter().map(|indexed| AttestationRef {
|
||||
checkpoint: checkpoint_key,
|
||||
data,
|
||||
indexed,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> AttestationStats {
|
||||
let mut stats = AttestationStats::default();
|
||||
|
||||
for aggregates in self.attestations.values() {
|
||||
stats.num_attestations += aggregates.len();
|
||||
stats.num_attestation_data += 1;
|
||||
stats.max_aggregates_per_data =
|
||||
std::cmp::max(stats.max_aggregates_per_data, aggregates.len());
|
||||
}
|
||||
stats
|
||||
}
|
||||
}
|
||||
@@ -39,14 +39,18 @@ impl<'a, T: EthSpec> AttesterSlashingMaxCover<'a, T> {
|
||||
impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> {
|
||||
/// The result type, of which we would eventually like a collection of maximal quality.
|
||||
type Object = AttesterSlashing<T>;
|
||||
type Intermediate = AttesterSlashing<T>;
|
||||
/// The type used to represent sets.
|
||||
type Set = HashMap<u64, u64>;
|
||||
|
||||
/// Extract an object for inclusion in a solution.
|
||||
fn object(&self) -> &AttesterSlashing<T> {
|
||||
fn intermediate(&self) -> &AttesterSlashing<T> {
|
||||
self.slashing
|
||||
}
|
||||
|
||||
fn convert_to_object(slashing: &AttesterSlashing<T>) -> AttesterSlashing<T> {
|
||||
slashing.clone()
|
||||
}
|
||||
|
||||
/// Get the set of elements covered.
|
||||
fn covering_set(&self) -> &HashMap<u64, u64> {
|
||||
&self.effective_balances
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -11,16 +11,21 @@ use itertools::Itertools;
|
||||
pub trait MaxCover: Clone {
|
||||
/// The result type, of which we would eventually like a collection of maximal quality.
|
||||
type Object: Clone;
|
||||
/// The intermediate object type, which can be converted to `Object`.
|
||||
type Intermediate: Clone;
|
||||
/// The type used to represent sets.
|
||||
type Set: Clone;
|
||||
|
||||
/// Extract an object for inclusion in a solution.
|
||||
fn object(&self) -> &Self::Object;
|
||||
/// Extract the intermediate object.
|
||||
fn intermediate(&self) -> &Self::Intermediate;
|
||||
|
||||
/// Convert the borrowed intermediate object to an owned object for the solution.
|
||||
fn convert_to_object(intermediate: &Self::Intermediate) -> Self::Object;
|
||||
|
||||
/// Get the set of elements covered.
|
||||
fn covering_set(&self) -> &Self::Set;
|
||||
/// Update the set of items covered, for the inclusion of some object in the solution.
|
||||
fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set);
|
||||
fn update_covering_set(&mut self, max_obj: &Self::Intermediate, max_set: &Self::Set);
|
||||
/// The quality of this item's covering set, usually its cardinality.
|
||||
fn score(&self) -> usize;
|
||||
}
|
||||
@@ -86,7 +91,7 @@ where
|
||||
.filter(|x| x.available && x.item.score() != 0)
|
||||
.for_each(|x| {
|
||||
x.item
|
||||
.update_covering_set(best.object(), best.covering_set())
|
||||
.update_covering_set(best.intermediate(), best.covering_set())
|
||||
});
|
||||
|
||||
result.push(best);
|
||||
@@ -106,7 +111,7 @@ where
|
||||
.into_iter()
|
||||
.merge_by(cover2, |item1, item2| item1.score() >= item2.score())
|
||||
.take(limit)
|
||||
.map(|item| item.object().clone())
|
||||
.map(|item| T::convert_to_object(item.intermediate()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -121,12 +126,17 @@ mod test {
|
||||
T: Clone + Eq + Hash,
|
||||
{
|
||||
type Object = Self;
|
||||
type Intermediate = Self;
|
||||
type Set = Self;
|
||||
|
||||
fn object(&self) -> &Self {
|
||||
fn intermediate(&self) -> &Self {
|
||||
self
|
||||
}
|
||||
|
||||
fn convert_to_object(set: &Self) -> Self {
|
||||
set.clone()
|
||||
}
|
||||
|
||||
fn covering_set(&self) -> &Self {
|
||||
self
|
||||
}
|
||||
|
||||
@@ -3,6 +3,10 @@ use lazy_static::lazy_static;
|
||||
pub use lighthouse_metrics::*;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref BUILD_REWARD_CACHE_TIME: Result<Histogram> = try_create_histogram(
|
||||
"op_pool_build_reward_cache_time",
|
||||
"Time to build the reward cache before packing attestations"
|
||||
);
|
||||
pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result<Histogram> = try_create_histogram(
|
||||
"op_pool_attestation_prev_epoch_packing_time",
|
||||
"Time to pack previous epoch attestations"
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use crate::attestation_id::AttestationId;
|
||||
use crate::attestation_storage::AttestationMap;
|
||||
use crate::sync_aggregate_id::SyncAggregateId;
|
||||
use crate::OpPoolError;
|
||||
use crate::OperationPool;
|
||||
use derivative::Derivative;
|
||||
use parking_lot::RwLock;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use state_processing::SigVerifiedOp;
|
||||
use store::{DBColumn, Error as StoreError, StoreItem};
|
||||
use types::*;
|
||||
|
||||
@@ -17,32 +18,42 @@ type PersistedSyncContributions<T> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
|
||||
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
|
||||
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
|
||||
#[superstruct(
|
||||
variants(Altair),
|
||||
variants(V5, V12),
|
||||
variant_attributes(
|
||||
derive(Derivative, PartialEq, Debug, Serialize, Deserialize, Encode, Decode),
|
||||
serde(bound = "T: EthSpec", deny_unknown_fields),
|
||||
derive(Derivative, PartialEq, Debug, Encode, Decode),
|
||||
derivative(Clone),
|
||||
),
|
||||
partial_getter_error(ty = "OpPoolError", expr = "OpPoolError::IncorrectOpPoolVariant")
|
||||
)]
|
||||
#[derive(PartialEq, Debug, Serialize, Deserialize, Encode)]
|
||||
#[serde(untagged)]
|
||||
#[serde(bound = "T: EthSpec")]
|
||||
#[derive(PartialEq, Debug, Encode)]
|
||||
#[ssz(enum_behaviour = "transparent")]
|
||||
pub struct PersistedOperationPool<T: EthSpec> {
|
||||
/// Mapping from attestation ID to attestation mappings.
|
||||
// We could save space by not storing the attestation ID, but it might
|
||||
// be difficult to make that roundtrip due to eager aggregation.
|
||||
attestations: Vec<(AttestationId, Vec<Attestation<T>>)>,
|
||||
/// [DEPRECATED] Mapping from attestation ID to attestation mappings.
|
||||
#[superstruct(only(V5))]
|
||||
pub attestations_v5: Vec<(AttestationId, Vec<Attestation<T>>)>,
|
||||
/// Attestations and their attesting indices.
|
||||
#[superstruct(only(V12))]
|
||||
pub attestations: Vec<(Attestation<T>, Vec<u64>)>,
|
||||
/// Mapping from sync contribution ID to sync contributions and aggregate.
|
||||
#[superstruct(only(Altair))]
|
||||
sync_contributions: PersistedSyncContributions<T>,
|
||||
pub sync_contributions: PersistedSyncContributions<T>,
|
||||
/// [DEPRECATED] Attester slashings.
|
||||
#[superstruct(only(V5))]
|
||||
pub attester_slashings_v5: Vec<(AttesterSlashing<T>, ForkVersion)>,
|
||||
/// Attester slashings.
|
||||
attester_slashings: Vec<(AttesterSlashing<T>, ForkVersion)>,
|
||||
/// Proposer slashings.
|
||||
proposer_slashings: Vec<ProposerSlashing>,
|
||||
/// Voluntary exits.
|
||||
voluntary_exits: Vec<SignedVoluntaryExit>,
|
||||
#[superstruct(only(V12))]
|
||||
pub attester_slashings: Vec<SigVerifiedOp<AttesterSlashing<T>, T>>,
|
||||
/// [DEPRECATED] Proposer slashings.
|
||||
#[superstruct(only(V5))]
|
||||
pub proposer_slashings_v5: Vec<ProposerSlashing>,
|
||||
/// Proposer slashings with fork information.
|
||||
#[superstruct(only(V12))]
|
||||
pub proposer_slashings: Vec<SigVerifiedOp<ProposerSlashing, T>>,
|
||||
/// [DEPRECATED] Voluntary exits.
|
||||
#[superstruct(only(V5))]
|
||||
pub voluntary_exits_v5: Vec<SignedVoluntaryExit>,
|
||||
/// Voluntary exits with fork information.
|
||||
#[superstruct(only(V12))]
|
||||
pub voluntary_exits: Vec<SigVerifiedOp<SignedVoluntaryExit, T>>,
|
||||
}
|
||||
|
||||
impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
@@ -52,7 +63,12 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
.attestations
|
||||
.read()
|
||||
.iter()
|
||||
.map(|(att_id, att)| (att_id.clone(), att.clone()))
|
||||
.map(|att| {
|
||||
(
|
||||
att.clone_as_attestation(),
|
||||
att.indexed.attesting_indices.clone(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let sync_contributions = operation_pool
|
||||
@@ -83,7 +99,7 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
.map(|(_, exit)| exit.clone())
|
||||
.collect();
|
||||
|
||||
PersistedOperationPool::Altair(PersistedOperationPoolAltair {
|
||||
PersistedOperationPool::V12(PersistedOperationPoolV12 {
|
||||
attestations,
|
||||
sync_contributions,
|
||||
attester_slashings,
|
||||
@@ -92,45 +108,62 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches
|
||||
/// `PersistedOperationPool::Base`.
|
||||
/// Reconstruct an `OperationPool`.
|
||||
pub fn into_operation_pool(self) -> Result<OperationPool<T>, OpPoolError> {
|
||||
let attestations = RwLock::new(self.attestations().iter().cloned().collect());
|
||||
let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect());
|
||||
let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect());
|
||||
let proposer_slashings = RwLock::new(
|
||||
self.proposer_slashings()
|
||||
self.proposer_slashings()?
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|slashing| (slashing.signed_header_1.message.proposer_index, slashing))
|
||||
.map(|slashing| (slashing.as_inner().proposer_index(), slashing))
|
||||
.collect(),
|
||||
);
|
||||
let voluntary_exits = RwLock::new(
|
||||
self.voluntary_exits()
|
||||
self.voluntary_exits()?
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|exit| (exit.message.validator_index, exit))
|
||||
.map(|exit| (exit.as_inner().message.validator_index, exit))
|
||||
.collect(),
|
||||
);
|
||||
let op_pool = match self {
|
||||
PersistedOperationPool::Altair(_) => {
|
||||
let sync_contributions =
|
||||
RwLock::new(self.sync_contributions()?.iter().cloned().collect());
|
||||
|
||||
OperationPool {
|
||||
attestations,
|
||||
sync_contributions,
|
||||
attester_slashings,
|
||||
proposer_slashings,
|
||||
voluntary_exits,
|
||||
_phantom: Default::default(),
|
||||
let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect());
|
||||
let attestations = match self {
|
||||
PersistedOperationPool::V5(_) => return Err(OpPoolError::IncorrectOpPoolVariant),
|
||||
PersistedOperationPool::V12(pool) => {
|
||||
let mut map = AttestationMap::default();
|
||||
for (att, attesting_indices) in pool.attestations {
|
||||
map.insert(att, attesting_indices);
|
||||
}
|
||||
RwLock::new(map)
|
||||
}
|
||||
};
|
||||
let op_pool = OperationPool {
|
||||
attestations,
|
||||
sync_contributions,
|
||||
attester_slashings,
|
||||
proposer_slashings,
|
||||
voluntary_exits,
|
||||
reward_cache: Default::default(),
|
||||
_phantom: Default::default(),
|
||||
};
|
||||
Ok(op_pool)
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::Altair`.
|
||||
impl<T: EthSpec> StoreItem for PersistedOperationPoolV5<T> {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::OpPool
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
self.as_ssz_bytes()
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
|
||||
PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`.
|
||||
impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::OpPool
|
||||
@@ -141,9 +174,9 @@ impl<T: EthSpec> StoreItem for PersistedOperationPool<T> {
|
||||
}
|
||||
|
||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
|
||||
// Default deserialization to the Altair variant.
|
||||
PersistedOperationPoolAltair::from_ssz_bytes(bytes)
|
||||
.map(Self::Altair)
|
||||
// Default deserialization to the latest variant.
|
||||
PersistedOperationPoolV12::from_ssz_bytes(bytes)
|
||||
.map(Self::V12)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
122
beacon_node/operation_pool/src/reward_cache.rs
Normal file
122
beacon_node/operation_pool/src/reward_cache.rs
Normal file
@@ -0,0 +1,122 @@
|
||||
use crate::OpPoolError;
|
||||
use bitvec::vec::BitVec;
|
||||
use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
struct Initialization {
|
||||
current_epoch: Epoch,
|
||||
latest_block_root: Hash256,
|
||||
}
|
||||
|
||||
/// Cache to store pre-computed information for block proposal.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RewardCache {
|
||||
initialization: Option<Initialization>,
|
||||
/// `BitVec` of validator indices which don't have default participation flags for the prev. epoch.
|
||||
///
|
||||
/// We choose to only track whether validators have *any* participation flag set because
|
||||
/// it's impossible to include a new attestation which is better than the existing participation
|
||||
/// UNLESS the validator makes a slashable attestation, and we assume that this is rare enough
|
||||
/// that it's acceptable to be slightly sub-optimal in this case.
|
||||
previous_epoch_participation: BitVec,
|
||||
/// `BitVec` of validator indices which don't have default participation flags for the current epoch.
|
||||
current_epoch_participation: BitVec,
|
||||
}
|
||||
|
||||
impl RewardCache {
|
||||
pub fn has_attested_in_epoch(
|
||||
&self,
|
||||
validator_index: u64,
|
||||
epoch: Epoch,
|
||||
) -> Result<bool, OpPoolError> {
|
||||
if let Some(init) = &self.initialization {
|
||||
if init.current_epoch == epoch {
|
||||
Ok(*self
|
||||
.current_epoch_participation
|
||||
.get(validator_index as usize)
|
||||
.ok_or(OpPoolError::RewardCacheOutOfBounds)?)
|
||||
} else if init.current_epoch == epoch + 1 {
|
||||
Ok(*self
|
||||
.previous_epoch_participation
|
||||
.get(validator_index as usize)
|
||||
.ok_or(OpPoolError::RewardCacheOutOfBounds)?)
|
||||
} else {
|
||||
Err(OpPoolError::RewardCacheWrongEpoch)
|
||||
}
|
||||
} else {
|
||||
Err(OpPoolError::RewardCacheWrongEpoch)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the root of the latest block applied to `state`.
|
||||
///
|
||||
/// For simplicity at genesis we return the zero hash, which will cause one unnecessary
|
||||
/// re-calculation in `update`.
|
||||
fn latest_block_root<E: EthSpec>(state: &BeaconState<E>) -> Result<Hash256, OpPoolError> {
|
||||
if state.slot() == 0 {
|
||||
Ok(Hash256::zero())
|
||||
} else {
|
||||
Ok(*state
|
||||
.get_block_root(state.slot() - 1)
|
||||
.map_err(OpPoolError::RewardCacheGetBlockRoot)?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the cache.
|
||||
pub fn update<E: EthSpec>(&mut self, state: &BeaconState<E>) -> Result<(), OpPoolError> {
|
||||
if matches!(state, BeaconState::Base(_)) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let current_epoch = state.current_epoch();
|
||||
let latest_block_root = Self::latest_block_root(state)?;
|
||||
|
||||
let new_init = Initialization {
|
||||
current_epoch,
|
||||
latest_block_root,
|
||||
};
|
||||
|
||||
// The participation flags change every block, and will almost always need updating when
|
||||
// this function is called at a new slot.
|
||||
if self
|
||||
.initialization
|
||||
.as_ref()
|
||||
.map_or(true, |init| *init != new_init)
|
||||
{
|
||||
self.update_previous_epoch_participation(state)
|
||||
.map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?;
|
||||
self.update_current_epoch_participation(state)
|
||||
.map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?;
|
||||
|
||||
self.initialization = Some(new_init);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_previous_epoch_participation<E: EthSpec>(
|
||||
&mut self,
|
||||
state: &BeaconState<E>,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let default_participation = ParticipationFlags::default();
|
||||
self.previous_epoch_participation = state
|
||||
.previous_epoch_participation()?
|
||||
.iter()
|
||||
.map(|participation| *participation != default_participation)
|
||||
.collect();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_current_epoch_participation<E: EthSpec>(
|
||||
&mut self,
|
||||
state: &BeaconState<E>,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let default_participation = ParticipationFlags::default();
|
||||
self.current_epoch_participation = state
|
||||
.current_epoch_participation()?
|
||||
.iter()
|
||||
.map(|participation| *participation != default_participation)
|
||||
.collect();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user