Refactor/stream vc vote publishing (#8880)

Changes four `ValidatorStore` batch signing methods to return `impl Stream` instead of `Future`. Services consume the stream and publish each batch as it arrives.  No behavioral change for lh since `LighthouseValidatorStore` wraps everything in `stream::once`

Also replaces anonymous tuples in method signatures with named structs


Co-Authored-By: shane-moore <skm1790@gmail.com>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>

Co-Authored-By: Mac L <mjladson@pm.me>
This commit is contained in:
Shane K Moore
2026-03-12 02:53:32 -07:00
committed by GitHub
parent e1e97e6df0
commit 4b3a9d3d10
8 changed files with 740 additions and 543 deletions

View File

@@ -1,6 +1,6 @@
use crate::duties_service::{DutiesService, DutyAndProof};
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent};
use futures::future::join_all;
use futures::StreamExt;
use logging::crit;
use slot_clock::SlotClock;
use std::collections::HashMap;
@@ -13,7 +13,7 @@ use tokio::time::{Duration, Instant, sleep, sleep_until};
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
use validator_store::{AggregateToSign, AttestationToSign, ValidatorStore};
/// Builds an `AttestationService`.
#[derive(Default)]
@@ -560,12 +560,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
}
};
attestations_to_sign.push((
duty.validator_index,
duty.pubkey,
duty.validator_committee_index as usize,
attestations_to_sign.push(AttestationToSign {
validator_index: duty.validator_index,
pubkey: duty.pubkey,
validator_committee_index: duty.validator_committee_index as usize,
attestation,
));
});
}
if attestations_to_sign.is_empty() {
@@ -573,83 +573,95 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
return Ok(());
}
// Sign and check all attestations (includes slashing protection).
let safe_attestations = self
.validator_store
.sign_attestations(attestations_to_sign)
.await
.map_err(|e| format!("Failed to sign attestations: {e:?}"))?;
let attestation_stream = self.validator_store.sign_attestations(attestations_to_sign);
tokio::pin!(attestation_stream);
if safe_attestations.is_empty() {
warn!("No attestations were published");
return Ok(());
}
let fork_name = self
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
let single_attestations = safe_attestations
.iter()
.filter_map(|(i, a)| {
match a.to_single_attestation_with_attester_index(*i) {
Ok(a) => Some(a),
Err(e) => {
// This shouldn't happen unless BN and VC are out of sync with
// respect to the Electra fork.
error!(
error = ?e,
// Publish each batch as it arrives from the stream.
let mut received_non_empty_batch = false;
while let Some(result) = attestation_stream.next().await {
match result {
Ok(batch) if !batch.is_empty() => {
received_non_empty_batch = true;
let single_attestations = batch
.iter()
.filter_map(|(attester_index, attestation)| {
match attestation
.to_single_attestation_with_attester_index(*attester_index)
{
Ok(single_attestation) => Some(single_attestation),
Err(e) => {
// This shouldn't happen unless BN and VC are out of sync with
// respect to the Electra fork.
error!(
error = ?e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to convert to SingleAttestation"
);
None
}
}
})
.collect::<Vec<_>>();
let single_attestations = &single_attestations;
let validator_indices = single_attestations
.iter()
.map(|att| att.attester_index)
.collect::<Vec<_>>();
let published_count = single_attestations.len();
// Post the attestations to the BN.
match self
.beacon_nodes
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations_v2::<S::E>(
single_attestations.clone(),
fork_name,
)
.await
})
.instrument(info_span!("publish_attestations", count = published_count))
.await
{
Ok(()) => info!(
count = published_count,
validator_indices = ?validator_indices,
head_block = ?attestation_data.beacon_block_root,
committee_index = attestation_data.index,
slot = attestation_data.slot.as_u64(),
"type" = "unaggregated",
"Successfully published attestations"
),
Err(e) => error!(
error = %e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to convert to SingleAttestation"
);
None
"Unable to publish attestations"
),
}
}
})
.collect::<Vec<_>>();
let single_attestations = &single_attestations;
let validator_indices = single_attestations
.iter()
.map(|att| att.attester_index)
.collect::<Vec<_>>();
let published_count = single_attestations.len();
Err(e) => {
crit!(error = ?e, "Failed to sign attestations");
}
_ => {}
}
}
// Post the attestations to the BN.
match self
.beacon_nodes
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations_v2::<S::E>(
single_attestations.clone(),
fork_name,
)
.await
})
.instrument(info_span!("publish_attestations", count = published_count))
.await
{
Ok(()) => info!(
count = published_count,
validator_indices = ?validator_indices,
head_block = ?attestation_data.beacon_block_root,
committee_index = attestation_data.index,
slot = attestation_data.slot.as_u64(),
"type" = "unaggregated",
"Successfully published attestations"
),
Err(e) => error!(
error = %e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to publish attestations"
),
if !received_non_empty_batch {
warn!("No attestations were published");
}
Ok(())
@@ -725,113 +737,103 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.await
.map_err(|e| e.to_string())?;
// Create futures to produce the signed aggregated attestations.
let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
let duty = &duty_and_proof.duty;
let selection_proof = duty_and_proof.selection_proof.as_ref()?;
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!("Inconsistent validator duties during signing");
return None;
}
match self
.validator_store
.produce_signed_aggregate_and_proof(
duty.pubkey,
duty.validator_index,
aggregated_attestation.clone(),
selection_proof.clone(),
)
.await
{
Ok(aggregate) => Some(aggregate),
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(?pubkey, "Missing pubkey for aggregate");
None
}
Err(e) => {
crit!(
error = ?e,
pubkey = ?duty.pubkey,
"Failed to sign aggregate"
);
None
}
}
});
// Execute all the futures in parallel, collecting any successful results.
let aggregator_count = validator_duties
// Build the batch of aggregates to sign.
let aggregates_to_sign: Vec<_> = validator_duties
.iter()
.filter(|d| d.selection_proof.is_some())
.count();
let signed_aggregate_and_proofs = join_all(signing_futures)
.instrument(info_span!("sign_aggregates", count = aggregator_count))
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
.filter_map(|duty_and_proof| {
let duty = &duty_and_proof.duty;
let selection_proof = duty_and_proof.selection_proof.as_ref()?;
if !signed_aggregate_and_proofs.is_empty() {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!("Inconsistent validator duties during signing");
return None;
}
Some(AggregateToSign {
pubkey: duty.pubkey,
aggregator_index: duty.validator_index,
aggregate: aggregated_attestation.clone(),
selection_proof: selection_proof.clone(),
})
.instrument(info_span!(
"publish_aggregates",
count = signed_aggregate_and_proofs.len()
))
.await
{
Ok(()) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = signed_aggregate_and_proof.message().aggregate();
info!(
aggregator = signed_aggregate_and_proof.message().aggregator_index(),
signatures = attestation.num_set_aggregation_bits(),
head_block = format!("{:?}", attestation.data().beacon_block_root),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Successfully published attestation"
);
})
.collect();
// Sign aggregates. Returns a stream of batches.
let aggregate_stream = self
.validator_store
.sign_aggregate_and_proofs(aggregates_to_sign);
tokio::pin!(aggregate_stream);
// Publish each batch as it arrives from the stream.
while let Some(result) = aggregate_stream.next().await {
match result {
Ok(batch) if !batch.is_empty() => {
let signed_aggregate_and_proofs = batch.as_slice();
match self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs,
)
.await
}
})
.instrument(info_span!(
"publish_aggregates",
count = signed_aggregate_and_proofs.len()
))
.await
{
Ok(()) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = signed_aggregate_and_proof.message().aggregate();
info!(
aggregator =
signed_aggregate_and_proof.message().aggregator_index(),
signatures = attestation.num_set_aggregation_bits(),
head_block =
format!("{:?}", attestation.data().beacon_block_root),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Successfully published attestation"
);
}
}
Err(e) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = &signed_aggregate_and_proof.message().aggregate();
crit!(
error = %e,
aggregator = signed_aggregate_and_proof
.message()
.aggregator_index(),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Failed to publish attestation"
);
}
}
}
}
Err(e) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = &signed_aggregate_and_proof.message().aggregate();
crit!(
error = %e,
aggregator = signed_aggregate_and_proof.message().aggregator_index(),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Failed to publish attestation"
);
}
crit!(error = ?e, "Failed to sign aggregates");
}
_ => {}
}
}