Implement selections Beacon API endpoints to support DVT middleware (#7016)

* #6610


  - [x] Add `beacon_committee_selections` endpoint
- [x] Test beacon committee aggregator and confirmed working
- [x] Add `sync_committee_selections` endpoint
- [x] Test sync committee aggregator and confirmed working
This commit is contained in:
chonghe
2025-09-03 11:50:41 +08:00
committed by GitHub
parent 7b5be8b1e7
commit a93cafee08
8 changed files with 658 additions and 237 deletions

View File

@@ -55,11 +55,13 @@ pub const JSON_CONTENT_TYPE_HEADER: &str = "application/json";
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
@@ -150,11 +152,13 @@ pub struct Timeouts {
pub attestation: Duration,
pub attester_duties: Duration,
pub attestation_subscriptions: Duration,
pub attestation_aggregators: Duration,
pub liveness: Duration,
pub proposal: Duration,
pub proposer_duties: Duration,
pub sync_committee_contribution: Duration,
pub sync_duties: Duration,
pub sync_aggregators: Duration,
pub get_beacon_blocks_ssz: Duration,
pub get_debug_beacon_states: Duration,
pub get_deposit_snapshot: Duration,
@@ -168,11 +172,13 @@ impl Timeouts {
attestation: timeout,
attester_duties: timeout,
attestation_subscriptions: timeout,
attestation_aggregators: timeout,
liveness: timeout,
proposal: timeout,
proposer_duties: timeout,
sync_committee_contribution: timeout,
sync_duties: timeout,
sync_aggregators: timeout,
get_beacon_blocks_ssz: timeout,
get_debug_beacon_states: timeout,
get_deposit_snapshot: timeout,
@@ -187,12 +193,14 @@ impl Timeouts {
attester_duties: base_timeout / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
attestation_subscriptions: base_timeout
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
attestation_aggregators: base_timeout / HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT,
liveness: base_timeout / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: base_timeout / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: base_timeout / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
sync_committee_contribution: base_timeout
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
sync_duties: base_timeout / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
sync_aggregators: base_timeout / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT,
get_beacon_blocks_ssz: base_timeout / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
@@ -2841,6 +2849,42 @@ impl BeaconNodeHttpClient {
)
.await
}
/// `POST validator/beacon_committee_selections`
pub async fn post_validator_beacon_committee_selections(
&self,
selections: &[BeaconCommitteeSelection],
) -> Result<GenericResponse<Vec<BeaconCommitteeSelection>>, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("beacon_committee_selections");
self.post_with_timeout_and_response(
path,
&selections,
self.timeouts.attestation_aggregators,
)
.await
}
/// `POST validator/sync_committee_selections`
pub async fn post_validator_sync_committee_selections(
&self,
selections: &[SyncCommitteeSelection],
) -> Result<GenericResponse<Vec<SyncCommitteeSelection>>, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("sync_committee_selections");
self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators)
.await
}
}
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an

View File

