Check slashability of attestations in batches to avoid sequential bottleneck (#8516)

Closes:

- https://github.com/sigp/lighthouse/issues/1914


  Sign attestations prior to checking them against the slashing protection DB. This allows us to avoid the sequential DB checks which are observed in traces here:

- https://github.com/sigp/lighthouse/pull/8508#discussion_r2576686107


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>
This commit is contained in:
Michael Sproul
2026-01-27 18:56:09 +11:00
committed by GitHub
parent 1476c20cfc
commit 0f57fc9d8e
13 changed files with 563 additions and 267 deletions

View File

@@ -8,7 +8,7 @@ use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{Duration, Instant, sleep, sleep_until};
use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn};
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
@@ -231,7 +231,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
error = e,
slot = slot.as_u64(),
"Error during attestation routine"
);
@@ -383,96 +383,75 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch());
// Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| {
async move {
let duty = &duty_and_proof.duty;
let attestation_data = attestation_data_ref;
// Make sure the target epoch is not higher than the current epoch to avoid potential attacks.
if attestation_data.target.epoch > current_epoch {
return Err(format!(
"Attestation target epoch {} is higher than current epoch {}",
attestation_data.target.epoch, current_epoch
));
}
// Ensure that the attestation matches the duties.
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
// Create attestations for each validator duty.
let mut attestations_to_sign = Vec::with_capacity(validator_duties.len());
for duty_and_proof in validator_duties {
let duty = &duty_and_proof.duty;
// Ensure that the attestation matches the duties.
if !duty.match_attestation_data::<S::E>(&attestation_data, &self.chain_spec) {
crit!(
validator = ?duty.pubkey,
duty_slot = %duty.slot,
attestation_slot = %attestation_data.slot,
duty_index = duty.committee_index,
attestation_index = attestation_data.index,
"Inconsistent validator duties during signing"
);
continue;
}
let attestation = match Attestation::empty_for_signing(
duty.committee_index,
duty.committee_length as usize,
attestation_data.slot,
attestation_data.beacon_block_root,
attestation_data.source,
attestation_data.target,
&self.chain_spec,
) {
Ok(attestation) => attestation,
Err(err) => {
crit!(
validator = ?duty.pubkey,
duty_slot = %duty.slot,
attestation_slot = %attestation_data.slot,
duty_index = duty.committee_index,
attestation_index = attestation_data.index,
"Inconsistent validator duties during signing"
?duty,
?err,
"Invalid validator duties during signing"
);
return None;
continue;
}
};
let mut attestation = match Attestation::empty_for_signing(
duty.committee_index,
duty.committee_length as usize,
attestation_data.slot,
attestation_data.beacon_block_root,
attestation_data.source,
attestation_data.target,
&self.chain_spec,
) {
Ok(attestation) => attestation,
Err(err) => {
crit!(
validator = ?duty.pubkey,
?duty,
?err,
"Invalid validator duties during signing"
);
return None;
}
};
attestations_to_sign.push((
duty.validator_index,
duty.pubkey,
duty.validator_committee_index as usize,
attestation,
));
}
match self
.validator_store
.sign_attestation(
duty.pubkey,
duty.validator_committee_index as usize,
&mut attestation,
current_epoch,
)
.await
{
Ok(()) => Some((attestation, duty.validator_index)),
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
None
}
Err(e) => {
crit!(
error = ?e,
validator = ?duty.pubkey,
slot = slot.as_u64(),
"Failed to sign attestation"
);
None
}
}
}
.instrument(Span::current())
});
if attestations_to_sign.is_empty() {
warn!("No valid attestations to sign");
return Ok(());
}
// Execute all the futures in parallel, collecting any successful results.
let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures)
.instrument(info_span!(
"sign_attestations",
count = validator_duties.len()
))
// Sign and check all attestations (includes slashing protection).
let safe_attestations = self
.validator_store
.sign_attestations(attestations_to_sign)
.await
.into_iter()
.flatten()
.unzip();
.map_err(|e| format!("Failed to sign attestations: {e:?}"))?;
if attestations.is_empty() {
if safe_attestations.is_empty() {
warn!("No attestations were published");
return Ok(());
}
@@ -480,6 +459,33 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
let single_attestations = safe_attestations
.iter()
.filter_map(|(i, a)| {
match a.to_single_attestation_with_attester_index(*i) {
Ok(a) => Some(a),
Err(e) => {
// This shouldn't happen unless BN and VC are out of sync with
// respect to the Electra fork.
error!(
error = ?e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to convert to SingleAttestation"
);
None
}
}
})
.collect::<Vec<_>>();
let single_attestations = &single_attestations;
let validator_indices = single_attestations
.iter()
.map(|att| att.attester_index)
.collect::<Vec<_>>();
let published_count = single_attestations.len();
// Post the attestations to the BN.
match self
.beacon_nodes
@@ -489,40 +495,18 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);
let single_attestations = attestations
.iter()
.zip(validator_indices)
.filter_map(|(a, i)| {
match a.to_single_attestation_with_attester_index(*i) {
Ok(a) => Some(a),
Err(e) => {
// This shouldn't happen unless BN and VC are out of sync with
// respect to the Electra fork.
error!(
error = ?e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to convert to SingleAttestation"
);
None
}
}
})
.collect::<Vec<_>>();
beacon_node
.post_beacon_pool_attestations_v2::<S::E>(single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<S::E>(
single_attestations.clone(),
fork_name,
)
.await
})
.instrument(info_span!(
"publish_attestations",
count = attestations.len()
))
.instrument(info_span!("publish_attestations", count = published_count))
.await
{
Ok(()) => info!(
count = attestations.len(),
count = published_count,
validator_indices = ?validator_indices,
head_block = ?attestation_data.beacon_block_root,
committee_index = attestation_data.index,