mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-16 12:28:24 +00:00
Merge branch 'dvt' into into-anchor
This commit is contained in:
@@ -135,11 +135,13 @@ pub struct Timeouts {
|
|||||||
pub attestation: Duration,
|
pub attestation: Duration,
|
||||||
pub attester_duties: Duration,
|
pub attester_duties: Duration,
|
||||||
pub attestation_subscriptions: Duration,
|
pub attestation_subscriptions: Duration,
|
||||||
|
pub attestation_aggregators: Duration,
|
||||||
pub liveness: Duration,
|
pub liveness: Duration,
|
||||||
pub proposal: Duration,
|
pub proposal: Duration,
|
||||||
pub proposer_duties: Duration,
|
pub proposer_duties: Duration,
|
||||||
pub sync_committee_contribution: Duration,
|
pub sync_committee_contribution: Duration,
|
||||||
pub sync_duties: Duration,
|
pub sync_duties: Duration,
|
||||||
|
pub sync_aggregators: Duration,
|
||||||
pub get_beacon_blocks_ssz: Duration,
|
pub get_beacon_blocks_ssz: Duration,
|
||||||
pub get_debug_beacon_states: Duration,
|
pub get_debug_beacon_states: Duration,
|
||||||
pub get_deposit_snapshot: Duration,
|
pub get_deposit_snapshot: Duration,
|
||||||
@@ -153,11 +155,13 @@ impl Timeouts {
|
|||||||
attestation: timeout,
|
attestation: timeout,
|
||||||
attester_duties: timeout,
|
attester_duties: timeout,
|
||||||
attestation_subscriptions: timeout,
|
attestation_subscriptions: timeout,
|
||||||
|
attestation_aggregators: timeout,
|
||||||
liveness: timeout,
|
liveness: timeout,
|
||||||
proposal: timeout,
|
proposal: timeout,
|
||||||
proposer_duties: timeout,
|
proposer_duties: timeout,
|
||||||
sync_committee_contribution: timeout,
|
sync_committee_contribution: timeout,
|
||||||
sync_duties: timeout,
|
sync_duties: timeout,
|
||||||
|
sync_aggregators: timeout,
|
||||||
get_beacon_blocks_ssz: timeout,
|
get_beacon_blocks_ssz: timeout,
|
||||||
get_debug_beacon_states: timeout,
|
get_debug_beacon_states: timeout,
|
||||||
get_deposit_snapshot: timeout,
|
get_deposit_snapshot: timeout,
|
||||||
@@ -2732,6 +2736,42 @@ impl BeaconNodeHttpClient {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `POST validator/beacon_committee_selections`
|
||||||
|
pub async fn post_validator_beacon_committee_selections(
|
||||||
|
&self,
|
||||||
|
selections: &[BeaconCommitteeSelection],
|
||||||
|
) -> Result<GenericResponse<Vec<BeaconCommitteeSelection>>, Error> {
|
||||||
|
let mut path = self.eth_path(V1)?;
|
||||||
|
|
||||||
|
path.path_segments_mut()
|
||||||
|
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||||
|
.push("validator")
|
||||||
|
.push("beacon_committee_selections");
|
||||||
|
|
||||||
|
self.post_with_timeout_and_response(
|
||||||
|
path,
|
||||||
|
&selections,
|
||||||
|
self.timeouts.attestation_aggregators,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `POST validator/sync_committee_selections`
|
||||||
|
pub async fn post_validator_sync_committee_selections(
|
||||||
|
&self,
|
||||||
|
selections: &[SyncCommitteeSelection],
|
||||||
|
) -> Result<GenericResponse<Vec<SyncCommitteeSelection>>, Error> {
|
||||||
|
let mut path = self.eth_path(V1)?;
|
||||||
|
|
||||||
|
path.path_segments_mut()
|
||||||
|
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
|
||||||
|
.push("validator")
|
||||||
|
.push("sync_committee_selections");
|
||||||
|
|
||||||
|
self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an
|
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an
|
||||||
|
|||||||
@@ -930,6 +930,23 @@ pub struct PeerCount {
|
|||||||
pub disconnecting: u64,
|
pub disconnecting: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct BeaconCommitteeSelection {
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub validator_index: u64,
|
||||||
|
pub slot: Slot,
|
||||||
|
pub selection_proof: Signature,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct SyncCommitteeSelection {
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub validator_index: u64,
|
||||||
|
pub slot: Slot,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub subcommittee_index: u64,
|
||||||
|
pub selection_proof: Signature,
|
||||||
|
}
|
||||||
// --------- Server Sent Event Types -----------
|
// --------- Server Sent Event Types -----------
|
||||||
|
|
||||||
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||||
|
|||||||
@@ -3,10 +3,12 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use ethereum_hashing::hash;
|
use ethereum_hashing::hash;
|
||||||
use safe_arith::{ArithError, SafeArith};
|
use safe_arith::{ArithError, SafeArith};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use ssz::Encode;
|
use ssz::Encode;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
|
||||||
#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)]
|
#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
pub struct SelectionProof(Signature);
|
pub struct SelectionProof(Signature);
|
||||||
|
|
||||||
impl SelectionProof {
|
impl SelectionProof {
|
||||||
|
|||||||
@@ -7,11 +7,13 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use ethereum_hashing::hash;
|
use ethereum_hashing::hash;
|
||||||
use safe_arith::{ArithError, SafeArith};
|
use safe_arith::{ArithError, SafeArith};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use ssz::Encode;
|
use ssz::Encode;
|
||||||
use ssz_types::typenum::Unsigned;
|
use ssz_types::typenum::Unsigned;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
|
||||||
#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)]
|
#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
pub struct SyncSelectionProof(Signature);
|
pub struct SyncSelectionProof(Signature);
|
||||||
|
|
||||||
impl SyncSelectionProof {
|
impl SyncSelectionProof {
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ mod latency;
|
|||||||
mod notifier;
|
mod notifier;
|
||||||
|
|
||||||
use crate::cli::ValidatorClient;
|
use crate::cli::ValidatorClient;
|
||||||
|
use crate::duties_service::SelectionProofConfig;
|
||||||
pub use config::Config;
|
pub use config::Config;
|
||||||
use initialized_validators::InitializedValidators;
|
use initialized_validators::InitializedValidators;
|
||||||
use metrics::set_gauge;
|
use metrics::set_gauge;
|
||||||
@@ -59,11 +60,13 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
|
|||||||
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
|
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
|
||||||
|
const HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only
|
||||||
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
|
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
|
||||||
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
|
const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only
|
||||||
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
|
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
|
||||||
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
|
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
|
||||||
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
|
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
|
||||||
@@ -72,6 +75,22 @@ const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
|
|||||||
|
|
||||||
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
|
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
|
||||||
|
|
||||||
|
/// Compute attestation selection proofs this many slots before they are required.
|
||||||
|
///
|
||||||
|
/// At start-up selection proofs will be computed with less lookahead out of necessity.
|
||||||
|
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
|
||||||
|
|
||||||
|
/// The attestation selection proof lookahead for those running with the --distributed flag.
|
||||||
|
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
|
||||||
|
|
||||||
|
/// Fraction of a slot at which attestation selection proof signing should happen (2 means half way).
|
||||||
|
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
|
||||||
|
|
||||||
|
/// Number of epochs in advance to compute sync selection proofs when not in `distributed` mode.
|
||||||
|
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
|
||||||
|
/// Number of slots in advance to compute sync selection proofs when in `distributed` mode.
|
||||||
|
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
|
||||||
|
|
||||||
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
|
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -297,12 +316,15 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
|
||||||
attestation_subscriptions: slot_duration
|
attestation_subscriptions: slot_duration
|
||||||
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
|
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
|
||||||
|
attestation_aggregators: slot_duration
|
||||||
|
/ HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT,
|
||||||
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
|
||||||
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
|
||||||
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
|
||||||
sync_committee_contribution: slot_duration
|
sync_committee_contribution: slot_duration
|
||||||
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
|
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
|
||||||
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
|
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
|
||||||
|
sync_aggregators: slot_duration / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT,
|
||||||
get_beacon_blocks_ssz: slot_duration
|
get_beacon_blocks_ssz: slot_duration
|
||||||
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
|
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
|
||||||
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
|
||||||
@@ -442,6 +464,41 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
|
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Define a config to be pass to duties_service.
|
||||||
|
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
|
||||||
|
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
|
||||||
|
let attestation_selection_proof_config = if config.distributed {
|
||||||
|
SelectionProofConfig {
|
||||||
|
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT,
|
||||||
|
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
|
||||||
|
selections_endpoint: true,
|
||||||
|
parallel_sign: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SelectionProofConfig {
|
||||||
|
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD,
|
||||||
|
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
|
||||||
|
selections_endpoint: false,
|
||||||
|
parallel_sign: false,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let sync_selection_proof_config = if config.distributed {
|
||||||
|
SelectionProofConfig {
|
||||||
|
lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED,
|
||||||
|
computation_offset: Duration::default(),
|
||||||
|
selections_endpoint: true,
|
||||||
|
parallel_sign: true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SelectionProofConfig {
|
||||||
|
lookahead_slot: E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS,
|
||||||
|
computation_offset: Duration::default(),
|
||||||
|
selections_endpoint: false,
|
||||||
|
parallel_sign: false,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let duties_service = Arc::new(
|
let duties_service = Arc::new(
|
||||||
DutiesServiceBuilder::new()
|
DutiesServiceBuilder::new()
|
||||||
.slot_clock(slot_clock.clone())
|
.slot_clock(slot_clock.clone())
|
||||||
@@ -450,7 +507,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
.spec(context.eth2_config.spec.clone())
|
.spec(context.eth2_config.spec.clone())
|
||||||
.executor(context.executor.clone())
|
.executor(context.executor.clone())
|
||||||
.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
|
.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
|
||||||
.distributed(config.distributed)
|
.attestation_selection_proof_config(attestation_selection_proof_config)
|
||||||
|
.sync_selection_proof_config(sync_selection_proof_config)
|
||||||
.disable_attesting(config.disable_attesting)
|
.disable_attesting(config.disable_attesting)
|
||||||
.build()?,
|
.build()?,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -11,10 +11,14 @@ use crate::sync::poll_sync_committee_duties;
|
|||||||
use crate::sync::SyncDutiesMap;
|
use crate::sync::SyncDutiesMap;
|
||||||
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
|
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
|
||||||
use eth2::types::{
|
use eth2::types::{
|
||||||
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
|
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
|
||||||
|
ProposerData, StateId, ValidatorId,
|
||||||
};
|
};
|
||||||
use futures::{stream, StreamExt};
|
use futures::{
|
||||||
use parking_lot::RwLock;
|
stream::{self, FuturesUnordered},
|
||||||
|
StreamExt,
|
||||||
|
};
|
||||||
|
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||||
use safe_arith::{ArithError, SafeArith};
|
use safe_arith::{ArithError, SafeArith};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
@@ -32,17 +36,6 @@ use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, Validato
|
|||||||
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
|
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
|
||||||
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
|
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
|
||||||
|
|
||||||
/// Compute attestation selection proofs this many slots before they are required.
|
|
||||||
///
|
|
||||||
/// At start-up selection proofs will be computed with less lookahead out of necessity.
|
|
||||||
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
|
|
||||||
|
|
||||||
/// The attestation selection proof lookahead for those running with the --distributed flag.
|
|
||||||
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
|
|
||||||
|
|
||||||
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
|
|
||||||
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
|
|
||||||
|
|
||||||
/// Minimum number of validators for which we auto-enable per-validator metrics.
|
/// Minimum number of validators for which we auto-enable per-validator metrics.
|
||||||
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
|
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
|
||||||
/// flag in the cli to enable collection of per validator metrics.
|
/// flag in the cli to enable collection of per validator metrics.
|
||||||
@@ -121,18 +114,87 @@ pub struct SubscriptionSlots {
|
|||||||
duty_slot: Slot,
|
duty_slot: Slot,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub struct SelectionProofConfig {
|
||||||
|
pub lookahead_slot: u64,
|
||||||
|
pub computation_offset: Duration, // The seconds to compute the selection proof before a slot
|
||||||
|
pub selections_endpoint: bool, // whether to call the selections endpoint, true for DVT with middleware
|
||||||
|
pub parallel_sign: bool, // whether to sign the selection proof in parallel, true in distributed mode
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SelectionProofConfig {
|
||||||
|
// Create a default associated function to be passed in DutiesServiceBuilder::new()
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
lookahead_slot: 0,
|
||||||
|
computation_offset: Duration::default(),
|
||||||
|
selections_endpoint: false,
|
||||||
|
parallel_sign: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a selection proof for `duty`.
|
/// Create a selection proof for `duty`.
|
||||||
///
|
///
|
||||||
/// Return `Ok(None)` if the attesting validator is not an aggregator.
|
/// Return `Ok(None)` if the attesting validator is not an aggregator.
|
||||||
async fn make_selection_proof<S: ValidatorStore + 'static>(
|
async fn make_selection_proof<S: ValidatorStore + 'static, T: SlotClock>(
|
||||||
duty: &AttesterData,
|
duty: &AttesterData,
|
||||||
validator_store: &S,
|
validator_store: &S,
|
||||||
spec: &ChainSpec,
|
spec: &ChainSpec,
|
||||||
|
beacon_nodes: &Arc<BeaconNodeFallback<T>>,
|
||||||
|
config: &SelectionProofConfig,
|
||||||
) -> Result<Option<SelectionProof>, Error<S::Error>> {
|
) -> Result<Option<SelectionProof>, Error<S::Error>> {
|
||||||
let selection_proof = validator_store
|
let selection_proof = if config.selections_endpoint {
|
||||||
.produce_selection_proof(duty.pubkey, duty.slot)
|
let beacon_committee_selection = BeaconCommitteeSelection {
|
||||||
.await
|
validator_index: duty.validator_index,
|
||||||
.map_err(Error::FailedToProduceSelectionProof)?;
|
slot: duty.slot,
|
||||||
|
// This is partial selection proof
|
||||||
|
selection_proof: validator_store
|
||||||
|
.produce_selection_proof(duty.pubkey, duty.slot)
|
||||||
|
.await
|
||||||
|
.map_err(Error::FailedToProduceSelectionProof)?
|
||||||
|
.into(),
|
||||||
|
};
|
||||||
|
// Call the endpoint /eth/v1/validator/beacon_committee_selections
|
||||||
|
// by sending the BeaconCommitteeSelection that contains partial selection proof
|
||||||
|
// The middleware should return BeaconCommitteeSelection that contains full selection proof
|
||||||
|
let middleware_response = beacon_nodes
|
||||||
|
.first_success(|beacon_node| {
|
||||||
|
let selection_data = beacon_committee_selection.clone();
|
||||||
|
debug!(
|
||||||
|
"validator_index" = duty.validator_index,
|
||||||
|
"slot" = %duty.slot,
|
||||||
|
"partial selection proof" = ?beacon_committee_selection.selection_proof,
|
||||||
|
"Sending selection to middleware"
|
||||||
|
);
|
||||||
|
async move {
|
||||||
|
beacon_node
|
||||||
|
.post_validator_beacon_committee_selections(&[selection_data])
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let response_data = &middleware_response
|
||||||
|
.map_err(|e| {
|
||||||
|
Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(e.to_string()))
|
||||||
|
})?
|
||||||
|
.data[0];
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"validator_index" = response_data.validator_index,
|
||||||
|
"slot" = %response_data.slot,
|
||||||
|
// The selection proof from middleware response will be a full selection proof
|
||||||
|
"full selection proof" = ?response_data.selection_proof,
|
||||||
|
"Received selection from middleware"
|
||||||
|
);
|
||||||
|
SelectionProof::from(response_data.selection_proof.clone())
|
||||||
|
} else {
|
||||||
|
validator_store
|
||||||
|
.produce_selection_proof(duty.pubkey, duty.slot)
|
||||||
|
.await
|
||||||
|
.map_err(Error::FailedToProduceSelectionProof)?
|
||||||
|
};
|
||||||
|
|
||||||
selection_proof
|
selection_proof
|
||||||
.is_aggregator(duty.committee_length as usize, spec)
|
.is_aggregator(duty.committee_length as usize, spec)
|
||||||
@@ -217,8 +279,10 @@ pub struct DutiesServiceBuilder<S, T> {
|
|||||||
spec: Option<Arc<ChainSpec>>,
|
spec: Option<Arc<ChainSpec>>,
|
||||||
//// Whether we permit large validator counts in the metrics.
|
//// Whether we permit large validator counts in the metrics.
|
||||||
enable_high_validator_count_metrics: bool,
|
enable_high_validator_count_metrics: bool,
|
||||||
/// If this validator is running in distributed mode.
|
/// Create attestation selection proof config
|
||||||
distributed: bool,
|
attestation_selection_proof_config: SelectionProofConfig,
|
||||||
|
/// Create sync selection proof config
|
||||||
|
sync_selection_proof_config: SelectionProofConfig,
|
||||||
disable_attesting: bool,
|
disable_attesting: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -237,7 +301,8 @@ impl<S, T> DutiesServiceBuilder<S, T> {
|
|||||||
executor: None,
|
executor: None,
|
||||||
spec: None,
|
spec: None,
|
||||||
enable_high_validator_count_metrics: false,
|
enable_high_validator_count_metrics: false,
|
||||||
distributed: false,
|
attestation_selection_proof_config: SelectionProofConfig::default(),
|
||||||
|
sync_selection_proof_config: SelectionProofConfig::default(),
|
||||||
disable_attesting: false,
|
disable_attesting: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -275,8 +340,19 @@ impl<S, T> DutiesServiceBuilder<S, T> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn distributed(mut self, distributed: bool) -> Self {
|
pub fn attestation_selection_proof_config(
|
||||||
self.distributed = distributed;
|
mut self,
|
||||||
|
attestation_selection_proof_config: SelectionProofConfig,
|
||||||
|
) -> Self {
|
||||||
|
self.attestation_selection_proof_config = attestation_selection_proof_config;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sync_selection_proof_config(
|
||||||
|
mut self,
|
||||||
|
sync_selection_proof_config: SelectionProofConfig,
|
||||||
|
) -> Self {
|
||||||
|
self.sync_selection_proof_config = sync_selection_proof_config;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,7 +365,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
|
|||||||
Ok(DutiesService {
|
Ok(DutiesService {
|
||||||
attesters: Default::default(),
|
attesters: Default::default(),
|
||||||
proposers: Default::default(),
|
proposers: Default::default(),
|
||||||
sync_duties: SyncDutiesMap::new(self.distributed),
|
sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config),
|
||||||
validator_store: self
|
validator_store: self
|
||||||
.validator_store
|
.validator_store
|
||||||
.ok_or("Cannot build DutiesService without validator_store")?,
|
.ok_or("Cannot build DutiesService without validator_store")?,
|
||||||
@@ -305,7 +381,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
|
|||||||
.ok_or("Cannot build DutiesService without executor")?,
|
.ok_or("Cannot build DutiesService without executor")?,
|
||||||
spec: self.spec.ok_or("Cannot build DutiesService without spec")?,
|
spec: self.spec.ok_or("Cannot build DutiesService without spec")?,
|
||||||
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
|
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
|
||||||
distributed: self.distributed,
|
selection_proof_config: self.attestation_selection_proof_config,
|
||||||
disable_attesting: self.disable_attesting,
|
disable_attesting: self.disable_attesting,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -332,10 +408,10 @@ pub struct DutiesService<S, T> {
|
|||||||
pub executor: TaskExecutor,
|
pub executor: TaskExecutor,
|
||||||
/// The current chain spec.
|
/// The current chain spec.
|
||||||
pub spec: Arc<ChainSpec>,
|
pub spec: Arc<ChainSpec>,
|
||||||
//// Whether we permit large validator counts in the metrics.
|
/// Whether we permit large validator counts in the metrics.
|
||||||
pub enable_high_validator_count_metrics: bool,
|
pub enable_high_validator_count_metrics: bool,
|
||||||
/// If this validator is running in distributed mode.
|
/// Pass the config for distributed or non-distributed mode.
|
||||||
pub distributed: bool,
|
pub selection_proof_config: SelectionProofConfig,
|
||||||
pub disable_attesting: bool,
|
pub disable_attesting: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1119,6 +1195,75 @@ async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'stati
|
|||||||
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
|
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a helper function here to reduce code duplication for normal and distributed mode
|
||||||
|
fn process_duty_and_proof<S: ValidatorStore>(
|
||||||
|
attesters: &mut RwLockWriteGuard<AttesterMap>,
|
||||||
|
result: Result<(AttesterData, Option<SelectionProof>), Error<S::Error>>,
|
||||||
|
dependent_root: Hash256,
|
||||||
|
current_slot: Slot,
|
||||||
|
) -> bool {
|
||||||
|
let (duty, selection_proof) = match result {
|
||||||
|
Ok(duty_and_proof) => duty_and_proof,
|
||||||
|
Err(Error::FailedToProduceSelectionProof(ValidatorStoreError::UnknownPubkey(pubkey))) => {
|
||||||
|
// A pubkey can be missing when a validator was recently removed via the API.
|
||||||
|
warn!(
|
||||||
|
info = "A validator may have recently been removed from this VC",
|
||||||
|
?pubkey,
|
||||||
|
"Missing pubkey for duty and proof"
|
||||||
|
);
|
||||||
|
// Do not abort the entire batch for a single failure.
|
||||||
|
// return true means continue processing duties.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
error = ?e,
|
||||||
|
msg = "may impair attestation duties",
|
||||||
|
"Failed to produce duty and proof"
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let attester_map = attesters.entry(duty.pubkey).or_default();
|
||||||
|
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
|
||||||
|
match attester_map.entry(epoch) {
|
||||||
|
hash_map::Entry::Occupied(mut entry) => {
|
||||||
|
// No need to update duties for which no proof was computed.
|
||||||
|
let Some(selection_proof) = selection_proof else {
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
||||||
|
|
||||||
|
if *existing_dependent_root == dependent_root {
|
||||||
|
// Replace existing proof.
|
||||||
|
existing_duty.selection_proof = Some(selection_proof);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
// Our selection proofs are no longer relevant due to a reorg, abandon this entire background process.
|
||||||
|
debug!(
|
||||||
|
reason = "re-org",
|
||||||
|
"Stopping selection proof background task"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hash_map::Entry::Vacant(entry) => {
|
||||||
|
// This probably shouldn't happen, but we have enough info to fill in the entry so we may as well.
|
||||||
|
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
|
||||||
|
let duty_and_proof = DutyAndProof {
|
||||||
|
duty,
|
||||||
|
selection_proof,
|
||||||
|
subscription_slots,
|
||||||
|
};
|
||||||
|
entry.insert((dependent_root, duty_and_proof));
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
||||||
///
|
///
|
||||||
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
||||||
@@ -1138,26 +1283,33 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
|
|||||||
// At halfway through each slot when nothing else is likely to be getting signed, sign a batch
|
// At halfway through each slot when nothing else is likely to be getting signed, sign a batch
|
||||||
// of selection proofs and insert them into the duties service `attesters` map.
|
// of selection proofs and insert them into the duties service `attesters` map.
|
||||||
let slot_clock = &duties_service.slot_clock;
|
let slot_clock = &duties_service.slot_clock;
|
||||||
let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;
|
|
||||||
|
|
||||||
while !duties_by_slot.is_empty() {
|
while !duties_by_slot.is_empty() {
|
||||||
if let Some(duration) = slot_clock.duration_to_next_slot() {
|
if let Some(duration) = slot_clock.duration_to_next_slot() {
|
||||||
sleep(duration.saturating_sub(slot_offset)).await;
|
sleep(
|
||||||
|
duration.saturating_sub(duties_service.selection_proof_config.computation_offset),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
let Some(current_slot) = slot_clock.now() else {
|
let Some(current_slot) = slot_clock.now() else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
let selection_lookahead = if duties_service.distributed {
|
let selection_lookahead = duties_service.selection_proof_config.lookahead_slot;
|
||||||
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
|
|
||||||
} else {
|
|
||||||
SELECTION_PROOF_SLOT_LOOKAHEAD
|
|
||||||
};
|
|
||||||
|
|
||||||
let lookahead_slot = current_slot + selection_lookahead;
|
let lookahead_slot = current_slot + selection_lookahead;
|
||||||
|
|
||||||
let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
|
let relevant_duties = if duties_service.selection_proof_config.parallel_sign {
|
||||||
std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
|
// Remove old slot duties and only keep current duties in distributed mode
|
||||||
|
duties_by_slot
|
||||||
|
.remove(&lookahead_slot)
|
||||||
|
.map(|duties| BTreeMap::from([(lookahead_slot, duties)]))
|
||||||
|
.unwrap_or_default()
|
||||||
|
} else {
|
||||||
|
let mut duties = duties_by_slot.split_off(&lookahead_slot);
|
||||||
|
std::mem::swap(&mut duties, &mut duties_by_slot);
|
||||||
|
duties
|
||||||
|
};
|
||||||
|
|
||||||
let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
|
let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
|
||||||
|
|
||||||
@@ -1170,87 +1322,69 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
|
|||||||
&[validator_metrics::ATTESTATION_SELECTION_PROOFS],
|
&[validator_metrics::ATTESTATION_SELECTION_PROOFS],
|
||||||
);
|
);
|
||||||
|
|
||||||
// Sign selection proofs (serially).
|
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
|
||||||
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
|
||||||
.then(|duty| async {
|
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
|
||||||
let opt_selection_proof = make_selection_proof(
|
if duties_service.selection_proof_config.parallel_sign {
|
||||||
&duty,
|
let mut duty_and_proof_results = relevant_duties
|
||||||
duties_service.validator_store.as_ref(),
|
.into_values()
|
||||||
&duties_service.spec,
|
.flatten()
|
||||||
)
|
.map(|duty| async {
|
||||||
.await?;
|
let opt_selection_proof = make_selection_proof(
|
||||||
Ok((duty, opt_selection_proof))
|
&duty,
|
||||||
})
|
duties_service.validator_store.as_ref(),
|
||||||
.collect::<Vec<_>>()
|
&duties_service.spec,
|
||||||
.await;
|
&duties_service.beacon_nodes,
|
||||||
|
&duties_service.selection_proof_config,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok((duty, opt_selection_proof))
|
||||||
|
})
|
||||||
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
// Add to attesters store.
|
while let Some(result) = duty_and_proof_results.next().await {
|
||||||
let mut attesters = duties_service.attesters.write();
|
let mut attesters = duties_service.attesters.write();
|
||||||
for result in duty_and_proof_results {
|
// if process_duty_and_proof returns false, exit the loop
|
||||||
let (duty, selection_proof) = match result {
|
if !process_duty_and_proof::<S>(
|
||||||
Ok(duty_and_proof) => duty_and_proof,
|
&mut attesters,
|
||||||
Err(Error::FailedToProduceSelectionProof(
|
result,
|
||||||
ValidatorStoreError::UnknownPubkey(pubkey),
|
dependent_root,
|
||||||
)) => {
|
current_slot,
|
||||||
// A pubkey can be missing when a validator was recently
|
) {
|
||||||
// removed via the API.
|
return;
|
||||||
warn!(
|
|
||||||
info = "a validator may have recently been removed from this VC",
|
|
||||||
?pubkey,
|
|
||||||
"Missing pubkey for duty and proof"
|
|
||||||
);
|
|
||||||
// Do not abort the entire batch for a single failure.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!(
|
|
||||||
error = ?e,
|
|
||||||
msg = "may impair attestation duties",
|
|
||||||
"Failed to produce duty and proof"
|
|
||||||
);
|
|
||||||
// Do not abort the entire batch for a single failure.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let attester_map = attesters.entry(duty.pubkey).or_default();
|
|
||||||
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
|
|
||||||
match attester_map.entry(epoch) {
|
|
||||||
hash_map::Entry::Occupied(mut entry) => {
|
|
||||||
// No need to update duties for which no proof was computed.
|
|
||||||
let Some(selection_proof) = selection_proof else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
|
||||||
|
|
||||||
if *existing_dependent_root == dependent_root {
|
|
||||||
// Replace existing proof.
|
|
||||||
existing_duty.selection_proof = Some(selection_proof);
|
|
||||||
} else {
|
|
||||||
// Our selection proofs are no longer relevant due to a reorg, abandon
|
|
||||||
// this entire background process.
|
|
||||||
debug!(
|
|
||||||
reason = "re-org",
|
|
||||||
"Stopping selection proof background task"
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hash_map::Entry::Vacant(entry) => {
|
|
||||||
// This probably shouldn't happen, but we have enough info to fill in the
|
|
||||||
// entry so we may as well.
|
|
||||||
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
|
|
||||||
let duty_and_proof = DutyAndProof {
|
|
||||||
duty,
|
|
||||||
selection_proof,
|
|
||||||
subscription_slots,
|
|
||||||
};
|
|
||||||
entry.insert((dependent_root, duty_and_proof));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
drop(attesters);
|
// In normal (non-distributed case), sign selection proofs serially
|
||||||
|
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
||||||
|
.then(|duty| async {
|
||||||
|
let opt_selection_proof = make_selection_proof(
|
||||||
|
&duty,
|
||||||
|
duties_service.validator_store.as_ref(),
|
||||||
|
&duties_service.spec,
|
||||||
|
&duties_service.beacon_nodes,
|
||||||
|
&duties_service.selection_proof_config,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok((duty, opt_selection_proof))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Add to attesters store.
|
||||||
|
let mut attesters = duties_service.attesters.write();
|
||||||
|
for result in duty_and_proof_results {
|
||||||
|
if !process_duty_and_proof::<S>(
|
||||||
|
&mut attesters,
|
||||||
|
result,
|
||||||
|
dependent_root,
|
||||||
|
current_slot,
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(attesters);
|
||||||
|
};
|
||||||
|
|
||||||
let time_taken_ms =
|
let time_taken_ms =
|
||||||
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
||||||
|
|||||||
@@ -1,19 +1,16 @@
|
|||||||
use crate::duties_service::{DutiesService, Error};
|
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
|
||||||
|
use eth2::types::{Signature, SyncCommitteeSelection};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use logging::crit;
|
use logging::crit;
|
||||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
|
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
|
||||||
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
|
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
|
||||||
|
|
||||||
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
|
|
||||||
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
|
|
||||||
/// Number of slots in advance to compute selection proofs when in `distributed` mode.
|
|
||||||
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
|
|
||||||
|
|
||||||
/// Top-level data-structure containing sync duty information.
|
/// Top-level data-structure containing sync duty information.
|
||||||
///
|
///
|
||||||
/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
|
/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
|
||||||
@@ -30,7 +27,7 @@ pub struct SyncDutiesMap {
|
|||||||
/// Map from sync committee period to duties for members of that sync committee.
|
/// Map from sync committee period to duties for members of that sync committee.
|
||||||
committees: RwLock<HashMap<u64, CommitteeDuties>>,
|
committees: RwLock<HashMap<u64, CommitteeDuties>>,
|
||||||
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
|
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
|
||||||
distributed: bool,
|
pub selection_proof_config: SelectionProofConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Duties for a single sync committee period.
|
/// Duties for a single sync committee period.
|
||||||
@@ -79,10 +76,10 @@ pub struct SlotDuties {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SyncDutiesMap {
|
impl SyncDutiesMap {
|
||||||
pub fn new(distributed: bool) -> Self {
|
pub fn new(selection_proof_config: SelectionProofConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
committees: RwLock::new(HashMap::new()),
|
committees: RwLock::new(HashMap::new()),
|
||||||
distributed,
|
selection_proof_config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,15 +96,6 @@ impl SyncDutiesMap {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of slots in advance to compute selection proofs
|
|
||||||
fn aggregation_pre_compute_slots<E: EthSpec>(&self) -> u64 {
|
|
||||||
if self.distributed {
|
|
||||||
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
|
|
||||||
} else {
|
|
||||||
E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Prepare for pre-computation of selection proofs for `committee_period`.
|
/// Prepare for pre-computation of selection proofs for `committee_period`.
|
||||||
///
|
///
|
||||||
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
|
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
|
||||||
@@ -123,7 +111,7 @@ impl SyncDutiesMap {
|
|||||||
current_slot,
|
current_slot,
|
||||||
first_slot_of_period::<E>(committee_period, spec),
|
first_slot_of_period::<E>(committee_period, spec),
|
||||||
);
|
);
|
||||||
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::<E>();
|
let pre_compute_lookahead_slots = self.selection_proof_config.lookahead_slot;
|
||||||
let pre_compute_slot = std::cmp::min(
|
let pre_compute_slot = std::cmp::min(
|
||||||
current_slot + pre_compute_lookahead_slots,
|
current_slot + pre_compute_lookahead_slots,
|
||||||
last_slot_of_period::<E>(committee_period, spec),
|
last_slot_of_period::<E>(committee_period, spec),
|
||||||
@@ -377,7 +365,7 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Pre-compute aggregator selection proofs for the next period.
|
// Pre-compute aggregator selection proofs for the next period.
|
||||||
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots::<S::E>();
|
let aggregate_pre_compute_lookahead_slots = sync_duties.selection_proof_config.lookahead_slot;
|
||||||
if (current_slot + aggregate_pre_compute_lookahead_slots)
|
if (current_slot + aggregate_pre_compute_lookahead_slots)
|
||||||
.epoch(S::E::slots_per_epoch())
|
.epoch(S::E::slots_per_epoch())
|
||||||
.sync_committee_period(spec)?
|
.sync_committee_period(spec)?
|
||||||
@@ -498,6 +486,108 @@ pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClo
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a helper function here to reduce code duplication for normal and distributed mode
|
||||||
|
pub async fn make_sync_selection_proof<S: ValidatorStore, T: SlotClock>(
|
||||||
|
duties_service: &Arc<DutiesService<S, T>>,
|
||||||
|
duty: &SyncDuty,
|
||||||
|
proof_slot: Slot,
|
||||||
|
subnet_id: SyncSubnetId,
|
||||||
|
) -> Option<SyncSelectionProof> {
|
||||||
|
let sync_selection_proof = duties_service
|
||||||
|
.validator_store
|
||||||
|
.produce_sync_selection_proof(&duty.pubkey, proof_slot, subnet_id)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let selection_proof = match sync_selection_proof {
|
||||||
|
Ok(proof) => proof,
|
||||||
|
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
|
||||||
|
// A pubkey can be missing when a validator was recently removed via the API
|
||||||
|
debug!(
|
||||||
|
?pubkey,
|
||||||
|
"slot" = %proof_slot,
|
||||||
|
"Missing pubkey for sync selection proof");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"error" = ?e,
|
||||||
|
"pubkey" = ?duty.pubkey,
|
||||||
|
"slot" = %proof_slot,
|
||||||
|
"Unable to sign selection proof"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// In DVT with middleware, when we want to call the selections endpoint
|
||||||
|
if duties_service
|
||||||
|
.sync_duties
|
||||||
|
.selection_proof_config
|
||||||
|
.selections_endpoint
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
"validator_index" = duty.validator_index,
|
||||||
|
"slot" = %proof_slot,
|
||||||
|
"subcommittee_index" = *subnet_id,
|
||||||
|
// This is partial selection proof
|
||||||
|
"partial selection proof" = ?Signature::from(selection_proof.clone()),
|
||||||
|
"Sending sync selection to middleware"
|
||||||
|
);
|
||||||
|
|
||||||
|
let sync_committee_selection = SyncCommitteeSelection {
|
||||||
|
validator_index: duty.validator_index,
|
||||||
|
slot: proof_slot,
|
||||||
|
subcommittee_index: *subnet_id,
|
||||||
|
selection_proof: selection_proof.clone().into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Call the endpoint /eth/v1/validator/sync_committee_selections
|
||||||
|
// by sending the SyncCommitteeSelection that contains partial sync selection proof
|
||||||
|
// The middleware should return SyncCommitteeSelection that contains full sync selection proof
|
||||||
|
let middleware_response = duties_service
|
||||||
|
.beacon_nodes
|
||||||
|
.first_success(|beacon_node| {
|
||||||
|
let selection_data = sync_committee_selection.clone();
|
||||||
|
async move {
|
||||||
|
beacon_node
|
||||||
|
.post_validator_sync_committee_selections(&[selection_data])
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match middleware_response {
|
||||||
|
Ok(response) => {
|
||||||
|
let response_data = &response.data[0];
|
||||||
|
debug!(
|
||||||
|
"validator_index" = response_data.validator_index,
|
||||||
|
"slot" = %response_data.slot,
|
||||||
|
"subcommittee_index" = response_data.subcommittee_index,
|
||||||
|
// The selection proof from middleware response will be a full selection proof
|
||||||
|
"full selection proof" = ?response_data.selection_proof,
|
||||||
|
"Received sync selection from middleware"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Convert the response to a SyncSelectionProof
|
||||||
|
let full_selection_proof =
|
||||||
|
SyncSelectionProof::from(response_data.selection_proof.clone());
|
||||||
|
Some(full_selection_proof)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"error" = %e,
|
||||||
|
%proof_slot,
|
||||||
|
"Failed to get sync selection proofs from middleware"
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// In non-distributed mode, the selection_proof is already a full selection proof
|
||||||
|
Some(selection_proof)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
|
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
|
||||||
duties_service: Arc<DutiesService<S, T>>,
|
duties_service: Arc<DutiesService<S, T>>,
|
||||||
pre_compute_duties: &[(Slot, SyncDuty)],
|
pre_compute_duties: &[(Slot, SyncDuty)],
|
||||||
@@ -505,127 +595,188 @@ pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'stati
|
|||||||
current_slot: Slot,
|
current_slot: Slot,
|
||||||
pre_compute_slot: Slot,
|
pre_compute_slot: Slot,
|
||||||
) {
|
) {
|
||||||
debug!(
|
|
||||||
period = sync_committee_period,
|
|
||||||
%current_slot,
|
|
||||||
%pre_compute_slot,
|
|
||||||
"Calculating sync selection proofs"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Generate selection proofs for each validator at each slot, one slot at a time.
|
// Generate selection proofs for each validator at each slot, one slot at a time.
|
||||||
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
||||||
let mut validator_proofs = vec![];
|
// For distributed mode
|
||||||
for (validator_start_slot, duty) in pre_compute_duties {
|
if duties_service
|
||||||
// Proofs are already known at this slot for this validator.
|
.sync_duties
|
||||||
if slot < *validator_start_slot {
|
.selection_proof_config
|
||||||
continue;
|
.parallel_sign
|
||||||
}
|
{
|
||||||
|
let mut futures_unordered = FuturesUnordered::new();
|
||||||
|
|
||||||
let subnet_ids = match duty.subnet_ids::<S::E>() {
|
for (_, duty) in pre_compute_duties {
|
||||||
Ok(subnet_ids) => subnet_ids,
|
let subnet_ids = match duty.subnet_ids::<S::E>() {
|
||||||
Err(e) => {
|
Ok(subnet_ids) => subnet_ids,
|
||||||
crit!(
|
|
||||||
error = ?e,
|
|
||||||
"Arithmetic error computing subnet IDs"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Create futures to produce proofs.
|
|
||||||
let duties_service_ref = &duties_service;
|
|
||||||
let futures = subnet_ids.iter().map(|subnet_id| async move {
|
|
||||||
// Construct proof for prior slot.
|
|
||||||
let proof_slot = slot - 1;
|
|
||||||
|
|
||||||
let proof = match duties_service_ref
|
|
||||||
.validator_store
|
|
||||||
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(proof) => proof,
|
|
||||||
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
|
|
||||||
// A pubkey can be missing when a validator was recently
|
|
||||||
// removed via the API.
|
|
||||||
debug!(
|
|
||||||
?pubkey,
|
|
||||||
pubkey = ?duty.pubkey,
|
|
||||||
slot = %proof_slot,
|
|
||||||
"Missing pubkey for sync selection proof"
|
|
||||||
);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
crit!(
|
||||||
error = ?e,
|
"error" = ?e,
|
||||||
pubkey = ?duty.pubkey,
|
"Arithmetic error computing subnet IDs"
|
||||||
slot = %proof_slot,
|
|
||||||
"Unable to sign selection proof"
|
|
||||||
);
|
);
|
||||||
return None;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match proof.is_aggregator::<S::E>() {
|
// Construct proof for prior slot.
|
||||||
Ok(true) => {
|
let proof_slot = slot - 1;
|
||||||
debug!(
|
|
||||||
validator_index = duty.validator_index,
|
// Calling the make_sync_selection_proof will return a full selection proof
|
||||||
slot = %proof_slot,
|
for &subnet_id in &subnet_ids {
|
||||||
%subnet_id,
|
let duties_service = duties_service.clone();
|
||||||
"Validator is sync aggregator"
|
futures_unordered.push(async move {
|
||||||
);
|
let result =
|
||||||
Some(((proof_slot, *subnet_id), proof))
|
make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id)
|
||||||
}
|
.await;
|
||||||
Ok(false) => None,
|
|
||||||
Err(e) => {
|
result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
|
||||||
warn!(
|
});
|
||||||
pubkey = ?duty.pubkey,
|
}
|
||||||
slot = %proof_slot,
|
}
|
||||||
error = ?e,
|
|
||||||
"Error determining is_aggregator"
|
while let Some(result) = futures_unordered.next().await {
|
||||||
);
|
if let Some((validator_index, proof_slot, subnet_id, proof)) = result {
|
||||||
None
|
let sync_map = duties_service.sync_duties.committees.read();
|
||||||
|
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
||||||
|
debug!("period" = sync_committee_period, "Missing sync duties");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let validators = committee_duties.validators.read();
|
||||||
|
|
||||||
|
// Check if the validator is an aggregator
|
||||||
|
match proof.is_aggregator::<S::E>() {
|
||||||
|
Ok(true) => {
|
||||||
|
if let Some(Some(duty)) = validators.get(&validator_index) {
|
||||||
|
debug!(
|
||||||
|
validator_index,
|
||||||
|
"slot" = %proof_slot,
|
||||||
|
"subcommittee_index" = *subnet_id,
|
||||||
|
// log full selection proof for debugging
|
||||||
|
"full selection proof" = ?proof,
|
||||||
|
"Validator is sync aggregator"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Store the proof
|
||||||
|
duty.aggregation_duties
|
||||||
|
.proofs
|
||||||
|
.write()
|
||||||
|
.insert((proof_slot, subnet_id), proof);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(false) => {} // Not an aggregator
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
validator_index,
|
||||||
|
%slot,
|
||||||
|
"error" = ?e,
|
||||||
|
"Error determining is_aggregator"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
} else {
|
||||||
|
// For non-distributed mode
|
||||||
|
debug!(
|
||||||
|
period = sync_committee_period,
|
||||||
|
%current_slot,
|
||||||
|
%pre_compute_slot,
|
||||||
|
"Calculating sync selection proofs"
|
||||||
|
);
|
||||||
|
|
||||||
// Execute all the futures in parallel, collecting any successful results.
|
let mut validator_proofs = vec![];
|
||||||
let proofs = join_all(futures)
|
for (validator_start_slot, duty) in pre_compute_duties {
|
||||||
.await
|
// Proofs are already known at this slot for this validator.
|
||||||
.into_iter()
|
if slot < *validator_start_slot {
|
||||||
.flatten()
|
continue;
|
||||||
.collect::<Vec<_>>();
|
}
|
||||||
|
|
||||||
validator_proofs.push((duty.validator_index, proofs));
|
let subnet_ids = match duty.subnet_ids::<S::E>() {
|
||||||
}
|
Ok(subnet_ids) => subnet_ids,
|
||||||
|
Err(e) => {
|
||||||
|
crit!(
|
||||||
|
error = ?e,
|
||||||
|
"Arithmetic error computing subnet IDs"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Add to global storage (we add regularly so the proofs can be used ASAP).
|
// Create futures to produce proofs.
|
||||||
let sync_map = duties_service.sync_duties.committees.read();
|
let duties_service_ref = &duties_service;
|
||||||
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
let futures = subnet_ids.iter().map(|subnet_id| async move {
|
||||||
debug!(period = sync_committee_period, "Missing sync duties");
|
// Construct proof for prior slot.
|
||||||
continue;
|
let proof_slot = slot - 1;
|
||||||
};
|
|
||||||
let validators = committee_duties.validators.read();
|
|
||||||
let num_validators_updated = validator_proofs.len();
|
|
||||||
|
|
||||||
for (validator_index, proofs) in validator_proofs {
|
let proof =
|
||||||
if let Some(Some(duty)) = validators.get(&validator_index) {
|
make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id)
|
||||||
duty.aggregation_duties.proofs.write().extend(proofs);
|
.await;
|
||||||
} else {
|
|
||||||
|
match proof {
|
||||||
|
Some(proof) => match proof.is_aggregator::<S::E>() {
|
||||||
|
Ok(true) => {
|
||||||
|
debug!(
|
||||||
|
validator_index = duty.validator_index,
|
||||||
|
slot = %proof_slot,
|
||||||
|
%subnet_id,
|
||||||
|
"Validator is sync aggregator"
|
||||||
|
);
|
||||||
|
Some(((proof_slot, *subnet_id), proof))
|
||||||
|
}
|
||||||
|
Ok(false) => None,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
pubkey = ?duty.pubkey,
|
||||||
|
slot = %proof_slot,
|
||||||
|
error = ?e,
|
||||||
|
"Error determining is_aggregator"
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Execute all the futures in parallel, collecting any successful results.
|
||||||
|
let proofs = join_all(futures)
|
||||||
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
validator_proofs.push((duty.validator_index, proofs));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to global storage (we add regularly so the proofs can be used ASAP).
|
||||||
|
let sync_map = duties_service.sync_duties.committees.read();
|
||||||
|
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
||||||
|
debug!(period = sync_committee_period, "Missing sync duties");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let validators = committee_duties.validators.read();
|
||||||
|
let num_validators_updated = validator_proofs.len();
|
||||||
|
|
||||||
|
for (validator_index, proofs) in validator_proofs {
|
||||||
|
if let Some(Some(duty)) = validators.get(&validator_index) {
|
||||||
|
duty.aggregation_duties.proofs.write().extend(proofs);
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
validator_index,
|
||||||
|
period = sync_committee_period,
|
||||||
|
"Missing sync duty to update"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if num_validators_updated > 0 {
|
||||||
debug!(
|
debug!(
|
||||||
validator_index,
|
%slot,
|
||||||
period = sync_committee_period,
|
updated_validators = num_validators_updated,
|
||||||
"Missing sync duty to update"
|
"Finished computing sync selection proofs"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if num_validators_updated > 0 {
|
|
||||||
debug!(
|
|
||||||
%slot,
|
|
||||||
updated_validators = num_validators_updated,
|
|
||||||
"Finished computing sync selection proofs"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ pub enum Error<T> {
|
|||||||
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
|
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
|
||||||
UnableToSignAttestation(AttestationError),
|
UnableToSignAttestation(AttestationError),
|
||||||
SpecificError(T),
|
SpecificError(T),
|
||||||
|
Middleware(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> From<T> for Error<T> {
|
impl<T> From<T> for Error<T> {
|
||||||
|
|||||||
Reference in New Issue
Block a user