Always use committee index 0 when getting attestation data (#8171)

* #8046


  Split the function `publish_attestations_and_aggregates` into `publish_attestations` and `handle_aggregates`, so that for attestations, only 1 task is spawned.


Co-Authored-By: Tan Chee Keong <tanck@sigmaprime.io>

Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
chonghe
2025-12-03 09:45:47 +08:00
committed by GitHub
parent 7ef9501ff6
commit 0bccc7090c

View File

@@ -180,8 +180,9 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(()) Ok(())
} }
/// For each each required attestation, spawn a new task that downloads, signs and uploads the /// Spawn only one new task for attestation post-Electra
/// attestation to the beacon node. /// For each required aggregates, spawn a new task that downloads, signs and uploads the
/// aggregates to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self let duration_to_next_slot = self
@@ -189,6 +190,53 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.duration_to_next_slot() .duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?; .ok_or("Unable to determine duration to next slot")?;
// Create and publish an `Attestation` for all validators only once
// as the committee_index is not included in AttestationData post-Electra
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();
let attestation_service = self.clone();
let attestation_data_handle = self
.inner
.executor
.spawn_handle(
async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
attestation_service
.sign_and_publish_attestations(
slot,
&attestation_duties,
attestation_data.clone(),
)
.await
.map_err(|e| {
crit!(
error = format!("{:?}", e),
slot = slot.as_u64(),
"Error during attestation routine"
);
e
})?;
Ok::<AttestationData, String>(attestation_data)
},
"unaggregated attestation production",
)
.ok_or("Failed to spawn attestation data task")?;
// If a validator needs to publish an aggregate attestation, they must do so at 2/3 // If a validator needs to publish an aggregate attestation, they must do so at 2/3
// through the slot. This delay triggers at this time // through the slot. This delay triggers at this time
let aggregate_production_instant = Instant::now() let aggregate_production_instant = Instant::now()
@@ -196,7 +244,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.checked_sub(slot_duration / 3) .checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0)); .unwrap_or_else(|| Duration::from_secs(0));
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service .duties_service
.attesters(slot) .attesters(slot)
.into_iter() .into_iter()
@@ -207,24 +255,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
map map
}); });
// For each committee index for this slot: // Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
// let attestation_service_clone = self.clone();
// - Create and publish an `Attestation` for all required validators. let executor = self.inner.executor.clone();
// - Create and publish `SignedAggregateAndProof` for all aggregating validators. self.inner.executor.spawn(
duties_by_committee_index async move {
.into_iter() // Log an error if the handle fails and return, skipping aggregates
.for_each(|(committee_index, validator_duties)| { let attestation_data = match attestation_data_handle.await {
// Spawn a separate task for each attestation. Ok(Some(Ok(data))) => data,
self.inner.executor.spawn_ignoring_error( Ok(Some(Err(err))) => {
self.clone().publish_attestations_and_aggregates( error!(?err, "Attestation production failed");
slot, return;
committee_index, }
validator_duties, Ok(None) | Err(_) => {
aggregate_production_instant, info!("Aborting attestation production due to shutdown");
), return;
"attestation publish", }
); };
});
// For each committee index for this slot:
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_service = attestation_service_clone.clone();
let attestation_data = attestation_data.clone();
executor.spawn_ignoring_error(
attestation_service.handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"aggregate publish",
);
},
)
},
"attestation and aggregate publish",
);
// Schedule pruning of the slashing protection database once all unaggregated // Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate // attestations have (hopefully) been signed, i.e. at the same time as aggregate
@@ -234,114 +303,76 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(()) Ok(())
} }
/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
#[instrument( #[instrument(
name = "attestation_duty_cycle", name = "handle_aggregates",
skip_all, skip_all,
fields(%slot, %committee_index) fields(%slot, %committee_index)
)] )]
async fn publish_attestations_and_aggregates( async fn handle_aggregates(
self, self,
slot: Slot, slot: Slot,
committee_index: CommitteeIndex, committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>, validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant, aggregate_production_instant: Instant,
attestation_data: AttestationData,
) -> Result<(), ()> { ) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec( // There's not need to produce `SignedAggregateAndProof` if we do not have
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`. // any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() { if validator_duties.is_empty() {
return Ok(()); return Ok(());
} }
// Step 1. // Wait until the `aggregation_production_instant` (2/3rds
// // of the way though the slot). As verified in the
// Download, sign and publish an `Attestation` for each validator. // `delay_triggers_when_in_the_past` test, this code will still run
let attestation_opt = self // even if the instant has already elapsed.
.produce_and_publish_attestations(slot, committee_index, &validator_duties) sleep_until(aggregate_production_instant).await;
// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);
// Download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
.await .await
.map_err(move |e| { .map_err(move |e| {
crit!( crit!(
error = format!("{:?}", e), error = format!("{:?}", e),
committee_index, committee_index,
slot = slot.as_u64(), slot = slot.as_u64(),
"Error during attestation routine" "Error during aggregate attestation routine"
) )
})?; })?;
drop(attestations_timer);
// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;
// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
}
Ok(()) Ok(())
} }
/// Performs the first step of the attesting process: downloading `Attestation` objects, /// Performs the main steps of the attesting process: signing and publishing to the BN.
/// signing them and returning them to the validator.
/// ///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting /// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting
/// ///
/// ## Detail /// ## Detail
/// ///
/// The given `validator_duties` should already be filtered to only contain those that match /// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case. /// `slot`. Critical errors will be logged if this is not the case.
/// #[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))]
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each async fn sign_and_publish_attestations(
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
#[instrument(skip_all, fields(%slot, %committee_index))]
async fn produce_and_publish_attestations(
&self, &self,
slot: Slot, slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof], validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> { attestation_data: AttestationData,
) -> Result<(), String> {
let _attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);
if validator_duties.is_empty() { if validator_duties.is_empty() {
return Ok(None); return Ok(());
} }
let current_epoch = self let current_epoch = self
@@ -350,23 +381,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")? .ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch()); .epoch(S::E::slots_per_epoch());
let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.instrument(info_span!("fetch_attestation_data"))
.await
.map_err(|e| e.to_string())?;
// Create futures to produce signed `Attestation` objects. // Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data; let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| { let signing_futures = validator_duties.iter().map(|duty_and_proof| {
@@ -426,7 +440,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
info = "a validator may have recently been removed from this VC", info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey, pubkey = ?pubkey,
validator = ?duty.pubkey, validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(), slot = slot.as_u64(),
"Missing pubkey for attestation" "Missing pubkey for attestation"
); );
@@ -436,7 +449,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
crit!( crit!(
error = ?e, error = ?e,
validator = ?duty.pubkey, validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(), slot = slot.as_u64(),
"Failed to sign attestation" "Failed to sign attestation"
); );
@@ -460,7 +472,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
if attestations.is_empty() { if attestations.is_empty() {
warn!("No attestations were published"); warn!("No attestations were published");
return Ok(None); return Ok(());
} }
let fork_name = self let fork_name = self
.chain_spec .chain_spec
@@ -525,7 +537,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
), ),
} }
Ok(Some(attestation_data)) Ok(())
} }
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`, /// Performs the second step of the attesting process: downloading an aggregated `Attestation`,