This commit is contained in:
Tan Chee Keong
2025-05-09 14:15:46 +08:00
parent bcfe5ff2e8
commit 46c29b09b7
4 changed files with 48 additions and 40 deletions

View File

@@ -483,7 +483,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
} }
}; };
let sync_selection_proof_config = if config.distributed { let _sync_selection_proof_config = if config.distributed {
SelectionProofConfig { SelectionProofConfig {
lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED, lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED,
computation_offset: Duration::default(), computation_offset: Duration::default(),
@@ -499,21 +499,20 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
} }
}; };
let duties_context = context.service_context("duties".into()); let duties_service: Arc<
let duties_service = Arc::new(DutiesService { DutiesService<LighthouseValidatorStore<SystemTimeSlotClock, E>, SystemTimeSlotClock>,
attesters: <_>::default(), > = Arc::new(
proposers: <_>::default(), DutiesServiceBuilder::new()
sync_duties: SyncDutiesMap::new(sync_selection_proof_config), .slot_clock(slot_clock.clone())
slot_clock: slot_clock.clone(), .beacon_nodes(beacon_nodes.clone())
beacon_nodes: beacon_nodes.clone(), .validator_store(validator_store.clone())
validator_store: validator_store.clone(), .spec(context.eth2_config.spec.clone())
unknown_validator_next_poll_slots: <_>::default(), .executor(context.executor.clone())
spec: context.eth2_config.spec.clone(), .enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
context: duties_context, .selection_proof_config(attestation_selection_proof_config)
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, .disable_attesting(config.disable_attesting)
selection_proof_config: attestation_selection_proof_config, .build()?,
disable_attesting: config.disable_attesting, );
});
// Update the metrics server. // Update the metrics server.
if let Some(ctx) = &validator_metrics_ctx { if let Some(ctx) = &validator_metrics_ctx {

View File

@@ -114,6 +114,7 @@ pub struct SubscriptionSlots {
duty_slot: Slot, duty_slot: Slot,
} }
#[derive(Copy, Clone)]
pub struct SelectionProofConfig { pub struct SelectionProofConfig {
pub lookahead_slot: u64, pub lookahead_slot: u64,
pub computation_offset: Duration, // The seconds to compute the selection proof before a slot pub computation_offset: Duration, // The seconds to compute the selection proof before a slot
@@ -121,16 +122,27 @@ pub struct SelectionProofConfig {
pub parallel_sign: bool, pub parallel_sign: bool,
} }
impl SelectionProofConfig {
fn default() -> Self {
Self {
lookahead_slot: 0,
computation_offset: Duration::default(),
selections_endpoint: false,
parallel_sign: false,
}
}
}
/// Create a selection proof for `duty`. /// Create a selection proof for `duty`.
/// ///
/// Return `Ok(None)` if the attesting validator is not an aggregator. /// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<S: ValidatorStore + 'static>( async fn make_selection_proof<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duty: &AttesterData, duty: &AttesterData,
validator_store: &S, validator_store: &S,
spec: &ChainSpec, spec: &ChainSpec,
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>, beacon_nodes: &Arc<BeaconNodeFallback<T>>,
config: &SelectionProofConfig, config: &SelectionProofConfig,
) -> Result<Option<SelectionProof>, Error> { ) -> Result<Option<SelectionProof>, Error<S::Error>> {
let selection_proof = if config.selections_endpoint { let selection_proof = if config.selections_endpoint {
let beacon_committee_selection = BeaconCommitteeSelection { let beacon_committee_selection = BeaconCommitteeSelection {
validator_index: duty.validator_index, validator_index: duty.validator_index,
@@ -266,7 +278,7 @@ pub struct DutiesServiceBuilder<S, T> {
//// Whether we permit large validator counts in the metrics. //// Whether we permit large validator counts in the metrics.
enable_high_validator_count_metrics: bool, enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode. /// If this validator is running in distributed mode.
distributed: bool, selection_proof_config: SelectionProofConfig,
disable_attesting: bool, disable_attesting: bool,
} }
@@ -285,7 +297,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
executor: None, executor: None,
spec: None, spec: None,
enable_high_validator_count_metrics: false, enable_high_validator_count_metrics: false,
distributed: false, selection_proof_config: SelectionProofConfig::default(),
disable_attesting: false, disable_attesting: false,
} }
} }
@@ -323,8 +335,8 @@ impl<S, T> DutiesServiceBuilder<S, T> {
self self
} }
pub fn distributed(mut self, distributed: bool) -> Self { pub fn selection_proof_config(mut self, selection_proof_config: SelectionProofConfig) -> Self {
self.distributed = distributed; self.selection_proof_config = selection_proof_config;
self self
} }
@@ -337,7 +349,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
Ok(DutiesService { Ok(DutiesService {
attesters: Default::default(), attesters: Default::default(),
proposers: Default::default(), proposers: Default::default(),
sync_duties: SyncDutiesMap::new(self.distributed), sync_duties: SyncDutiesMap::new(self.selection_proof_config),
validator_store: self validator_store: self
.validator_store .validator_store
.ok_or("Cannot build DutiesService without validator_store")?, .ok_or("Cannot build DutiesService without validator_store")?,
@@ -353,7 +365,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
.ok_or("Cannot build DutiesService without executor")?, .ok_or("Cannot build DutiesService without executor")?,
spec: self.spec.ok_or("Cannot build DutiesService without spec")?, spec: self.spec.ok_or("Cannot build DutiesService without spec")?,
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics, enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
distributed: self.distributed, selection_proof_config: self.selection_proof_config,
disable_attesting: self.disable_attesting, disable_attesting: self.disable_attesting,
}) })
} }
@@ -1168,9 +1180,9 @@ async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'stati
} }
// Create a helper function here to reduce code duplication for normal and distributed mode // Create a helper function here to reduce code duplication for normal and distributed mode
fn process_duty_and_proof<E: EthSpec>( fn process_duty_and_proof<S: ValidatorStore>(
attesters: &mut RwLockWriteGuard<AttesterMap>, attesters: &mut RwLockWriteGuard<AttesterMap>,
result: Result<(AttesterData, Option<SelectionProof>), Error>, result: Result<(AttesterData, Option<SelectionProof>), Error<S::Error>>,
dependent_root: Hash256, dependent_root: Hash256,
current_slot: Slot, current_slot: Slot,
) -> bool { ) -> bool {
@@ -1198,7 +1210,7 @@ fn process_duty_and_proof<E: EthSpec>(
}; };
let attester_map = attesters.entry(duty.pubkey).or_default(); let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch()); let epoch = duty.slot.epoch(S::E::slots_per_epoch());
match attester_map.entry(epoch) { match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => { hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed. // No need to update duties for which no proof was computed.
@@ -1304,7 +1316,7 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
.map(|duty| async { .map(|duty| async {
let opt_selection_proof = make_selection_proof( let opt_selection_proof = make_selection_proof(
&duty, &duty,
&duties_service.validator_store, duties_service.validator_store.as_ref(),
&duties_service.spec, &duties_service.spec,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service.selection_proof_config, &duties_service.selection_proof_config,
@@ -1317,7 +1329,7 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
while let Some(result) = duty_and_proof_results.next().await { while let Some(result) = duty_and_proof_results.next().await {
let mut attesters = duties_service.attesters.write(); let mut attesters = duties_service.attesters.write();
// if process_duty_and_proof returns false, exit the loop // if process_duty_and_proof returns false, exit the loop
if !process_duty_and_proof::<E>( if !process_duty_and_proof::<S>(
&mut attesters, &mut attesters,
result, result,
dependent_root, dependent_root,
@@ -1332,7 +1344,7 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
.then(|duty| async { .then(|duty| async {
let opt_selection_proof = make_selection_proof( let opt_selection_proof = make_selection_proof(
&duty, &duty,
&duties_service.validator_store, duties_service.validator_store.as_ref(),
&duties_service.spec, &duties_service.spec,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service.selection_proof_config, &duties_service.selection_proof_config,
@@ -1346,7 +1358,7 @@ async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 's
// Add to attesters store. // Add to attesters store.
let mut attesters = duties_service.attesters.write(); let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results { for result in duty_and_proof_results {
if !process_duty_and_proof::<E>( if !process_duty_and_proof::<S>(
&mut attesters, &mut attesters,
result, result,
dependent_root, dependent_root,

View File

@@ -1,6 +1,4 @@
use crate::duties_service::{DutiesService, Error, SelectionProofConfig}; use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
// use beacon_node_fallback::BeaconNodeFallback;
use doppelganger_service::DoppelgangerStatus;
use eth2::types::{Signature, SyncCommitteeSelection}; use eth2::types::{Signature, SyncCommitteeSelection};
use futures::future::join_all; use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
@@ -489,8 +487,8 @@ pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClo
} }
// Create a helper function here to reduce code duplication for normal and distributed mode // Create a helper function here to reduce code duplication for normal and distributed mode
pub async fn make_sync_selection_proof<T: SlotClock + 'static, E: EthSpec>( pub async fn make_sync_selection_proof<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<T, E>>, duties_service: &Arc<DutiesService<S, T>>,
duty: &SyncDuty, duty: &SyncDuty,
proof_slot: Slot, proof_slot: Slot,
subnet_id: SyncSubnetId, subnet_id: SyncSubnetId,
@@ -608,7 +606,7 @@ pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'stati
let mut futures_unordered = FuturesUnordered::new(); let mut futures_unordered = FuturesUnordered::new();
for (_, duty) in pre_compute_duties { for (_, duty) in pre_compute_duties {
let subnet_ids = match duty.subnet_ids::<E>() { let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids, Ok(subnet_ids) => subnet_ids,
Err(e) => { Err(e) => {
crit!( crit!(
@@ -646,7 +644,7 @@ pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'stati
let validators = committee_duties.validators.read(); let validators = committee_duties.validators.read();
// Check if the validator is an aggregator // Check if the validator is an aggregator
match proof.is_aggregator::<E>() { match proof.is_aggregator::<S::E>() {
Ok(true) => { Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) { if let Some(Some(duty)) = validators.get(&validator_index) {
debug!( debug!(

View File

@@ -20,7 +20,6 @@ pub enum Error<T> {
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
UnableToSignAttestation(AttestationError), UnableToSignAttestation(AttestationError),
SpecificError(T), SpecificError(T),
UnableToSign(SigningError),
Middleware(String), Middleware(String),
} }