This commit is contained in:
Michael Sproul
2025-05-08 16:14:50 +10:00
parent eaca4dfa47
commit 2e50c2daea
8 changed files with 216 additions and 126 deletions

View File

@@ -163,7 +163,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot + slot_duration / 3).await;
if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
if let Err(e) = self.spawn_attestation_tasks(slot_duration).await {
crit!(error = e, "Failed to spawn attestation tasks")
} else {
trace!("Spawned attestation tasks");
@@ -183,7 +183,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
async fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
@@ -197,16 +197,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
.fold(HashMap::new(), |mut map, duty_and_proof| {
map.entry(duty_and_proof.duty.committee_index)
.or_default()
.push(duty_and_proof);
map
});
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> =
self.duties_service.attesters(slot).await.into_iter().fold(
HashMap::new(),
|mut map, duty_and_proof| {
map.entry(duty_and_proof.duty.committee_index)
.or_default()
.push(duty_and_proof);
map
},
);
// For each committee index for this slot:
//
@@ -704,11 +704,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
async move {
sleep_until(pruning_instant).await;
executor.spawn_blocking(
move || {
executor.spawn(
async move {
attestation_service
.validator_store
.prune_slashing_protection_db(current_epoch, false)
.await
},
"slashing_protection_pruning",
)

View File

@@ -296,12 +296,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
}
for validator_pubkey in proposers {
let builder_boost_factor = self
.validator_store
.determine_builder_boost_factor(&validator_pubkey);
let service = self.clone();
self.inner.executor.spawn(
async move {
let builder_boost_factor = service
.validator_store
.determine_builder_boost_factor(&validator_pubkey).await;
let result = service
.publish_block(slot, validator_pubkey, builder_boost_factor)
.await;
@@ -448,13 +448,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
let graffiti = determine_graffiti(
&validator_pubkey,
self.graffiti_file.clone(),
self.validator_store.graffiti(&validator_pubkey),
self.validator_store.graffiti(&validator_pubkey).await,
self.graffiti,
);
let randao_reveal_ref = &randao_reveal;
let self_ref = &self;
let proposer_index = self.validator_store.validator_index(&validator_pubkey);
let proposer_index = self
.validator_store
.validator_index(&validator_pubkey)
.await;
let proposer_fallback = ProposerFallback {
beacon_nodes: self.beacon_nodes.clone(),
proposer_nodes: self.proposer_nodes.clone(),

View File

@@ -341,16 +341,17 @@ pub struct DutiesService<S, T> {
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
/// Returns the total number of validators known to the duties service.
pub fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators()
pub async fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators().await
}
/// Returns the total number of validators that should propose in the given epoch.
pub fn proposer_count(&self, epoch: Epoch) -> usize {
pub async fn proposer_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.proposers
.read()
@@ -364,11 +365,12 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns the total number of validators that should attest in the given epoch.
pub fn attester_count(&self, epoch: Epoch) -> usize {
pub async fn attester_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.attesters
.read()
.iter()
@@ -379,9 +381,10 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns the total number of validators that are in a doppelganger detection period.
pub fn doppelganger_detecting_count(&self) -> usize {
pub async fn doppelganger_detecting_count(&self) -> usize {
self.validator_store
.voting_pubkeys::<HashSet<_>, _>(DoppelgangerStatus::only_unsafe)
.await
.len()
}
@@ -389,13 +392,14 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
///
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
pub async fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.proposers
.read()
@@ -414,13 +418,14 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns all `ValidatorDuty` for the given `slot`.
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
pub async fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
.voting_pubkeys(DoppelgangerStatus::only_safe)
.await;
self.attesters
.read()
@@ -436,9 +441,9 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
}
/// Returns `true` if we should collect per validator metrics and `false` otherwise.
pub fn per_validator_metrics(&self) -> bool {
pub async fn per_validator_metrics(&self) -> bool {
self.enable_high_validator_count_metrics
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
|| self.total_validator_count().await <= VALIDATOR_METRICS_MIN_COUNT
}
}
@@ -586,13 +591,15 @@ async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
// collect those indices.
let all_pubkeys: Vec<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
for pubkey in all_pubkeys {
// This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service
.validator_store
.validator_index(&pubkey)
.await
.is_some();
if !is_known {
@@ -634,6 +641,7 @@ async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
let fee_recipient = duties_service
.validator_store
.get_fee_recipient(&pubkey)
.await
.map(|fr| fr.to_string())
.unwrap_or_else(|| {
"Fee recipient for validator not set in validator_definitions.yml \
@@ -713,13 +721,18 @@ async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'stat
// and get more information about other running instances.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
for &pubkey in &local_pubkeys {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
if let Some(validator_index) = duties_service
.validator_store
.validator_index(&pubkey)
.await
{
local_indices.push(validator_index)
}
}
@@ -906,7 +919,7 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
// determine whether validator duties need to be updated. This is to ensure that we don't
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
get_uninitialized_validators(duties_service, &epoch, local_pubkeys).await;
let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
@@ -940,11 +953,15 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
// Make a request for all indices that require updating which we have not already made a request
// for.
let indices_to_request = validators_to_update
.iter()
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>();
let mut indices_to_request = vec![];
for pubkey in &validators_to_update {
if let Some(validator_index) = duties_service.validator_store.validator_index(pubkey).await
{
if !initial_indices_to_request.contains(&validator_index) {
indices_to_request.push(validator_index);
}
}
}
// Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
@@ -1041,29 +1058,39 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
}
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
async fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.is_none_or(|duties| !duties.contains_key(epoch))
})
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>()
// Allocate a temporary vec to prevent holding two locks simultaneously (deadlock risk if we use
// a different lock order in another function).
let uninitialized_pubkeys = {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.is_none_or(|duties| !duties.contains_key(epoch))
})
.collect::<Vec<_>>()
};
let mut uninitialized_indices = Vec::with_capacity(uninitialized_pubkeys.len());
for pubkey in uninitialized_pubkeys {
if let Some(index) = duties_service.validator_store.validator_index(pubkey).await {
uninitialized_indices.push(index);
}
}
uninitialized_indices
}
fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
async fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
current_slot: Slot,
) {
if duties_service.per_validator_metrics() {
if duties_service.per_validator_metrics().await {
let attesters = duties_service.attesters.read();
attesters.values().for_each(|attester_duties_by_epoch| {
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
@@ -1309,7 +1336,7 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// Notify the block proposal service for any proposals that we have in our cache.
//
// See the function-level documentation for more information.
let initial_block_proposers = duties_service.block_proposers(current_slot);
let initial_block_proposers = duties_service.block_proposers(current_slot).await;
notify_block_production_service::<S>(
current_slot,
&initial_block_proposers,
@@ -1324,7 +1351,8 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// doppelganger finishes.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
// Only download duties and push out additional block production events if we have some
// validators.
@@ -1387,6 +1415,7 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// which were not included in the initial notification to the `BlockService`.
let additional_block_producers = duties_service
.block_proposers(current_slot)
.await
.difference(&initial_block_proposers)
.copied()
.collect::<HashSet<PublicKeyBytes>>();

View File

@@ -302,21 +302,22 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S,
})
}
fn collect_proposal_data<G, U>(&self, map_fn: G) -> Vec<U>
async fn collect_proposal_data<G, U>(&self, map_fn: G) -> Vec<U>
where
G: Fn(PublicKeyBytes, ProposalData) -> Option<U>,
{
let all_pubkeys: Vec<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
all_pubkeys
.into_iter()
.filter_map(|pubkey| {
let proposal_data = self.validator_store.proposal_data(&pubkey)?;
map_fn(pubkey, proposal_data)
})
.collect()
let mut proposal_data = Vec::with_capacity(all_pubkeys.len());
for pubkey in all_pubkeys {
if let Some(proposal_data) = self.validator_store.proposal_data(&pubkey).await? {
proposal_data.push(map_fn(pubkey, proposal_data));
}
}
proposal_data
}
async fn publish_preparation_data(

View File

@@ -308,13 +308,18 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
// protection.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
.voting_pubkeys(DoppelgangerStatus::ignored)
.await;
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
for &pubkey in &local_pubkeys {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
if let Some(validator_index) = duties_service
.validator_store
.validator_index(&pubkey)
.await
{
local_indices.push(validator_index)
}
}