add inclusion list duties mgmt to VC duties service

This commit is contained in:
jacobkaufmann
2024-12-12 18:54:07 -07:00
parent 2b3c602b8f
commit 7125f25f3a
3 changed files with 312 additions and 1 deletions

View File

@@ -13,7 +13,8 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
AttesterData, BeaconCommitteeSubscription, DutiesResponse, InclusionListDutyData, ProposerData,
StateId, ValidatorId,
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
@@ -94,6 +95,7 @@ pub enum Error {
InvalidModulo(#[allow(dead_code)] ArithError),
Arith(#[allow(dead_code)] ArithError),
SyncDutiesNotFound(#[allow(dead_code)] u64),
FailedToDownloadInclusionListDuties(#[allow(dead_code)] String),
}
impl From<ArithError> for Error {
@@ -204,6 +206,8 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
type InclusionListDutiesMap =
HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, InclusionListDutyData)>>;
/// See the module-level documentation.
pub struct DutiesService<T, E: EthSpec> {
@@ -214,6 +218,8 @@ pub struct DutiesService<T, E: EthSpec> {
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap<E>,
/// Maps a validator public key to their inclusion list committee duties for each epoch.
pub inclusion_list_duties: RwLock<InclusionListDutiesMap>,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
@@ -462,6 +468,35 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
},
"duties_service_sync_committee",
);
/*
* Spawn the task which keeps track of local inclusion list duties.
*/
let duties_service = core_duties_service.clone();
let log = core_duties_service.context.log().clone();
core_duties_service.context.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
sleep(duration).await;
} else {
// Just sleep for one slot if we are unable to read the system clock, this gives
// us an opportunity for the clock to eventually come good.
sleep(duties_service.slot_clock.slot_duration()).await;
continue;
}
if let Err(e) = poll_beacon_inclusion_list_duties(&duties_service).await {
error!(
log,
"Failed to poll inclusion list duties";
"error" => ?e
);
}
}
},
"duties_service_inclusion_list_committee",
);
}
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
@@ -1196,6 +1231,253 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
}
}
/// Query the beacon node for inclusion list duties for any known validators.
///
/// This function will perform (in the following order):
///
/// 1. Poll for current-epoch duties and update the local `duties_service.inclusion_list_duties`
/// map.
/// 2. As above, but for the next-epoch.
/// 3. Prune old entries from `duties_service.inclusion_list_duties`.
///
/// NOTE: There are no subscriptions to manage, since the inclusion list topics are global.
async fn poll_beacon_inclusion_list_duties<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
// TODO: add timer metric for current epoch
let log = duties_service.context.log();
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let next_epoch = current_epoch + 1;
// Collect *all* pubkeys, even those undergoing doppelganger protection.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
local_indices.push(validator_index)
}
}
local_indices
};
// Download the duties and update the duties for the current epoch.
if let Err(e) = poll_beacon_inclusion_list_duties_for_epoch(
duties_service,
current_epoch,
&local_indices,
&local_pubkeys,
)
.await
{
error!(
log,
"Failed to download inclusion list duties";
"current_epoch" => current_epoch,
"request_epoch" => current_epoch,
"err" => ?e,
)
}
// TODO: validator duty metrics
// TODO: add timer metric for next epoch
// Download the duties and update the duties for the next epoch.
if let Err(e) = poll_beacon_inclusion_list_duties_for_epoch(
duties_service,
next_epoch,
&local_indices,
&local_pubkeys,
)
.await
{
error!(
log,
"Failed to download inclusion list duties";
"current_epoch" => current_epoch,
"request_epoch" => next_epoch,
"err" => ?e,
)
}
// TODO: validator duty metrics
// Prune old duties.
duties_service
.inclusion_list_duties
.write()
.iter_mut()
.for_each(|(_, map)| {
map.retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch)
});
Ok(())
}
/// For the given `local_indices` and `local_pubkeys`, download the inclusion list duties for the
/// given epoch and store them in `duties_service.inclusion_list_duties`.
async fn poll_beacon_inclusion_list_duties_for_epoch<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Result<(), Error> {
let log = duties_service.context.log();
// No need to bother the BN if we don't have any validators.
if local_indices.is_empty() {
debug!(
duties_service.context.log(),
"No validators, not downloading inclusion list duties";
"epoch" => epoch,
);
return Ok(());
}
// TODO: add fetch metric
// Request duties for all uninitialized validators. If there isn't any, we will just request for
// `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to
// determine whether validator duties need to be updated. This is to ensure that we don't
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
};
let response =
post_validator_duties_inclusion_list(duties_service, epoch, initial_indices_to_request)
.await?;
let dependent_root = response.dependent_root;
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
let validators_to_update: Vec<_> = {
// Avoid holding the read-lock for any longer than required.
let inclusion_list_duties = duties_service.inclusion_list_duties.read();
local_pubkeys
.iter()
.filter(|pubkey| {
inclusion_list_duties.get(pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
})
.collect::<Vec<_>>()
};
if validators_to_update.is_empty() {
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
return Ok(());
}
// Make a request for all indices that require updating which we have not already made a request
// for.
let indices_to_request = validators_to_update
.iter()
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>();
// Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
let new_initial_duties = response
.data
.into_iter()
.filter(|duty| validators_to_update.contains(&&duty.pubkey));
let mut new_duties = if !indices_to_request.is_empty() {
post_validator_duties_inclusion_list(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
} else {
vec![]
};
new_duties.extend(new_initial_duties);
// TODO: add store metric
debug!(
log,
"Downloaded inclusion list duties";
"dependent_root" => %dependent_root,
"num_new_duties" => new_duties.len(),
);
// Update the duties service with the new `InclusionListDutyData` messages.
let mut inclusion_list_duties = duties_service.inclusion_list_duties.write();
let current_slot = duties_service
.slot_clock
.now_or_genesis()
.unwrap_or_default();
for duty in new_duties {
let inclusion_list_duty_map = inclusion_list_duties.entry(duty.pubkey).or_default();
match inclusion_list_duty_map.entry(epoch) {
hash_map::Entry::Occupied(mut occupied) => {
let mut_value = occupied.get_mut();
let (prior_dependent_root, prior_duty) = &mut_value;
// NOTE: We do not need to worry about an overwrite here, since there is no
// information that we store aside from the duty itself. There is no selection proof
// to compute, and there are no subscriptions to keep track of.
// NOTE: We could use a flag here to avoid redundant logs.
if dependent_root != *prior_dependent_root {
warn!(
log,
"Inclusion list duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"note" => "this may happen from time to time"
)
}
*mut_value = (dependent_root, duty);
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert((dependent_root, duty));
}
}
}
drop(inclusion_list_duties);
Ok(())
}
/// Retrieve inclusion list committee duties for validators corresponding to `validator_indices`.
async fn post_validator_duties_inclusion_list<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<InclusionListDutyData>>, Error> {
duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
// TODO: add metric
beacon_node
.post_validator_duties_inclusion_list(epoch, validator_indices)
.await
})
.await
.map_err(|e| Error::FailedToDownloadInclusionListDuties(e.to_string()))
}
/// Download the proposer duties for the current epoch and store them in `duties_service.proposers`.
/// If there are any proposer for this slot, send out a notification to the block proposers.
///