Modularize validator store (#6705)

- Create trait `ValidatorStore` with all functions used by the `validator_services`
- Make `validator_services` generic on `S: ValidatorStore`
- Introduce `LighthouseValidatorStore`, which has identical functionality to the old `ValidatorStore`
- Remove dependencies (especially `environment`) from `validator_services` and `beacon_node_fallback` in order to be able to cleanly use them in Anchor
This commit is contained in:
Daniel Knopik
2025-05-07 05:43:33 +02:00
committed by GitHub
parent beb0ce68bd
commit 3d92e3663b
42 changed files with 2010 additions and 1622 deletions

View File

@@ -1,15 +1,13 @@
use crate::duties_service::{DutiesService, Error};
use doppelganger_service::DoppelgangerStatus;
use futures::future::join_all;
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::{debug, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError;
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
@@ -28,12 +26,11 @@ pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
/// in this file take care to only lock one validator at a time. We never hold a lock while
/// trying to obtain another one (hence no lock ordering issues).
pub struct SyncDutiesMap<E: EthSpec> {
pub struct SyncDutiesMap {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool,
_phantom: PhantomData<E>,
}
/// Duties for a single sync committee period.
@@ -81,12 +78,11 @@ pub struct SlotDuties {
pub aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
}
impl<E: EthSpec> SyncDutiesMap<E> {
impl SyncDutiesMap {
pub fn new(distributed: bool) -> Self {
Self {
committees: RwLock::new(HashMap::new()),
distributed,
_phantom: PhantomData,
}
}
@@ -104,7 +100,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
}
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 {
fn aggregation_pre_compute_slots<E: EthSpec>(&self) -> u64 {
if self.distributed {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
@@ -117,7 +113,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_slot, sync_duty)` pairs for all validators which need to have proofs
/// computed. See `fill_in_aggregation_proofs` for the actual calculation.
fn prepare_for_aggregator_pre_compute(
fn prepare_for_aggregator_pre_compute<E: EthSpec>(
&self,
committee_period: u64,
current_slot: Slot,
@@ -127,7 +123,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
current_slot,
first_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots();
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::<E>();
let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec),
@@ -187,7 +183,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Get duties for all validators for the given `wall_clock_slot`.
///
/// This is the entry-point for the sync committee service.
pub fn get_duties_for_slot(
pub fn get_duties_for_slot<E: EthSpec>(
&self,
wall_clock_slot: Slot,
spec: &ChainSpec,
@@ -284,16 +280,16 @@ fn last_slot_of_period<E: EthSpec>(sync_committee_period: u64, spec: &ChainSpec)
first_slot_of_period::<E>(sync_committee_period + 1, spec) - 1
}
pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let sync_duties = &duties_service.sync_duties;
let spec = &duties_service.spec;
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// If the Altair fork is yet to be activated, do not attempt to poll for duties.
if spec
@@ -317,10 +313,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -342,11 +336,15 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
// Pre-compute aggregator selection proofs for the current period.
let (current_pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
current_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -379,18 +377,22 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
}
// Pre-compute aggregator selection proofs for the next period.
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots();
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots::<S::E>();
if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(E::slots_per_epoch())
.epoch(S::E::slots_per_epoch())
.sync_committee_period(spec)?
== next_sync_committee_period
{
let (pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
next_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -409,11 +411,11 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
Ok(())
}
pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
local_indices: &[u64],
sync_committee_period: u64,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let spec = &duties_service.spec;
// no local validators don't need to poll for sync committee
@@ -496,8 +498,8 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
Ok(())
}
pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
pre_compute_duties: &[(Slot, SyncDuty)],
sync_committee_period: u64,
current_slot: Slot,
@@ -519,7 +521,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
continue;
}
let subnet_ids = match duty.subnet_ids::<E>() {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
@@ -564,7 +566,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
}
};
match proof.is_aggregator::<E>() {
match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,