diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 522c6414ea..18433ff9a0 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -128,6 +128,7 @@ pub struct Timeouts { pub proposer_duties: Duration, pub sync_committee_contribution: Duration, pub sync_duties: Duration, + pub inclusion_list_duties: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, pub get_deposit_snapshot: Duration, @@ -145,6 +146,7 @@ impl Timeouts { proposer_duties: timeout, sync_committee_contribution: timeout, sync_duties: timeout, + inclusion_list_duties: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, get_deposit_snapshot: timeout, @@ -2494,6 +2496,29 @@ impl BeaconNodeHttpClient { .await } + /// `POST validator/duties/inclusion_list/{epoch}` + pub async fn post_validator_duties_inclusion_list( + &self, + epoch: Epoch, + indices: &[u64], + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("duties") + .push("inclusion_list") + .push(&epoch.to_string()); + + self.post_with_timeout_and_response( + path, + &ValidatorIndexDataRef(indices), + self.timeouts.inclusion_list_duties, + ) + .await + } + /// `POST v1/validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof_v1( &self, diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 2cc22357fb..b0dc651f27 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -66,6 +66,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; @@ -318,6 +319,8 @@ impl ProductionValidatorClient { sync_committee_contribution: slot_duration / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, + inclusion_list_duties: slot_duration + / HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: slot_duration / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT, get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, @@ -470,6 +473,7 @@ impl ProductionValidatorClient { attesters: <_>::default(), proposers: <_>::default(), sync_duties: SyncDutiesMap::new(config.distributed), + inclusion_list_duties: <_>::default(), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 187eb4feb5..f47a117fb5 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -13,7 +13,8 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use doppelganger_service::DoppelgangerStatus; use environment::RuntimeContext; use eth2::types::{ - AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, + AttesterData, BeaconCommitteeSubscription, DutiesResponse, InclusionListDutyData, ProposerData, + StateId, ValidatorId, }; use futures::{stream, StreamExt}; use parking_lot::RwLock; @@ -94,6 +95,7 @@ pub enum Error { InvalidModulo(#[allow(dead_code)] ArithError), Arith(#[allow(dead_code)] ArithError), SyncDutiesNotFound(#[allow(dead_code)] u64), + FailedToDownloadInclusionListDuties(#[allow(dead_code)] String), } impl From for Error { @@ -204,6 +206,8 @@ type DependentRoot = Hash256; type AttesterMap = HashMap>; type ProposerMap = HashMap)>; +type InclusionListDutiesMap = + HashMap>; /// See the module-level documentation. pub struct DutiesService { @@ -214,6 +218,8 @@ pub struct DutiesService { pub proposers: RwLock, /// Map from validator index to sync committee duties. pub sync_duties: SyncDutiesMap, + /// Maps a validator public key to their inclusion list committee duties for each epoch. + pub inclusion_list_duties: RwLock, /// Provides the canonical list of locally-managed validators. pub validator_store: Arc>, /// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again. @@ -462,6 +468,35 @@ pub fn start_update_service( }, "duties_service_sync_committee", ); + + /* + * Spawn the task which keeps track of local inclusion list duties. + */ + let duties_service = core_duties_service.clone(); + let log = core_duties_service.context.log().clone(); + core_duties_service.context.executor.spawn( + async move { + loop { + if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() { + sleep(duration).await; + } else { + // Just sleep for one slot if we are unable to read the system clock, this gives + // us an opportunity for the clock to eventually come good. + sleep(duties_service.slot_clock.slot_duration()).await; + continue; + } + + if let Err(e) = poll_beacon_inclusion_list_duties(&duties_service).await { + error!( + log, + "Failed to poll inclusion list duties"; + "error" => ?e + ); + } + } + }, + "duties_service_inclusion_list_committee", + ); } /// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown @@ -1196,6 +1231,253 @@ async fn fill_in_selection_proofs( } } +/// Query the beacon node for inclusion list duties for any known validators. +/// +/// This function will perform (in the following order): +/// +/// 1. Poll for current-epoch duties and update the local `duties_service.inclusion_list_duties` +/// map. +/// 2. As above, but for the next-epoch. +/// 3. Prune old entries from `duties_service.inclusion_list_duties`. +/// +/// NOTE: There are no subscriptions to manage, since the inclusion list topics are global. +async fn poll_beacon_inclusion_list_duties( + duties_service: &Arc>, +) -> Result<(), Error> { + // TODO: add timer metric for current epoch + + let log = duties_service.context.log(); + + let current_slot = duties_service + .slot_clock + .now() + .ok_or(Error::UnableToReadSlotClock)?; + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let next_epoch = current_epoch + 1; + + // Collect *all* pubkeys, even those undergoing doppelganger protection. + let local_pubkeys: HashSet<_> = duties_service + .validator_store + .voting_pubkeys(DoppelgangerStatus::ignored); + + let local_indices = { + let mut local_indices = Vec::with_capacity(local_pubkeys.len()); + + let vals_ref = duties_service.validator_store.initialized_validators(); + let vals = vals_ref.read(); + for &pubkey in &local_pubkeys { + if let Some(validator_index) = vals.get_index(&pubkey) { + local_indices.push(validator_index) + } + } + local_indices + }; + + // Download the duties and update the duties for the current epoch. + if let Err(e) = poll_beacon_inclusion_list_duties_for_epoch( + duties_service, + current_epoch, + &local_indices, + &local_pubkeys, + ) + .await + { + error!( + log, + "Failed to download inclusion list duties"; + "current_epoch" => current_epoch, + "request_epoch" => current_epoch, + "err" => ?e, + ) + } + + // TODO: validator duty metrics + + // TODO: add timer metric for next epoch + + // Download the duties and update the duties for the next epoch. + if let Err(e) = poll_beacon_inclusion_list_duties_for_epoch( + duties_service, + next_epoch, + &local_indices, + &local_pubkeys, + ) + .await + { + error!( + log, + "Failed to download inclusion list duties"; + "current_epoch" => current_epoch, + "request_epoch" => next_epoch, + "err" => ?e, + ) + } + + // TODO: validator duty metrics + + // Prune old duties. + duties_service + .inclusion_list_duties + .write() + .iter_mut() + .for_each(|(_, map)| { + map.retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch) + }); + + Ok(()) +} + +/// For the given `local_indices` and `local_pubkeys`, download the inclusion list duties for the +/// given epoch and store them in `duties_service.inclusion_list_duties`. +async fn poll_beacon_inclusion_list_duties_for_epoch( + duties_service: &Arc>, + epoch: Epoch, + local_indices: &[u64], + local_pubkeys: &HashSet, +) -> Result<(), Error> { + let log = duties_service.context.log(); + + // No need to bother the BN if we don't have any validators. + if local_indices.is_empty() { + debug!( + duties_service.context.log(), + "No validators, not downloading inclusion list duties"; + "epoch" => epoch, + ); + return Ok(()); + } + + // TODO: add fetch metric + + // Request duties for all uninitialized validators. If there isn't any, we will just request for + // `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to + // determine whether validator duties need to be updated. This is to ensure that we don't + // request for extra data unless necessary in order to save on network bandwidth. + let uninitialized_validators = + get_uninitialized_validators(duties_service, &epoch, local_pubkeys); + let initial_indices_to_request = if !uninitialized_validators.is_empty() { + uninitialized_validators.as_slice() + } else { + &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] + }; + + let response = + post_validator_duties_inclusion_list(duties_service, epoch, initial_indices_to_request) + .await?; + let dependent_root = response.dependent_root; + + // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. + let validators_to_update: Vec<_> = { + // Avoid holding the read-lock for any longer than required. + let inclusion_list_duties = duties_service.inclusion_list_duties.read(); + local_pubkeys + .iter() + .filter(|pubkey| { + inclusion_list_duties.get(pubkey).map_or(true, |duties| { + duties + .get(&epoch) + .map_or(true, |(prior, _)| *prior != dependent_root) + }) + }) + .collect::>() + }; + + if validators_to_update.is_empty() { + // No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch. + return Ok(()); + } + + // Make a request for all indices that require updating which we have not already made a request + // for. + let indices_to_request = validators_to_update + .iter() + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .filter(|validator_index| !initial_indices_to_request.contains(validator_index)) + .collect::>(); + + // Filter the initial duties by their relevance so that we don't hit the warning below about + // overwriting duties. There was previously a bug here. + let new_initial_duties = response + .data + .into_iter() + .filter(|duty| validators_to_update.contains(&&duty.pubkey)); + + let mut new_duties = if !indices_to_request.is_empty() { + post_validator_duties_inclusion_list(duties_service, epoch, indices_to_request.as_slice()) + .await? + .data + } else { + vec![] + }; + new_duties.extend(new_initial_duties); + + // TODO: add store metric + + debug!( + log, + "Downloaded inclusion list duties"; + "dependent_root" => %dependent_root, + "num_new_duties" => new_duties.len(), + ); + + // Update the duties service with the new `InclusionListDutyData` messages. + let mut inclusion_list_duties = duties_service.inclusion_list_duties.write(); + let current_slot = duties_service + .slot_clock + .now_or_genesis() + .unwrap_or_default(); + for duty in new_duties { + let inclusion_list_duty_map = inclusion_list_duties.entry(duty.pubkey).or_default(); + + match inclusion_list_duty_map.entry(epoch) { + hash_map::Entry::Occupied(mut occupied) => { + let mut_value = occupied.get_mut(); + let (prior_dependent_root, prior_duty) = &mut_value; + + // NOTE: We do not need to worry about an overwrite here, since there is no + // information that we store aside from the duty itself. There is no selection proof + // to compute, and there are no subscriptions to keep track of. + + // NOTE: We could use a flag here to avoid redundant logs. + if dependent_root != *prior_dependent_root { + warn!( + log, + "Inclusion list duties re-org"; + "prior_dependent_root" => %prior_dependent_root, + "dependent_root" => %dependent_root, + "note" => "this may happen from time to time" + ) + } + *mut_value = (dependent_root, duty); + } + hash_map::Entry::Vacant(vacant) => { + vacant.insert((dependent_root, duty)); + } + } + } + drop(inclusion_list_duties); + + Ok(()) +} + +/// Retrieve inclusion list committee duties for validators corresponding to `validator_indices`. +async fn post_validator_duties_inclusion_list( + duties_service: &Arc>, + epoch: Epoch, + validator_indices: &[u64], +) -> Result>, Error> { + duties_service + .beacon_nodes + .first_success(|beacon_node| async move { + // TODO: add metric + beacon_node + .post_validator_duties_inclusion_list(epoch, validator_indices) + .await + }) + .await + .map_err(|e| Error::FailedToDownloadInclusionListDuties(e.to_string())) +} + /// Download the proposer duties for the current epoch and store them in `duties_service.proposers`. /// If there are any proposer for this slot, send out a notification to the block proposers. ///