Revise config move to lib.rs

This commit is contained in:
Tan Chee Keong
2025-04-29 15:12:58 +08:00
parent 05df47bed8
commit b71e40d4a6
3 changed files with 85 additions and 87 deletions

View File

@@ -4,6 +4,7 @@ mod latency;
mod notifier; mod notifier;
use crate::cli::ValidatorClient; use crate::cli::ValidatorClient;
use crate::duties_service::SelectionProofConfig;
pub use config::Config; pub use config::Config;
use initialized_validators::InitializedValidators; use initialized_validators::InitializedValidators;
use metrics::set_gauge; use metrics::set_gauge;
@@ -74,6 +75,17 @@ const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger"; const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
/// Compute attestation selection proofs this many slots before they are required.
///
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
/// The attestation selection proof lookahead for those running with the --distributed flag.
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
#[derive(Clone)] #[derive(Clone)]
pub struct ProductionValidatorClient<E: EthSpec> { pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>, context: RuntimeContext<E>,
@@ -444,11 +456,30 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true); validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
} }
// Define a config to be pass to fill_in_selection_proofs.
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
let selection_proof_config = if config.distributed {
SelectionProofConfig {
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT,
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
selections_endpoint: true,
parallel_sign: true,
}
} else {
SelectionProofConfig {
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD,
computation_offset: Duration::from_secs(6),
selections_endpoint: false,
parallel_sign: false,
}
};
let duties_context = context.service_context("duties".into()); let duties_context = context.service_context("duties".into());
let duties_service = Arc::new(DutiesService { let duties_service = Arc::new(DutiesService {
attesters: <_>::default(), attesters: <_>::default(),
proposers: <_>::default(), proposers: <_>::default(),
sync_duties: SyncDutiesMap::new(config.distributed), sync_duties: SyncDutiesMap::new(selection_proof_config.clone()),
slot_clock: slot_clock.clone(), slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(), beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(), validator_store: validator_store.clone(),
@@ -456,7 +487,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
spec: context.eth2_config.spec.clone(), spec: context.eth2_config.spec.clone(),
context: duties_context, context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
distributed: config.distributed, selection_proof_config,
disable_attesting: config.disable_attesting, disable_attesting: config.disable_attesting,
}); });

View File