@@ -967,6 +967,23 @@ pub struct PeerCount {
pub disconnecting: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BeaconCommitteeSelection {
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64,
pub slot: Slot,
pub selection_proof: Signature,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SyncCommitteeSelection {
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64,
pub slot: Slot,
#[serde(with = "serde_utils::quoted_u64")]
pub subcommittee_index: u64,
pub selection_proof: Signature,
}
// --------- Server Sent Event Types -----------
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]

View File

@@ -3,11 +3,13 @@ use crate::{
};
use ethereum_hashing::hash;
use safe_arith::{ArithError, SafeArith};
use serde::{Deserialize, Serialize};
use ssz::Encode;
use std::cmp;
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[derive(PartialEq, Debug, Clone)]
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SelectionProof(Signature);
impl SelectionProof {

View File

@@ -7,12 +7,14 @@ use crate::{
};
use ethereum_hashing::hash;
use safe_arith::{ArithError, SafeArith};
use serde::{Deserialize, Serialize};
use ssz::Encode;
use ssz_types::typenum::Unsigned;
use std::cmp;
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[derive(PartialEq, Debug, Clone)]
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SyncSelectionProof(Signature);
impl SyncSelectionProof {

View File

@@ -2,6 +2,7 @@ pub mod cli;
pub mod config;
use crate::cli::ValidatorClient;
use crate::duties_service::SelectionProofConfig;
pub use config::Config;
use initialized_validators::InitializedValidators;
use metrics::set_gauge;
@@ -55,6 +56,22 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
/// Compute attestation selection proofs this many slots before they are required.
///
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
/// The attestation selection proof lookahead for those running with the --distributed flag.
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which attestation selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// Number of epochs in advance to compute sync selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute sync selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
#[derive(Clone)]
@@ -407,6 +424,41 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
}
// Define a config to be pass to duties_service.
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
let attestation_selection_proof_config = if config.distributed {
SelectionProofConfig {
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT,
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
selections_endpoint: true,
parallel_sign: true,
}
} else {
SelectionProofConfig {
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD,
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
selections_endpoint: false,
parallel_sign: false,
}
};
let sync_selection_proof_config = if config.distributed {
SelectionProofConfig {
lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED,
computation_offset: Duration::default(),
selections_endpoint: true,
parallel_sign: true,
}
} else {
SelectionProofConfig {
lookahead_slot: E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS,
computation_offset: Duration::default(),
selections_endpoint: false,
parallel_sign: false,
}
};
let duties_service = Arc::new(
DutiesServiceBuilder::new()
.slot_clock(slot_clock.clone())
@@ -415,7 +467,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.spec(context.eth2_config.spec.clone())
.executor(context.executor.clone())
.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
.distributed(config.distributed)
.attestation_selection_proof_config(attestation_selection_proof_config)
.sync_selection_proof_config(sync_selection_proof_config)
.disable_attesting(config.disable_attesting)
.build()?,
);

View File

@@ -11,10 +11,14 @@ use crate::sync::SyncDutiesMap;
use crate::sync::poll_sync_committee_duties;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
ProposerData, StateId, ValidatorId,
};
use futures::{StreamExt, stream};
use parking_lot::RwLock;
use futures::{
StreamExt,
stream::{self, FuturesUnordered},
};
use parking_lot::{RwLock, RwLockWriteGuard};
use safe_arith::{ArithError, SafeArith};
use slot_clock::SlotClock;
use std::cmp::min;
@@ -32,17 +36,6 @@ use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, Validato
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
/// Compute attestation selection proofs this many slots before they are required.
///
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
/// The attestation selection proof lookahead for those running with the --distributed flag.
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// Minimum number of validators for which we auto-enable per-validator metrics.
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
/// flag in the cli to enable collection of per validator metrics.
@@ -121,18 +114,97 @@ pub struct SubscriptionSlots {
duty_slot: Slot,
}
#[derive(Copy, Clone, Debug)]
pub struct SelectionProofConfig {
pub lookahead_slot: u64,
/// The seconds to compute the selection proof before a slot.
pub computation_offset: Duration,
/// Whether to call the selections endpoint, true for DVT with middleware.
pub selections_endpoint: bool,
/// Whether to sign the selection proof in parallel, true in distributed mode.
pub parallel_sign: bool,
}
/// The default config for selection proofs covers the non-DVT case.
impl Default for SelectionProofConfig {
fn default() -> Self {
Self {
lookahead_slot: 0,
computation_offset: Duration::default(),
selections_endpoint: false,
parallel_sign: false,
}
}
}
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<S: ValidatorStore + 'static>(
async fn make_selection_proof<S: ValidatorStore + 'static, T: SlotClock>(
duty: &AttesterData,
validator_store: &S,
spec: &ChainSpec,
beacon_nodes: &Arc<BeaconNodeFallback<T>>,
config: &SelectionProofConfig,
) -> Result<Option<SelectionProof>, Error<S::Error>> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?;
let selection_proof = if config.selections_endpoint {
let beacon_committee_selection = BeaconCommitteeSelection {
validator_index: duty.validator_index,
slot: duty.slot,
// This is partial selection proof
selection_proof: validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?
.into(),
};
// Call the endpoint /eth/v1/validator/beacon_committee_selections
// by sending the BeaconCommitteeSelection that contains partial selection proof
// The middleware should return BeaconCommitteeSelection that contains full selection proof
let middleware_response = beacon_nodes
.first_success(|beacon_node| {
let selection_data = beacon_committee_selection.clone();
debug!(
"validator_index" = duty.validator_index,
"slot" = %duty.slot,
"partial selection proof" = ?beacon_committee_selection.selection_proof,
"Sending selection to middleware"
);
async move {
beacon_node
.post_validator_beacon_committee_selections(&[selection_data])
.await
}
})
.await;
let response_data = middleware_response
.map_err(|e| {
Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(e.to_string()))
})?
.data
.pop()
.ok_or_else(|| {
Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(format!(
"attestation selection proof - empty response for validator {}",
duty.validator_index
)))
})?;
debug!(
"validator_index" = response_data.validator_index,
"slot" = %response_data.slot,
// The selection proof from middleware response will be a full selection proof
"full selection proof" = ?response_data.selection_proof,
"Received selection from middleware"
);
SelectionProof::from(response_data.selection_proof)
} else {
validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?
};
selection_proof
.is_aggregator(duty.committee_length as usize, spec)
@@ -217,8 +289,10 @@ pub struct DutiesServiceBuilder<S, T> {
spec: Option<Arc<ChainSpec>>,
//// Whether we permit large validator counts in the metrics.
enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode.
distributed: bool,
/// Create attestation selection proof config
attestation_selection_proof_config: SelectionProofConfig,
/// Create sync selection proof config
sync_selection_proof_config: SelectionProofConfig,
disable_attesting: bool,
}
@@ -237,7 +311,8 @@ impl<S, T> DutiesServiceBuilder<S, T> {
executor: None,
spec: None,
enable_high_validator_count_metrics: false,
distributed: false,
attestation_selection_proof_config: SelectionProofConfig::default(),
sync_selection_proof_config: SelectionProofConfig::default(),
disable_attesting: false,
}
}
@@ -275,8 +350,19 @@ impl<S, T> DutiesServiceBuilder<S, T> {
self
}
pub fn distributed(mut self, distributed: bool) -> Self {
self.distributed = distributed;
pub fn attestation_selection_proof_config(
mut self,
attestation_selection_proof_config: SelectionProofConfig,
) -> Self {
self.attestation_selection_proof_config = attestation_selection_proof_config;
self
}
pub fn sync_selection_proof_config(
mut self,
sync_selection_proof_config: SelectionProofConfig,
) -> Self {
self.sync_selection_proof_config = sync_selection_proof_config;
self
}
@@ -289,7 +375,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
Ok(DutiesService {
attesters: Default::default(),
proposers: Default::default(),
sync_duties: SyncDutiesMap::new(self.distributed),
sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config),
validator_store: self
.validator_store
.ok_or("Cannot build DutiesService without validator_store")?,
@@ -305,7 +391,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
.ok_or("Cannot build DutiesService without executor")?,
spec: self.spec.ok_or("Cannot build DutiesService without spec")?,
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
distributed: self.distributed,
selection_proof_config: self.attestation_selection_proof_config,
disable_attesting: self.disable_attesting,
})
}
@@ -332,10 +418,10 @@ pub struct DutiesService<S, T> {
pub executor: TaskExecutor,
/// The current chain spec.
pub spec: Arc<ChainSpec>,
//// Whether we permit large validator counts in the metrics.
/// Whether we permit large validator counts in the metrics.
pub enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode.
pub distributed: bool,
/// Pass the config for distributed or non-distributed mode.
pub selection_proof_config: SelectionProofConfig,
pub disable_attesting: bool,
}
@@ -1119,6 +1205,75 @@ async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'stati
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}
// Create a helper function here to reduce code duplication for normal and distributed mode
fn process_duty_and_proof<S: ValidatorStore>(
attesters: &mut RwLockWriteGuard<AttesterMap>,
result: Result<(AttesterData, Option<SelectionProof>), Error<S::Error>>,
dependent_root: Hash256,
current_slot: Slot,
) -> bool {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(ValidatorStoreError::UnknownPubkey(pubkey))) => {
// A pubkey can be missing when a validator was recently removed via the API.
warn!(
info = "A validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
// return true means continue processing duties.
return true;
}
Err(e) => {
error!(
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
return true;
}
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = selection_proof else {
return true;
};
let (existing_dependent_root, existing_duty) = entry.get_mut();
if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
true
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon this entire background process.
debug!(
reason = "re-org",
"Stopping selection proof background task"
);
false
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
true
}
}
}
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
@@ -1138,26 +1293,33 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
// At halfway through each slot when nothing else is likely to be getting signed, sign a batch
// of selection proofs and insert them into the duties service `attesters` map.
let slot_clock = &duties_service.slot_clock;
let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;
while !duties_by_slot.is_empty() {
if let Some(duration) = slot_clock.duration_to_next_slot() {
sleep(duration.saturating_sub(slot_offset)).await;
sleep(
duration.saturating_sub(duties_service.selection_proof_config.computation_offset),
)
.await;
let Some(current_slot) = slot_clock.now() else {
continue;
};
let selection_lookahead = if duties_service.distributed {
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
} else {
SELECTION_PROOF_SLOT_LOOKAHEAD
};
let selection_lookahead = duties_service.selection_proof_config.lookahead_slot;
let lookahead_slot = current_slot + selection_lookahead;
let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
let relevant_duties = if duties_service.selection_proof_config.parallel_sign {
// Remove old slot duties and only keep current duties in distributed mode
duties_by_slot
.remove(&lookahead_slot)
.map(|duties| BTreeMap::from([(lookahead_slot, duties)]))
.unwrap_or_default()
} else {
let mut duties = duties_by_slot.split_off(&lookahead_slot);
std::mem::swap(&mut duties, &mut duties_by_slot);
duties
};
let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
@@ -1170,87 +1332,69 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
&[validator_metrics::ATTESTATION_SELECTION_PROOFS],
);
// Sign selection proofs (serially).
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async {
let opt_selection_proof = make_selection_proof(
&duty,
duties_service.validator_store.as_ref(),
&duties_service.spec,
)
.await?;
Ok((duty, opt_selection_proof))
})
.collect::<Vec<_>>()
.await;
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
if duties_service.selection_proof_config.parallel_sign {
let mut duty_and_proof_results = relevant_duties
.into_values()
.flatten()
.map(|duty| async {
let opt_selection_proof = make_selection_proof(
&duty,
duties_service.validator_store.as_ref(),
&duties_service.spec,
&duties_service.beacon_nodes,
&duties_service.selection_proof_config,
)
.await?;
Ok((duty, opt_selection_proof))
})
.collect::<FuturesUnordered<_>>();
// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey),
)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
Err(e) => {
error!(
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = selection_proof else {
continue;
};
let (existing_dependent_root, existing_duty) = entry.get_mut();
if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon
// this entire background process.
debug!(
reason = "re-org",
"Stopping selection proof background task"
);
return;
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
while let Some(result) = duty_and_proof_results.next().await {
let mut attesters = duties_service.attesters.write();
// if process_duty_and_proof returns false, exit the loop
if !process_duty_and_proof::<S>(
&mut attesters,
result,
dependent_root,
current_slot,
) {
return;
}
}
}
drop(attesters);
} else {
// In normal (non-distributed case), sign selection proofs serially
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async {
let opt_selection_proof = make_selection_proof(
&duty,
duties_service.validator_store.as_ref(),
&duties_service.spec,
&duties_service.beacon_nodes,
&duties_service.selection_proof_config,
)
.await?;
Ok((duty, opt_selection_proof))
})
.collect::<Vec<_>>()
.await;
// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
if !process_duty_and_proof::<S>(
&mut attesters,
result,
dependent_root,
current_slot,
) {
return;
}
}
drop(attesters);
};
let time_taken_ms =
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();

