Clean up progressive balance slashings further (#4834)

* Clean up progressive balance slashings further

* Fix Rayon deadlock in test utils

* Fix cargo-fmt
This commit is contained in:
Michael Sproul
2023-10-13 16:45:56 +11:00
committed by GitHub
parent b121e69fcf
commit bb6675e42d
3 changed files with 31 additions and 16 deletions

View File

@@ -49,6 +49,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore};
@@ -1149,9 +1150,9 @@ where
) -> (Vec<CommitteeAttestations<E>>, Vec<usize>) { ) -> (Vec<CommitteeAttestations<E>>, Vec<usize>) {
let MakeAttestationOptions { limit, fork } = opts; let MakeAttestationOptions { limit, fork } = opts;
let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap();
let attesters = Mutex::new(vec![]); let num_attesters = AtomicUsize::new(0);
let attestations = state let (attestations, split_attesters) = state
.get_beacon_committees_at_slot(attestation_slot) .get_beacon_committees_at_slot(attestation_slot)
.expect("should get committees") .expect("should get committees")
.iter() .iter()
@@ -1164,13 +1165,14 @@ where
return None; return None;
} }
let mut attesters = attesters.lock();
if let Some(limit) = limit { if let Some(limit) = limit {
if attesters.len() >= limit { // This atomics stuff is necessary because we're under a par_iter,
// and Rayon will deadlock if we use a mutex.
if num_attesters.fetch_add(1, Ordering::Relaxed) >= limit {
num_attesters.fetch_sub(1, Ordering::Relaxed);
return None; return None;
} }
} }
attesters.push(*validator_index);
let mut attestation = self let mut attestation = self
.produce_unaggregated_attestation_for_block( .produce_unaggregated_attestation_for_block(
@@ -1210,14 +1212,17 @@ where
) )
.unwrap(); .unwrap();
Some((attestation, subnet_id)) Some(((attestation, subnet_id), validator_index))
}) })
.collect::<Vec<_>>() .unzip::<_, _, Vec<_>, Vec<_>>()
}) })
.collect::<Vec<_>>(); .unzip::<_, _, Vec<_>, Vec<_>>();
// Flatten attesters.
let attesters = split_attesters.into_iter().flatten().collect::<Vec<_>>();
let attesters = attesters.into_inner();
if let Some(limit) = limit { if let Some(limit) = limit {
assert_eq!(limit, num_attesters.load(Ordering::Relaxed));
assert_eq!( assert_eq!(
limit, limit,
attesters.len(), attesters.len(),

View File

@@ -69,9 +69,10 @@ pub fn update_progressive_balances_on_attestation<T: EthSpec>(
validator_effective_balance: u64, validator_effective_balance: u64,
validator_slashed: bool, validator_slashed: bool,
) -> Result<(), BlockProcessingError> { ) -> Result<(), BlockProcessingError> {
if is_progressive_balances_enabled(state) && !validator_slashed { if is_progressive_balances_enabled(state) {
state.progressive_balances_cache_mut().on_new_attestation( state.progressive_balances_cache_mut().on_new_attestation(
epoch, epoch,
validator_slashed,
flag_index, flag_index,
validator_effective_balance, validator_effective_balance,
)?; )?;

View File

@@ -64,9 +64,13 @@ impl EpochTotalBalances {
pub fn on_new_attestation( pub fn on_new_attestation(
&mut self, &mut self,
is_slashed: bool,
flag_index: usize, flag_index: usize,
validator_effective_balance: u64, validator_effective_balance: u64,
) -> Result<(), BeaconStateError> { ) -> Result<(), BeaconStateError> {
if is_slashed {
return Ok(());
}
let balance = self let balance = self
.total_flag_balances .total_flag_balances
.get_mut(flag_index) .get_mut(flag_index)
@@ -152,19 +156,24 @@ impl ProgressiveBalancesCache {
pub fn on_new_attestation( pub fn on_new_attestation(
&mut self, &mut self,
epoch: Epoch, epoch: Epoch,
is_slashed: bool,
flag_index: usize, flag_index: usize,
validator_effective_balance: u64, validator_effective_balance: u64,
) -> Result<(), BeaconStateError> { ) -> Result<(), BeaconStateError> {
let cache = self.get_inner_mut()?; let cache = self.get_inner_mut()?;
if epoch == cache.current_epoch { if epoch == cache.current_epoch {
cache cache.current_epoch_cache.on_new_attestation(
.current_epoch_cache is_slashed,
.on_new_attestation(flag_index, validator_effective_balance)?; flag_index,
validator_effective_balance,
)?;
} else if epoch.safe_add(1)? == cache.current_epoch { } else if epoch.safe_add(1)? == cache.current_epoch {
cache cache.previous_epoch_cache.on_new_attestation(
.previous_epoch_cache is_slashed,
.on_new_attestation(flag_index, validator_effective_balance)?; flag_index,
validator_effective_balance,
)?;
} else { } else {
return Err(BeaconStateError::ProgressiveBalancesCacheInconsistent); return Err(BeaconStateError::ProgressiveBalancesCacheInconsistent);
} }