@@ -37,17 +37,6 @@ use validator_store::{Error as ValidatorStoreError, ValidatorStore};
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2; const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
/// Compute attestation selection proofs this many slots before they are required.
///
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
/// The attestation selection proof lookahead for those running with the --distributed flag.
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// Minimum number of validators for which we auto-enable per-validator metrics. /// Minimum number of validators for which we auto-enable per-validator metrics.
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
/// flag in the cli to enable collection of per validator metrics. /// flag in the cli to enable collection of per validator metrics.
@@ -126,6 +115,7 @@ pub struct SubscriptionSlots {
duty_slot: Slot, duty_slot: Slot,
} }
#[derive(Clone)]
pub struct SelectionProofConfig { pub struct SelectionProofConfig {
pub lookahead_slot: u64, pub lookahead_slot: u64,
pub computation_offset: Duration, // The seconds to compute the selection proof before a slot pub computation_offset: Duration, // The seconds to compute the selection proof before a slot
@@ -140,10 +130,10 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: &AttesterData, duty: &AttesterData,
validator_store: &ValidatorStore<T, E>, validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec, spec: &ChainSpec,
config: &SelectionProofConfig, duties_service: &DutiesService<T, E>,
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>, beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
) -> Result<Option<SelectionProof>, Error> { ) -> Result<Option<SelectionProof>, Error> {
let selection_proof = if config.selections_endpoint { let selection_proof = if duties_service.selection_proof_config.selections_endpoint {
let beacon_committee_selection = BeaconCommitteeSelection { let beacon_committee_selection = BeaconCommitteeSelection {
validator_index: duty.validator_index, validator_index: duty.validator_index,
slot: duty.slot, slot: duty.slot,
@@ -285,10 +275,10 @@ pub struct DutiesService<T, E: EthSpec> {
pub context: RuntimeContext<E>, pub context: RuntimeContext<E>,
/// The current chain spec. /// The current chain spec.
pub spec: Arc<ChainSpec>, pub spec: Arc<ChainSpec>,
//// Whether we permit large validator counts in the metrics. /// Whether we permit large validator counts in the metrics.
pub enable_high_validator_count_metrics: bool, pub enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode. /// Pass the config for distributed or non-distributed mode.
pub distributed: bool, pub selection_proof_config: SelectionProofConfig,
pub disable_attesting: bool, pub disable_attesting: bool,
} }
@@ -990,24 +980,9 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Spawn the background task to compute selection proofs. // Spawn the background task to compute selection proofs.
let subservice = duties_service.clone(); let subservice = duties_service.clone();
// Define a config to be pass to fill_in_selection_proofs.
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
let config = SelectionProofConfig {
lookahead_slot: if duties_service.distributed {
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
} else {
SELECTION_PROOF_SLOT_LOOKAHEAD
},
computation_offset: duties_service.slot_clock.slot_duration()
/ SELECTION_PROOF_SCHEDULE_DENOM,
selections_endpoint: duties_service.distributed,
parallel_sign: duties_service.distributed,
};
duties_service.context.executor.spawn( duties_service.context.executor.spawn(
async move { async move {
fill_in_selection_proofs(subservice, new_duties, dependent_root, config).await; fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
}, },
"duties_service_selection_proofs_background", "duties_service_selection_proofs_background",
); );
@@ -1171,7 +1146,6 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>, duties_service: Arc<DutiesService<T, E>>,
duties: Vec<AttesterData>, duties: Vec<AttesterData>,
dependent_root: Hash256, dependent_root: Hash256,
config: SelectionProofConfig,
) { ) {
// Sort duties by slot in a BTreeMap. // Sort duties by slot in a BTreeMap.
let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new(); let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new();
@@ -1186,17 +1160,32 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
while !duties_by_slot.is_empty() { while !duties_by_slot.is_empty() {
if let Some(duration) = slot_clock.duration_to_next_slot() { if let Some(duration) = slot_clock.duration_to_next_slot() {
sleep(duration.saturating_sub(config.computation_offset)).await; sleep(
duration.saturating_sub(
duties_service
.sync_duties
.selection_proof_config
.computation_offset,
),
)
.await;
let Some(current_slot) = slot_clock.now() else { let Some(current_slot) = slot_clock.now() else {
continue; continue;
}; };
let selection_lookahead = config.lookahead_slot; let selection_lookahead = duties_service
.sync_duties
.selection_proof_config
.lookahead_slot;
let lookahead_slot = current_slot + selection_lookahead; let lookahead_slot = current_slot + selection_lookahead;
let relevant_duties = if config.selections_endpoint { let relevant_duties = if duties_service
.sync_duties
.selection_proof_config
.parallel_sign
{
// Remove old slot duties and only keep current duties in distributed mode // Remove old slot duties and only keep current duties in distributed mode
duties_by_slot duties_by_slot
.remove(&lookahead_slot) .remove(&lookahead_slot)
@@ -1222,7 +1211,11 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
if config.parallel_sign { if duties_service
.sync_duties
.selection_proof_config
.parallel_sign
{
let mut duty_and_proof_results = relevant_duties let mut duty_and_proof_results = relevant_duties
.into_values() .into_values()
.flatten() .flatten()
@@ -1231,7 +1224,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
&duties_service.spec, &duties_service.spec,
&config, &duties_service,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
) )
.await?; .await?;
@@ -1259,7 +1252,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
&duties_service.spec, &duties_service.spec,
&config, &duties_service,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
) )
.await?; .await?;

View File

@@ -10,7 +10,6 @@ use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError; use validator_store::Error as ValidatorStoreError;
@@ -36,7 +35,7 @@ pub struct SyncDutiesMap<E: EthSpec> {
/// Map from sync committee period to duties for members of that sync committee. /// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>, committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute. /// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool, pub selection_proof_config: SelectionProofConfig,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
@@ -86,10 +85,10 @@ pub struct SlotDuties {
} }
impl<E: EthSpec> SyncDutiesMap<E> { impl<E: EthSpec> SyncDutiesMap<E> {
pub fn new(distributed: bool) -> Self { pub fn new(selection_proof_config: SelectionProofConfig) -> Self {
Self { Self {
committees: RwLock::new(HashMap::new()), committees: RwLock::new(HashMap::new()),
distributed, selection_proof_config,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@@ -109,7 +108,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Number of slots in advance to compute selection proofs /// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 { fn aggregation_pre_compute_slots(&self) -> u64 {
if self.distributed { if self.selection_proof_config.parallel_sign {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else { } else {
E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS
@@ -354,14 +353,6 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
async move { async move {
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
let config = SelectionProofConfig {
lookahead_slot: sub_duties_service
.sync_duties
.aggregation_pre_compute_slots(), // Use the current behaviour defined in the method
computation_offset: Duration::from_secs(0),
selections_endpoint: sub_duties_service.sync_duties.distributed,
parallel_sign: sub_duties_service.sync_duties.distributed,
};
fill_in_aggregation_proofs( fill_in_aggregation_proofs(
sub_duties_service, sub_duties_service,
@@ -369,7 +360,6 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
current_sync_committee_period, current_sync_committee_period,
current_slot, current_slot,
current_pre_compute_slot, current_pre_compute_slot,
config,
) )
.await .await
}, },
@@ -408,22 +398,12 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let sub_duties_service = duties_service.clone(); let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn( duties_service.context.executor.spawn(
async move { async move {
let config = SelectionProofConfig {
lookahead_slot: sub_duties_service
.sync_duties
.aggregation_pre_compute_slots(),
computation_offset: Duration::from_secs(0),
selections_endpoint: sub_duties_service.sync_duties.distributed,
parallel_sign: sub_duties_service.sync_duties.distributed,
};
fill_in_aggregation_proofs( fill_in_aggregation_proofs(
sub_duties_service, sub_duties_service,
&new_pre_compute_duties, &new_pre_compute_duties,
next_sync_committee_period, next_sync_committee_period,
current_slot, current_slot,
pre_compute_slot, pre_compute_slot,
config,
) )
.await .await
}, },
@@ -528,7 +508,6 @@ pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: &SyncDuty, duty: &SyncDuty,
proof_slot: Slot, proof_slot: Slot,
subnet_id: SyncSubnetId, subnet_id: SyncSubnetId,
config: &SelectionProofConfig,
) -> Option<SyncSelectionProof> { ) -> Option<SyncSelectionProof> {
let sync_selection_proof = duties_service let sync_selection_proof = duties_service
.validator_store .validator_store
@@ -557,7 +536,11 @@ pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>(
}; };
// In distributed mode when we want to call the selections endpoint // In distributed mode when we want to call the selections endpoint
if config.selections_endpoint { if duties_service
.sync_duties
.selection_proof_config
.selections_endpoint
{
debug!( debug!(
"validator_index" = duty.validator_index, "validator_index" = duty.validator_index,
"slot" = %proof_slot, "slot" = %proof_slot,
@@ -627,14 +610,16 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
sync_committee_period: u64, sync_committee_period: u64,
current_slot: Slot, current_slot: Slot,
pre_compute_slot: Slot, pre_compute_slot: Slot,
config: SelectionProofConfig,
) { ) {
// Generate selection proofs for each validator at each slot, one slot at a time. // Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
// For distributed mode // For distributed mode
if config.parallel_sign { if duties_service
.sync_duties
.selection_proof_config
.parallel_sign
{
let mut futures_unordered = FuturesUnordered::new(); let mut futures_unordered = FuturesUnordered::new();
let config_ref = &config;
for (_, duty) in pre_compute_duties { for (_, duty) in pre_compute_duties {
let subnet_ids = match duty.subnet_ids::<E>() { let subnet_ids = match duty.subnet_ids::<E>() {
@@ -655,14 +640,9 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
for &subnet_id in &subnet_ids { for &subnet_id in &subnet_ids {
let duties_service = duties_service.clone(); let duties_service = duties_service.clone();
futures_unordered.push(async move { futures_unordered.push(async move {
let result = make_sync_selection_proof( let result =
&duties_service, make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id)
duty, .await;
proof_slot,
subnet_id,
config_ref,
)
.await;
result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
}); });
@@ -740,19 +720,13 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Create futures to produce proofs. // Create futures to produce proofs.
let duties_service_ref = &duties_service; let duties_service_ref = &duties_service;
let config_ref = &config;
let futures = subnet_ids.iter().map(|subnet_id| async move { let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot. // Construct proof for prior slot.
let proof_slot = slot - 1; let proof_slot = slot - 1;
let proof = make_sync_selection_proof( let proof =
duties_service_ref, make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id)
duty, .await;
proof_slot,
*subnet_id,
config_ref,
)
.await;
match proof { match proof {
Some(proof) => match proof.is_aggregator::<E>() { Some(proof) => match proof.is_aggregator::<E>() {