Merge branch 'unstable' into dvt

This commit is contained in:
chonghe
2025-05-09 11:06:44 +08:00
committed by GitHub
101 changed files with 3863 additions and 2412 deletions

View File

@@ -10,8 +10,6 @@ use crate::block_service::BlockServiceNotification;
use crate::sync::poll_sync_committee_duties;
use crate::sync::SyncDutiesMap;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use eth2::types::{
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
ProposerData, StateId, ValidatorId,
@@ -28,11 +26,12 @@ use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::{sync::mpsc::Sender, time::sleep};
use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};
use validator_metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
@@ -80,16 +79,16 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS
// The info in the enum variants is displayed in logging, but clippy thinks it's dead code.
#[derive(Debug)]
pub enum Error {
pub enum Error<T> {
UnableToReadSlotClock,
FailedToDownloadAttesters(#[allow(dead_code)] String),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError<T>),
InvalidModulo(#[allow(dead_code)] ArithError),
Arith(#[allow(dead_code)] ArithError),
SyncDutiesNotFound(#[allow(dead_code)] u64),
}
impl From<ArithError> for Error {
impl<T> From<ArithError> for Error<T> {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
@@ -125,9 +124,9 @@ pub struct SelectionProofConfig {
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
async fn make_selection_proof<S: ValidatorStore + 'static>(
duty: &AttesterData,
validator_store: &ValidatorStore<T, E>,
validator_store: &S,
spec: &ChainSpec,
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
config: &SelectionProofConfig,
@@ -253,25 +252,132 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
/// Step-by-step builder for [`DutiesService`].
///
/// `new` leaves every `Option` component as `None` and every flag as `false`. Each component
/// must be supplied through its setter before `build` is called; `build` returns an error
/// naming the first component that is still missing.
pub struct DutiesServiceBuilder<S, T> {
/// Provides the canonical list of locally-managed validators.
validator_store: Option<Arc<S>>,
/// Tracks the current slot.
slot_clock: Option<T>,
/// Provides HTTP access to remote beacon nodes.
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
/// The runtime for spawning tasks.
executor: Option<TaskExecutor>,
/// The current chain spec.
spec: Option<Arc<ChainSpec>>,
/// Whether we permit large validator counts in the metrics.
enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode.
distributed: bool,
/// Forwarded unchanged to `DutiesService::disable_attesting` by `build`.
/// NOTE(review): presumably switches off attestation duties; confirm against its consumers.
disable_attesting: bool,
}
impl<S, T> Default for DutiesServiceBuilder<S, T> {
fn default() -> Self {
Self::new()
}
}
impl<S, T> DutiesServiceBuilder<S, T> {
    /// Creates a builder with no components set and every flag switched off.
    pub fn new() -> Self {
        Self {
            enable_high_validator_count_metrics: false,
            distributed: false,
            disable_attesting: false,
            validator_store: None,
            slot_clock: None,
            beacon_nodes: None,
            executor: None,
            spec: None,
        }
    }

    /// Sets the validator store; required by `build`.
    pub fn validator_store(self, validator_store: Arc<S>) -> Self {
        Self {
            validator_store: Some(validator_store),
            ..self
        }
    }

    /// Sets the slot clock; required by `build`.
    pub fn slot_clock(self, slot_clock: T) -> Self {
        Self {
            slot_clock: Some(slot_clock),
            ..self
        }
    }

    /// Sets the beacon node fallback handle; required by `build`.
    pub fn beacon_nodes(self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
        Self {
            beacon_nodes: Some(beacon_nodes),
            ..self
        }
    }

    /// Sets the task executor; required by `build`.
    pub fn executor(self, executor: TaskExecutor) -> Self {
        Self {
            executor: Some(executor),
            ..self
        }
    }

    /// Sets the chain spec; required by `build`.
    pub fn spec(self, spec: Arc<ChainSpec>) -> Self {
        Self {
            spec: Some(spec),
            ..self
        }
    }

    /// Sets the flag copied into `DutiesService::enable_high_validator_count_metrics`.
    pub fn enable_high_validator_count_metrics(
        self,
        enable_high_validator_count_metrics: bool,
    ) -> Self {
        Self {
            enable_high_validator_count_metrics,
            ..self
        }
    }

    /// Sets the distributed-mode flag, which `build` also passes to `SyncDutiesMap::new`.
    pub fn distributed(self, distributed: bool) -> Self {
        Self {
            distributed,
            ..self
        }
    }

    /// Sets the flag copied into `DutiesService::disable_attesting`.
    pub fn disable_attesting(self, disable_attesting: bool) -> Self {
        Self {
            disable_attesting,
            ..self
        }
    }

    /// Consumes the builder and assembles the `DutiesService`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the first unset component, checked in the order
    /// `validator_store`, `slot_clock`, `beacon_nodes`, `executor`, `spec`.
    pub fn build(self) -> Result<DutiesService<S, T>, String> {
        let Self {
            validator_store,
            slot_clock,
            beacon_nodes,
            executor,
            spec,
            enable_high_validator_count_metrics,
            distributed,
            disable_attesting,
        } = self;

        // Constructed ahead of the presence checks, matching the original evaluation order.
        let sync_duties = SyncDutiesMap::new(distributed);

        let validator_store =
            validator_store.ok_or("Cannot build DutiesService without validator_store")?;
        let slot_clock = slot_clock.ok_or("Cannot build DutiesService without slot_clock")?;
        let beacon_nodes =
            beacon_nodes.ok_or("Cannot build DutiesService without beacon_nodes")?;
        let executor = executor.ok_or("Cannot build DutiesService without executor")?;
        let spec = spec.ok_or("Cannot build DutiesService without spec")?;

        Ok(DutiesService {
            attesters: Default::default(),
            proposers: Default::default(),
            sync_duties,
            validator_store,
            unknown_validator_next_poll_slots: Default::default(),
            slot_clock,
            beacon_nodes,
            executor,
            spec,
            enable_high_validator_count_metrics,
            distributed,
            disable_attesting,
        })
    }
}
/// See the module-level documentation.
pub struct DutiesService<T, E: EthSpec> {
pub struct DutiesService<S, T> {
/// Maps a validator public key to their duties for each epoch.
pub attesters: RwLock<AttesterMap>,
/// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain
/// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap<E>,
pub sync_duties: SyncDutiesMap,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
pub validator_store: Arc<S>,
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
pub unknown_validator_next_poll_slots: RwLock<HashMap<PublicKeyBytes, Slot>>,
/// Tracks the current slot.
pub slot_clock: T,
/// Provides HTTP access to remote beacon nodes.
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub beacon_nodes: Arc<BeaconNodeFallback<T>>,
/// The runtime for spawning tasks.
pub context: RuntimeContext<E>,
pub executor: TaskExecutor,
/// The current chain spec.
pub spec: Arc<ChainSpec>,
/// Whether we permit large validator counts in the metrics.
@@ -281,7 +387,7 @@ pub struct DutiesService<T, E: EthSpec> {
pub disable_attesting: bool,
}
impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
/// Returns the total number of validators known to the duties service.
pub fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators()
@@ -332,7 +438,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
@@ -357,7 +463,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// Returns all `ValidatorDuty` for the given `slot`.
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
@@ -395,15 +501,15 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// process every slot, which has the chance of creating a theoretically unlimited backlog of tasks.
/// It was a conscious decision to choose to drop tasks on an overloaded/latent system rather than
/// overload it even more.
pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
core_duties_service: Arc<DutiesService<T, E>>,
pub fn start_update_service<S: ValidatorStore + 'static, T: SlotClock + 'static>(
core_duties_service: Arc<DutiesService<S, T>>,
mut block_service_tx: Sender<BlockServiceNotification>,
) {
/*
* Spawn the task which updates the map of pubkey to validator index.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
// Run this poll before the wait, this should hopefully download all the indices
@@ -426,7 +532,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local block proposal duties.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -459,7 +565,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local attestation duties.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -484,7 +590,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
// Spawn the task which keeps track of local sync committee duties.
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Err(e) = poll_sync_committee_duties(&duties_service).await {
@@ -514,8 +620,8 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
/// validator indices.
async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
) {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
@@ -534,16 +640,14 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service
.validator_store
.initialized_validators()
.read()
.get_index(&pubkey)
.validator_index(&pubkey)
.is_some();
if !is_known {
let current_slot_opt = duties_service.slot_clock.now();
if let Some(current_slot) = current_slot_opt {
let is_first_slot_of_epoch = current_slot % E::slots_per_epoch() == 0;
let is_first_slot_of_epoch = current_slot % S::E::slots_per_epoch() == 0;
// Query an unknown validator later if it was queried within the last epoch, or if
// the current slot is the first slot of an epoch.
@@ -594,9 +698,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
);
duties_service
.validator_store
.initialized_validators()
.write()
.set_index(&pubkey, response.data.index);
.set_validator_index(&pubkey, response.data.index);
duties_service
.unknown_validator_next_poll_slots
@@ -607,7 +709,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// the beacon chain.
Ok(None) => {
if let Some(current_slot) = current_slot_opt {
let next_poll_slot = current_slot.saturating_add(E::slots_per_epoch());
let next_poll_slot = current_slot.saturating_add(S::E::slots_per_epoch());
duties_service
.unknown_validator_next_poll_slots
.write()
@@ -638,9 +740,9 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
/// 2. As above, but for the next-epoch.
/// 3. Push out any attestation subnet subscriptions to the BN.
/// 4. Prune old entries from `duties_service.attesters`.
async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let current_epoch_timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_ATTESTERS_CURRENT_EPOCH],
@@ -650,7 +752,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let next_epoch = current_epoch + 1;
// Collect *all* pubkeys, even those undergoing doppelganger protection.
@@ -664,10 +766,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -691,7 +791,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, current_epoch, current_slot);
drop(current_epoch_timer);
let next_epoch_timer = validator_metrics::start_timer_vec(
@@ -712,7 +812,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, next_epoch, current_slot);
drop(next_epoch_timer);
let subscriptions_timer = validator_metrics::start_timer_vec(
@@ -733,7 +833,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
/ S::E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
@@ -829,12 +929,12 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
/// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and
/// store them in `duties_service.attesters`.
async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
// No need to bother the BN if we don't have any validators.
if local_indices.is_empty() {
debug!(
@@ -978,8 +1078,7 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Spawn the background task to compute selection proofs.
let subservice = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
},
@@ -990,8 +1089,8 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
}
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
@@ -1007,8 +1106,8 @@ fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
.collect::<Vec<_>>()
}
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
current_slot: Slot,
) {
@@ -1023,14 +1122,14 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
let existing_epoch = existing_slot.epoch(S::E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
|| (duty_slot.epoch(S::E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
@@ -1048,11 +1147,11 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
}
}
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
) -> Result<DutiesResponse<Vec<AttesterData>>, Error<S::Error>> {
duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -1141,8 +1240,8 @@ fn process_duty_and_proof<E: EthSpec>(
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
duties: Vec<AttesterData>,
dependent_root: Hash256,
) {
@@ -1298,10 +1397,10 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
/// through the slow path every time. I.e., the proposal will only happen after we've been able to
/// download and process the duties from the BN. This means it is very important to ensure this
/// function is as fast as possible.
async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
block_service_tx: &mut Sender<BlockServiceNotification>,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_PROPOSERS],
@@ -1311,17 +1410,17 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// Notify the block proposal service for any proposals that we have in our cache.
//
// See the function-level documentation for more information.
let initial_block_proposers = duties_service.block_proposers(current_slot);
notify_block_production_service(
notify_block_production_service::<S>(
current_slot,
&initial_block_proposers,
block_service_tx,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
)
.await;
@@ -1403,11 +1502,11 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
//
// See the function-level documentation for more reasoning about this behaviour.
if !additional_block_producers.is_empty() {
notify_block_production_service(
notify_block_production_service::<S>(
current_slot,
&additional_block_producers,
block_service_tx,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
)
.await;
debug!(
@@ -1428,11 +1527,11 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
}
/// Notify the block service if it should produce a block.
async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
async fn notify_block_production_service<S: ValidatorStore>(
current_slot: Slot,
block_proposers: &HashSet<PublicKeyBytes>,
block_service_tx: &mut Sender<BlockServiceNotification>,
validator_store: &ValidatorStore<T, E>,
validator_store: &S,
) {
let non_doppelganger_proposers = block_proposers
.iter()