View File

@@ -1,19 +1,16 @@
use crate::duties_service::{DutiesService, Error};
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
use eth2::types::SyncCommitteeSelection;
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// Top-level data-structure containing sync duty information.
///
/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
@@ -30,7 +27,7 @@ pub struct SyncDutiesMap {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool,
pub selection_proof_config: SelectionProofConfig,
}
/// Duties for a single sync committee period.
@@ -79,10 +76,10 @@ pub struct SlotDuties {
}
impl SyncDutiesMap {
pub fn new(distributed: bool) -> Self {
pub fn new(selection_proof_config: SelectionProofConfig) -> Self {
Self {
committees: RwLock::new(HashMap::new()),
distributed,
selection_proof_config,
}
}
@@ -99,15 +96,6 @@ impl SyncDutiesMap {
})
}
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots<E: EthSpec>(&self) -> u64 {
if self.distributed {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS
}
}
/// Prepare for pre-computation of selection proofs for `committee_period`.
///
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
@@ -123,7 +111,7 @@ impl SyncDutiesMap {
current_slot,
first_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::<E>();
let pre_compute_lookahead_slots = self.selection_proof_config.lookahead_slot;
let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec),
@@ -377,7 +365,7 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
}
// Pre-compute aggregator selection proofs for the next period.
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots::<S::E>();
let aggregate_pre_compute_lookahead_slots = sync_duties.selection_proof_config.lookahead_slot;
if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(S::E::slots_per_epoch())
.sync_committee_period(spec)?
@@ -498,6 +486,114 @@ pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClo
Ok(())
}
// Create a helper function here to reduce code duplication for normal and distributed mode
pub async fn make_sync_selection_proof<S: ValidatorStore, T: SlotClock>(
duties_service: &Arc<DutiesService<S, T>>,
duty: &SyncDuty,
proof_slot: Slot,
subnet_id: SyncSubnetId,
) -> Option<SyncSelectionProof> {
let sync_selection_proof = duties_service
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, subnet_id)
.await;
let selection_proof = match sync_selection_proof {
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently removed via the API
debug!(
?pubkey,
"slot" = %proof_slot,
"Missing pubkey for sync selection proof");
return None;
}
Err(e) => {
warn!(
"error" = ?e,
"pubkey" = ?duty.pubkey,
"slot" = %proof_slot,
"Unable to sign selection proof"
);
return None;
}
};
// In DVT with middleware, when we want to call the selections endpoint
if duties_service
.sync_duties
.selection_proof_config
.selections_endpoint
{
debug!(
"validator_index" = duty.validator_index,
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
// This is partial selection proof
"partial selection proof" = ?selection_proof,
"Sending sync selection to middleware"
);
let sync_committee_selection = SyncCommitteeSelection {
validator_index: duty.validator_index,
slot: proof_slot,
subcommittee_index: *subnet_id,
selection_proof: selection_proof.clone().into(),
};
// Call the endpoint /eth/v1/validator/sync_committee_selections
// by sending the SyncCommitteeSelection that contains partial sync selection proof
// The middleware should return SyncCommitteeSelection that contains full sync selection proof
let middleware_response = duties_service
.beacon_nodes
.first_success(|beacon_node| {
let selection_data = sync_committee_selection.clone();
async move {
beacon_node
.post_validator_sync_committee_selections(&[selection_data])
.await
}
})
.await;
match middleware_response {
Ok(mut response) => {
let Some(response_data) = response.data.pop() else {
error!(
validator_index = duty.validator_index,
slot = %proof_slot,
"Empty response from sync selection middleware",
);
return None;
};
debug!(
"validator_index" = response_data.validator_index,
"slot" = %response_data.slot,
"subcommittee_index" = response_data.subcommittee_index,
// The selection proof from middleware response will be a full selection proof
"full selection proof" = ?response_data.selection_proof,
"Received sync selection from middleware"
);
// Convert the response to a SyncSelectionProof
let full_selection_proof = SyncSelectionProof::from(response_data.selection_proof);
Some(full_selection_proof)
}
Err(e) => {
error!(
"error" = %e,
%proof_slot,
"Failed to get sync selection proofs from middleware"
);
None
}
}
} else {
// In non-distributed mode, the selection_proof is already a full selection proof
Some(selection_proof)
}
}
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
pre_compute_duties: &[(Slot, SyncDuty)],
@@ -505,131 +601,193 @@ pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'stati
current_slot: Slot,
pre_compute_slot: Slot,
) {
debug!(
period = sync_committee_period,
%current_slot,
%pre_compute_slot,
"Calculating sync selection proofs"
);
// Start at the next slot, as aggregation proofs for the duty at the current slot are no longer
// required since we do the actual aggregation in the slot before the duty slot.
let start_slot = current_slot.as_u64() + 1;
// Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (start_slot..=pre_compute_slot.as_u64()).map(Slot::new) {
let mut validator_proofs = vec![];
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
// For distributed mode
if duties_service
.sync_duties
.selection_proof_config
.parallel_sign
{
let mut futures_unordered = FuturesUnordered::new();
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
error = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let proof = match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
?pubkey,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Missing pubkey for sync selection proof"
);
return None;
}
for (_, duty) in pre_compute_duties {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
warn!(
error = ?e,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Unable to sign selection proof"
crit!(
"error" = ?e,
"Arithmetic error computing subnet IDs"
);
return None;
continue;
}
};
// Construct proof for prior slot.
let proof_slot = slot - 1;
// Calling the make_sync_selection_proof will return a full selection proof
for &subnet_id in &subnet_ids {
let duties_service = duties_service.clone();
futures_unordered.push(async move {
let result =
make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id)
.await;
result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
});
}
}
while let Some(result) = futures_unordered.next().await {
let Some((validator_index, proof_slot, subnet_id, proof)) = result else {
continue;
};
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!("period" = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
// Check if the validator is an aggregator
match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,
slot = %proof_slot,
%subnet_id,
"Validator is sync aggregator"
);
Some(((proof_slot, *subnet_id), proof))
if let Some(Some(duty)) = validators.get(&validator_index) {
debug!(
validator_index,
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
// log full selection proof for debugging
"full selection proof" = ?proof,
"Validator is sync aggregator"
);
// Store the proof
duty.aggregation_duties
.proofs
.write()
.insert((proof_slot, subnet_id), proof);
}
}
Ok(false) => None,
Ok(false) => {} // Not an aggregator
Err(e) => {
warn!(
pubkey = ?duty.pubkey,
slot = %proof_slot,
error = ?e,
validator_index,
%slot,
"error" = ?e,
"Error determining is_aggregator"
);
None
}
}
});
}
} else {
// For non-distributed mode
debug!(
period = sync_committee_period,
%current_slot,
%pre_compute_slot,
"Calculating sync selection proofs"
);
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
let mut validator_proofs = vec![];
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
validator_proofs.push((duty.validator_index, proofs));
}
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
error = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(period = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
let proof =
make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id)
.await;
match proof {
Some(proof) => match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,
slot = %proof_slot,
%subnet_id,
"Validator is sync aggregator"
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
pubkey = ?duty.pubkey,
slot = %proof_slot,
error = ?e,
"Error determining is_aggregator"
);
None
}
},
None => None,
}
});
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
validator_proofs.push((duty.validator_index, proofs));
}
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(period = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
validator_index,
period = sync_committee_period,
"Missing sync duty to update"
);
}
}
if num_validators_updated > 0 {
debug!(
validator_index,
period = sync_committee_period,
"Missing sync duty to update"
%slot,
updated_validators = num_validators_updated,
"Finished computing sync selection proofs"
);
}
}
if num_validators_updated > 0 {
debug!(
%slot,
updated_validators = num_validators_updated,
"Finished computing sync selection proofs"
);
}
}
}

View File

@@ -21,6 +21,7 @@ pub enum Error<T> {
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
UnableToSignAttestation(AttestationError),
SpecificError(T),
Middleware(String),
}
impl<T> From<T> for Error<T> {