address comments

This commit is contained in:
Tan Chee Keong
2025-04-29 16:28:42 +08:00
parent 9d6d1cb654
commit d34a91ac07
3 changed files with 34 additions and 44 deletions

View File

@@ -86,6 +86,11 @@ const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which selection proof signing should happen (2 means half way). /// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
#[derive(Clone)] #[derive(Clone)]
pub struct ProductionValidatorClient<E: EthSpec> { pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>, context: RuntimeContext<E>,
@@ -459,7 +464,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Define a config to be pass to fill_in_selection_proofs. // Define a config to be pass to fill_in_selection_proofs.
// The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode)
// Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs.
let selection_proof_config = if config.distributed { let attestation_selection_proof_config = if config.distributed {
SelectionProofConfig { SelectionProofConfig {
lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT, lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT,
computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM, computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM,
@@ -475,11 +480,27 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
} }
}; };
let sync_selection_proof_config = if config.distributed {
SelectionProofConfig {
lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED,
computation_offset: Duration::default(),
selections_endpoint: true,
parallel_sign: true,
}
} else {
SelectionProofConfig {
lookahead_slot: E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS,
computation_offset: Duration::default(),
selections_endpoint: false,
parallel_sign: false,
}
};
let duties_context = context.service_context("duties".into()); let duties_context = context.service_context("duties".into());
let duties_service = Arc::new(DutiesService { let duties_service = Arc::new(DutiesService {
attesters: <_>::default(), attesters: <_>::default(),
proposers: <_>::default(), proposers: <_>::default(),
sync_duties: SyncDutiesMap::new(selection_proof_config.clone()), sync_duties: SyncDutiesMap::new(sync_selection_proof_config),
slot_clock: slot_clock.clone(), slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(), beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(), validator_store: validator_store.clone(),
@@ -487,7 +508,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
spec: context.eth2_config.spec.clone(), spec: context.eth2_config.spec.clone(),
context: duties_context, context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
selection_proof_config, selection_proof_config: attestation_selection_proof_config,
disable_attesting: config.disable_attesting, disable_attesting: config.disable_attesting,
}); });

View File

@@ -115,7 +115,6 @@ pub struct SubscriptionSlots {
duty_slot: Slot, duty_slot: Slot,
} }
#[derive(Clone)]
pub struct SelectionProofConfig { pub struct SelectionProofConfig {
pub lookahead_slot: u64, pub lookahead_slot: u64,
pub computation_offset: Duration, // The seconds to compute the selection proof before a slot pub computation_offset: Duration, // The seconds to compute the selection proof before a slot
@@ -130,10 +129,10 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: &AttesterData, duty: &AttesterData,
validator_store: &ValidatorStore<T, E>, validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec, spec: &ChainSpec,
duties_service: &DutiesService<T, E>,
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>, beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
config: &SelectionProofConfig,
) -> Result<Option<SelectionProof>, Error> { ) -> Result<Option<SelectionProof>, Error> {
let selection_proof = if duties_service.selection_proof_config.selections_endpoint { let selection_proof = if config.selections_endpoint {
let beacon_committee_selection = BeaconCommitteeSelection { let beacon_committee_selection = BeaconCommitteeSelection {
validator_index: duty.validator_index, validator_index: duty.validator_index,
slot: duty.slot, slot: duty.slot,
@@ -1161,12 +1160,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
while !duties_by_slot.is_empty() { while !duties_by_slot.is_empty() {
if let Some(duration) = slot_clock.duration_to_next_slot() { if let Some(duration) = slot_clock.duration_to_next_slot() {
sleep( sleep(
duration.saturating_sub( duration.saturating_sub(duties_service.selection_proof_config.computation_offset),
duties_service
.sync_duties
.selection_proof_config
.computation_offset,
),
) )
.await; .await;
@@ -1174,18 +1168,11 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
continue; continue;
}; };
let selection_lookahead = duties_service let selection_lookahead = duties_service.selection_proof_config.lookahead_slot;
.sync_duties
.selection_proof_config
.lookahead_slot;
let lookahead_slot = current_slot + selection_lookahead; let lookahead_slot = current_slot + selection_lookahead;
let relevant_duties = if duties_service let relevant_duties = if duties_service.selection_proof_config.parallel_sign {
.sync_duties
.selection_proof_config
.parallel_sign
{
// Remove old slot duties and only keep current duties in distributed mode // Remove old slot duties and only keep current duties in distributed mode
duties_by_slot duties_by_slot
.remove(&lookahead_slot) .remove(&lookahead_slot)
@@ -1211,11 +1198,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
if duties_service if duties_service.selection_proof_config.parallel_sign {
.sync_duties
.selection_proof_config
.parallel_sign
{
let mut duty_and_proof_results = relevant_duties let mut duty_and_proof_results = relevant_duties
.into_values() .into_values()
.flatten() .flatten()
@@ -1224,8 +1207,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
&duties_service.spec, &duties_service.spec,
&duties_service,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service.selection_proof_config,
) )
.await?; .await?;
Ok((duty, opt_selection_proof)) Ok((duty, opt_selection_proof))
@@ -1252,8 +1235,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
&duties_service.spec, &duties_service.spec,
&duties_service,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service.selection_proof_config,
) )
.await?; .await?;
Ok((duty, opt_selection_proof)) Ok((duty, opt_selection_proof))

View File

@@ -14,11 +14,6 @@ use tracing::{debug, error, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError; use validator_store::Error as ValidatorStoreError;
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// Top-level data-structure containing sync duty information. /// Top-level data-structure containing sync duty information.
/// ///
/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained /// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
@@ -106,15 +101,6 @@ impl<E: EthSpec> SyncDutiesMap<E> {
}) })
} }
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 {
if self.selection_proof_config.parallel_sign {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS
}
}
/// Prepare for pre-computation of selection proofs for `committee_period`. /// Prepare for pre-computation of selection proofs for `committee_period`.
/// ///
/// Return the slot up to which proofs should be pre-computed, as well as a vec of /// Return the slot up to which proofs should be pre-computed, as well as a vec of
@@ -130,7 +116,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
current_slot, current_slot,
first_slot_of_period::<E>(committee_period, spec), first_slot_of_period::<E>(committee_period, spec),
); );
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots(); let pre_compute_lookahead_slots = self.selection_proof_config.lookahead_slot;
let pre_compute_slot = std::cmp::min( let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots, current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec), last_slot_of_period::<E>(committee_period, spec),
@@ -382,7 +368,7 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
} }
// Pre-compute aggregator selection proofs for the next period. // Pre-compute aggregator selection proofs for the next period.
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots(); let aggregate_pre_compute_lookahead_slots = sync_duties.selection_proof_config.lookahead_slot;
if (current_slot + aggregate_pre_compute_lookahead_slots) if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(E::slots_per_epoch()) .epoch(E::slots_per_epoch())
.sync_committee_period(spec)? .sync_committee_period(spec)?