mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-18 22:49:34 +00:00
Merge branch 'unstable' into gloas-attestation-index
This commit is contained in:
@@ -102,10 +102,8 @@ pub fn import<T: SlotClock + 'static, E: EthSpec>(
|
||||
// Import each keystore. Some keystores may fail to be imported, so we record a status for each.
|
||||
let mut statuses = Vec::with_capacity(request.keystores.len());
|
||||
|
||||
for (KeystoreJsonStr(keystore), password) in request
|
||||
.keystores
|
||||
.into_iter()
|
||||
.zip(request.passwords.into_iter())
|
||||
for (KeystoreJsonStr(keystore), password) in
|
||||
request.keystores.into_iter().zip(request.passwords)
|
||||
{
|
||||
let pubkey_str = keystore.pubkey().to_string();
|
||||
|
||||
|
||||
@@ -197,6 +197,16 @@ pub fn gather_prometheus_metrics<E: EthSpec>(
|
||||
&[NEXT_EPOCH],
|
||||
duties_service.attester_count(next_epoch) as i64,
|
||||
);
|
||||
set_int_gauge(
|
||||
&PTC_COUNT,
|
||||
&[CURRENT_EPOCH],
|
||||
duties_service.ptc_count(current_epoch) as i64,
|
||||
);
|
||||
set_int_gauge(
|
||||
&PTC_COUNT,
|
||||
&[NEXT_EPOCH],
|
||||
duties_service.ptc_count(next_epoch) as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use account_utils::write_file_via_temporary;
|
||||
use bls::{Keypair, PublicKey};
|
||||
use eth2_keystore::json_keystore::{
|
||||
Aes128Ctr, ChecksumModule, Cipher, CipherModule, Crypto, EmptyMap, EmptyString, KdfModule,
|
||||
Aes128Ctr, ChecksumModule, Cipher, CipherModule, Crypto, EmptyMap, EmptyString, Kdf, KdfModule,
|
||||
Sha256Checksum,
|
||||
};
|
||||
use eth2_keystore::{
|
||||
@@ -65,10 +65,14 @@ impl KeyCache {
|
||||
}
|
||||
|
||||
pub fn init_crypto() -> Crypto {
|
||||
Self::build_crypto(default_kdf)
|
||||
}
|
||||
|
||||
fn build_crypto(kdf_fn: fn(Vec<u8>) -> Kdf) -> Crypto {
|
||||
let salt = rand::rng().random::<[u8; SALT_SIZE]>();
|
||||
let iv = rand::rng().random::<[u8; IV_SIZE]>().to_vec().into();
|
||||
|
||||
let kdf = default_kdf(salt.to_vec());
|
||||
let kdf = kdf_fn(salt.to_vec());
|
||||
let cipher = Cipher::Aes128Ctr(Aes128Ctr { iv });
|
||||
|
||||
Crypto {
|
||||
@@ -116,7 +120,11 @@ impl KeyCache {
|
||||
}
|
||||
|
||||
fn encrypt(&mut self) -> Result<(), Error> {
|
||||
self.crypto = Self::init_crypto();
|
||||
self.encrypt_with(default_kdf)
|
||||
}
|
||||
|
||||
fn encrypt_with(&mut self, kdf_fn: fn(Vec<u8>) -> Kdf) -> Result<(), Error> {
|
||||
self.crypto = Self::build_crypto(kdf_fn);
|
||||
let secret_map: SerializedKeyMap = self
|
||||
.pairs
|
||||
.iter()
|
||||
@@ -268,7 +276,19 @@ pub enum Error {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use eth2_keystore::json_keystore::HexBytes;
|
||||
use eth2_keystore::json_keystore::{HexBytes, Scrypt};
|
||||
|
||||
/// Scrypt with minimal cost (n=1024) for fast test execution.
|
||||
/// Production uses n=262144 which takes ~45s per derivation.
|
||||
fn insecure_kdf(salt: Vec<u8>) -> Kdf {
|
||||
Kdf::Scrypt(Scrypt {
|
||||
dklen: 32,
|
||||
n: 1024,
|
||||
p: 1,
|
||||
r: 8,
|
||||
salt: salt.into(),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_serialization() {
|
||||
@@ -302,7 +322,7 @@ mod tests {
|
||||
key_cache.add(keypair.clone(), uuid, password.clone());
|
||||
}
|
||||
|
||||
key_cache.encrypt().unwrap();
|
||||
key_cache.encrypt_with(insecure_kdf).unwrap();
|
||||
key_cache.state = State::DecryptedAndSaved;
|
||||
|
||||
assert_eq!(&key_cache.uuids, &uuids);
|
||||
|
||||
@@ -1030,7 +1030,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
|
||||
|
||||
// Collect successfully signed attestations and log errors.
|
||||
let mut signed_attestations = Vec::with_capacity(attestations.len());
|
||||
for (result, att) in results.into_iter().zip(attestations.into_iter()) {
|
||||
for (result, att) in results.into_iter().zip(attestations) {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
signed_attestations.push((
|
||||
@@ -1432,7 +1432,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
|
||||
) -> Result<SignedExecutionPayloadEnvelope<E>, Error> {
|
||||
let signing_context = self.signing_context(
|
||||
Domain::BeaconBuilder,
|
||||
envelope.slot.epoch(E::slots_per_epoch()),
|
||||
envelope.slot().epoch(E::slots_per_epoch()),
|
||||
);
|
||||
|
||||
// Execution payload envelope signing is not slashable, bypass doppelganger protection.
|
||||
|
||||
@@ -187,6 +187,9 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
||||
info!(new_validators, "Completed validator discovery");
|
||||
}
|
||||
|
||||
// Check for all validators' fee recipient
|
||||
validator_defs.check_all_fee_recipients(config.validator_store.fee_recipient)?;
|
||||
|
||||
let validators = InitializedValidators::from_definitions(
|
||||
validator_defs,
|
||||
config.validator_dir.clone(),
|
||||
|
||||
@@ -22,7 +22,12 @@ pub const UPDATE_ATTESTERS_CURRENT_EPOCH: &str = "update_attesters_current_epoch
|
||||
pub const UPDATE_ATTESTERS_NEXT_EPOCH: &str = "update_attesters_next_epoch";
|
||||
pub const UPDATE_ATTESTERS_FETCH: &str = "update_attesters_fetch";
|
||||
pub const UPDATE_ATTESTERS_STORE: &str = "update_attesters_store";
|
||||
pub const UPDATE_PTC_CURRENT_EPOCH: &str = "update_ptc_current_epoch";
|
||||
pub const UPDATE_PTC_NEXT_EPOCH: &str = "update_ptc_next_epoch";
|
||||
pub const UPDATE_PTC_FETCH: &str = "update_ptc_fetch";
|
||||
pub const UPDATE_PTC_STORE: &str = "update_ptc_store";
|
||||
pub const ATTESTER_DUTIES_HTTP_POST: &str = "attester_duties_http_post";
|
||||
pub const PTC_DUTIES_HTTP_POST: &str = "ptc_duties_http_post";
|
||||
pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
|
||||
pub const VALIDATOR_DUTIES_SYNC_HTTP_POST: &str = "validator_duties_sync_http_post";
|
||||
pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
|
||||
@@ -162,6 +167,13 @@ pub static ATTESTER_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
&["task"],
|
||||
)
|
||||
});
|
||||
pub static PTC_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
|
||||
try_create_int_gauge_vec(
|
||||
"vc_beacon_ptc_count",
|
||||
"Number of PTC (Payload Timeliness Committee) validators on this host",
|
||||
&["task"],
|
||||
)
|
||||
});
|
||||
pub static PROPOSAL_CHANGED: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
|
||||
try_create_int_counter(
|
||||
"vc_beacon_block_proposal_changed",
|
||||
|
||||
@@ -439,7 +439,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "handle_aggregates",
|
||||
name = "lh_handle_aggregates",
|
||||
skip_all,
|
||||
fields(%slot, %committee_index)
|
||||
)]
|
||||
@@ -494,7 +494,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
|
||||
///
|
||||
/// The given `validator_duties` should already be filtered to only contain those that match
|
||||
/// `slot`. Critical errors will be logged if this is not the case.
|
||||
#[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))]
|
||||
#[instrument(name = "lh_sign_and_publish_attestations", skip_all, fields(%slot, %attestation_data.beacon_block_root))]
|
||||
async fn sign_and_publish_attestations(
|
||||
&self,
|
||||
slot: Slot,
|
||||
|
||||
@@ -404,7 +404,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "block_proposal_duty_cycle",
|
||||
name = "lh_block_proposal_duty_cycle",
|
||||
skip_all,
|
||||
fields(%slot, ?validator_pubkey)
|
||||
)]
|
||||
|
||||
@@ -13,7 +13,7 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
|
||||
use bls::PublicKeyBytes;
|
||||
use eth2::types::{
|
||||
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
|
||||
ProposerData, StateId, ValidatorId,
|
||||
ProposerData, PtcDuty, StateId, ValidatorId,
|
||||
};
|
||||
use futures::{
|
||||
StreamExt,
|
||||
@@ -46,6 +46,7 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
|
||||
/// The initial request is used to determine if further requests are required, so that it
|
||||
/// reduces the amount of data that needs to be transferred.
|
||||
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
|
||||
const INITIAL_PTC_DUTIES_QUERY_SIZE: usize = 1;
|
||||
|
||||
/// Offsets from the attestation duty slot at which a subscription should be sent.
|
||||
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];
|
||||
@@ -83,6 +84,7 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS
|
||||
pub enum Error<T> {
|
||||
UnableToReadSlotClock,
|
||||
FailedToDownloadAttesters(#[allow(dead_code)] String),
|
||||
FailedToDownloadPtc(#[allow(dead_code)] String),
|
||||
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError<T>),
|
||||
InvalidModulo(#[allow(dead_code)] ArithError),
|
||||
Arith(#[allow(dead_code)] ArithError),
|
||||
@@ -283,6 +285,7 @@ type DependentRoot = Hash256;
|
||||
|
||||
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
|
||||
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
|
||||
type PtcMap = HashMap<Epoch, (DependentRoot, Vec<PtcDuty>)>;
|
||||
|
||||
pub struct DutiesServiceBuilder<S, T> {
|
||||
/// Provides the canonical list of locally-managed validators.
|
||||
@@ -384,6 +387,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
|
||||
attesters: Default::default(),
|
||||
proposers: Default::default(),
|
||||
sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config),
|
||||
ptc_duties: Default::default(),
|
||||
validator_store: self
|
||||
.validator_store
|
||||
.ok_or("Cannot build DutiesService without validator_store")?,
|
||||
@@ -414,6 +418,8 @@ pub struct DutiesService<S, T> {
|
||||
pub proposers: RwLock<ProposerMap>,
|
||||
/// Map from validator index to sync committee duties.
|
||||
pub sync_duties: SyncDutiesMap,
|
||||
/// Maps an epoch to PTC duties for locally-managed validators.
|
||||
pub ptc_duties: RwLock<PtcMap>,
|
||||
/// Provides the canonical list of locally-managed validators.
|
||||
pub validator_store: Arc<S>,
|
||||
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
|
||||
@@ -465,13 +471,22 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
|
||||
.voting_pubkeys(DoppelgangerStatus::only_safe);
|
||||
self.attesters
|
||||
.read()
|
||||
.iter()
|
||||
.filter_map(|(_, map)| map.get(&epoch))
|
||||
.values()
|
||||
.filter_map(|map| map.get(&epoch))
|
||||
.map(|(_, duty_and_proof)| duty_and_proof)
|
||||
.filter(|duty_and_proof| signing_pubkeys.contains(&duty_and_proof.duty.pubkey))
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Returns the total number of validators that have PTC duties in the given epoch.
|
||||
pub fn ptc_count(&self, epoch: Epoch) -> usize {
|
||||
self.ptc_duties
|
||||
.read()
|
||||
.get(&epoch)
|
||||
.map(|(_, duties)| duties.len())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns the total number of validators that are in a doppelganger detection period.
|
||||
pub fn doppelganger_detecting_count(&self) -> usize {
|
||||
self.validator_store
|
||||
@@ -518,8 +533,8 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
|
||||
|
||||
self.attesters
|
||||
.read()
|
||||
.iter()
|
||||
.filter_map(|(_, map)| map.get(&epoch))
|
||||
.values()
|
||||
.filter_map(|map| map.get(&epoch))
|
||||
.map(|(_, duty_and_proof)| duty_and_proof)
|
||||
.filter(|duty_and_proof| {
|
||||
duty_and_proof.duty.slot == slot
|
||||
@@ -534,6 +549,25 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
|
||||
self.enable_high_validator_count_metrics
|
||||
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
|
||||
}
|
||||
|
||||
/// Get PTC duties for a specific slot.
|
||||
///
|
||||
/// Returns duties for local validators who have PTC assignments at the given slot.
|
||||
pub fn get_ptc_duties_for_slot(&self, slot: Slot) -> Vec<PtcDuty> {
|
||||
let epoch = slot.epoch(S::E::slots_per_epoch());
|
||||
|
||||
self.ptc_duties
|
||||
.read()
|
||||
.get(&epoch)
|
||||
.map(|(_, ptc_duties)| {
|
||||
ptc_duties
|
||||
.iter()
|
||||
.filter(|ptc_duty| ptc_duty.slot == slot)
|
||||
.cloned()
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the service that periodically polls the beacon node for validator duties. This will start
|
||||
@@ -662,6 +696,61 @@ pub fn start_update_service<S: ValidatorStore + 'static, T: SlotClock + 'static>
|
||||
},
|
||||
"duties_service_sync_committee",
|
||||
);
|
||||
|
||||
// Spawn the task which keeps track of local PTC duties.
|
||||
// Only start PTC duties service if Gloas fork is scheduled.
|
||||
if core_duties_service.spec.is_gloas_scheduled() {
|
||||
let duties_service = core_duties_service.clone();
|
||||
core_duties_service.executor.spawn(
|
||||
async move {
|
||||
loop {
|
||||
// Check if we've reached the Gloas fork epoch before polling
|
||||
let Some(current_slot) = duties_service.slot_clock.now() else {
|
||||
// Unable to read slot clock, sleep and try again
|
||||
sleep(duties_service.slot_clock.slot_duration()).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
|
||||
let Some(gloas_fork_epoch) = duties_service.spec.gloas_fork_epoch else {
|
||||
// Gloas fork epoch not configured, should not reach here
|
||||
break;
|
||||
};
|
||||
|
||||
if current_epoch + 1 < gloas_fork_epoch {
|
||||
// Wait until the next slot and check again
|
||||
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
|
||||
sleep(duration).await;
|
||||
} else {
|
||||
sleep(duties_service.slot_clock.slot_duration()).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = poll_beacon_ptc_attesters(&duties_service).await {
|
||||
error!(
|
||||
error = ?e,
|
||||
"Failed to poll PTC duties"
|
||||
);
|
||||
}
|
||||
|
||||
// Wait until the next slot before polling again.
|
||||
// This doesn't mean that the beacon node will get polled every slot
|
||||
// as the PTC duties service will return early if it deems it already has
|
||||
// enough information.
|
||||
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
|
||||
sleep(duration).await;
|
||||
} else {
|
||||
// Just sleep for one slot if we are unable to read the system clock, this gives
|
||||
// us an opportunity for the clock to eventually come good.
|
||||
sleep(duties_service.slot_clock.slot_duration()).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
},
|
||||
"duties_service_ptc",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
|
||||
@@ -894,8 +983,8 @@ async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'stat
|
||||
duties_service
|
||||
.attesters
|
||||
.read()
|
||||
.iter()
|
||||
.filter_map(|(_, map)| map.get(epoch))
|
||||
.values()
|
||||
.filter_map(|map| map.get(epoch))
|
||||
.filter(|(_, duty_and_proof)| {
|
||||
duty_and_proof
|
||||
.subscription_slots
|
||||
@@ -1282,6 +1371,26 @@ fn process_duty_and_proof<S: ValidatorStore>(
|
||||
}
|
||||
}
|
||||
|
||||
async fn post_validator_duties_ptc<S: ValidatorStore, T: SlotClock + 'static>(
|
||||
duties_service: &Arc<DutiesService<S, T>>,
|
||||
epoch: Epoch,
|
||||
validator_indices: &[u64],
|
||||
) -> Result<DutiesResponse<Vec<PtcDuty>>, Error<S::Error>> {
|
||||
duties_service
|
||||
.beacon_nodes
|
||||
.first_success(|beacon_node| async move {
|
||||
let _timer = validator_metrics::start_timer_vec(
|
||||
&validator_metrics::DUTIES_SERVICE_TIMES,
|
||||
&[validator_metrics::PTC_DUTIES_HTTP_POST],
|
||||
);
|
||||
beacon_node
|
||||
.post_validator_duties_ptc(epoch, validator_indices)
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::FailedToDownloadPtc(e.to_string()))
|
||||
}
|
||||
|
||||
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
||||
///
|
||||
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
||||
@@ -1641,6 +1750,209 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Query the beacon node for ptc duties for any known validators.
|
||||
async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
|
||||
duties_service: &Arc<DutiesService<S, T>>,
|
||||
) -> Result<(), Error<S::Error>> {
|
||||
let current_epoch_timer = validator_metrics::start_timer_vec(
|
||||
&validator_metrics::DUTIES_SERVICE_TIMES,
|
||||
&[validator_metrics::UPDATE_PTC_CURRENT_EPOCH],
|
||||
);
|
||||
|
||||
let current_slot = duties_service
|
||||
.slot_clock
|
||||
.now()
|
||||
.ok_or(Error::UnableToReadSlotClock)?;
|
||||
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
|
||||
|
||||
// Collect *all* pubkeys, even those undergoing doppelganger protection.
|
||||
let local_pubkeys: HashSet<_> = duties_service
|
||||
.validator_store
|
||||
.voting_pubkeys(DoppelgangerStatus::ignored);
|
||||
|
||||
let local_indices = {
|
||||
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
|
||||
|
||||
for &pubkey in &local_pubkeys {
|
||||
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
|
||||
local_indices.push(validator_index)
|
||||
}
|
||||
}
|
||||
local_indices
|
||||
};
|
||||
|
||||
// Poll for current epoch
|
||||
if let Err(e) = poll_beacon_ptc_attesters_for_epoch(
|
||||
duties_service,
|
||||
current_epoch,
|
||||
&local_indices,
|
||||
&local_pubkeys,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
%current_epoch,
|
||||
request_epoch = %current_epoch,
|
||||
err = ?e,
|
||||
"Failed to download PTC duties"
|
||||
);
|
||||
}
|
||||
drop(current_epoch_timer);
|
||||
let next_epoch_timer = validator_metrics::start_timer_vec(
|
||||
&validator_metrics::DUTIES_SERVICE_TIMES,
|
||||
&[validator_metrics::UPDATE_PTC_NEXT_EPOCH],
|
||||
);
|
||||
|
||||
// Poll for next epoch
|
||||
let next_epoch = current_epoch + 1;
|
||||
if let Err(e) = poll_beacon_ptc_attesters_for_epoch(
|
||||
duties_service,
|
||||
next_epoch,
|
||||
&local_indices,
|
||||
&local_pubkeys,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
%current_epoch,
|
||||
request_epoch = %next_epoch,
|
||||
err = ?e,
|
||||
"Failed to download PTC duties"
|
||||
);
|
||||
}
|
||||
drop(next_epoch_timer);
|
||||
|
||||
// Prune old duties.
|
||||
duties_service
|
||||
.ptc_duties
|
||||
.write()
|
||||
.retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For the given `local_indices` and `local_pubkeys`, download the PTC duties for the given `epoch` and
|
||||
/// store them in `duties_service.ptc_duties` using bandwidth optimization.
|
||||
async fn poll_beacon_ptc_attesters_for_epoch<
|
||||
S: ValidatorStore + 'static,
|
||||
T: SlotClock + 'static,
|
||||
>(
|
||||
duties_service: &Arc<DutiesService<S, T>>,
|
||||
epoch: Epoch,
|
||||
local_indices: &[u64],
|
||||
local_pubkeys: &HashSet<PublicKeyBytes>,
|
||||
) -> Result<(), Error<S::Error>> {
|
||||
// No need to bother the BN if we don't have any validators.
|
||||
if local_indices.is_empty() {
|
||||
debug!(
|
||||
%epoch,
|
||||
"No validators, not downloading PTC duties"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let fetch_timer = validator_metrics::start_timer_vec(
|
||||
&validator_metrics::DUTIES_SERVICE_TIMES,
|
||||
&[validator_metrics::UPDATE_PTC_FETCH],
|
||||
);
|
||||
|
||||
// TODO(gloas) Unlike attester duties which use `get_uninitialized_validators` to detect
|
||||
// newly-added validators, PTC duties only check dependent_root changes. Validators added
|
||||
// mid-epoch won't get PTC duties until the next epoch boundary. We should probably fix this.
|
||||
let initial_indices_to_request =
|
||||
&local_indices[0..min(INITIAL_PTC_DUTIES_QUERY_SIZE, local_indices.len())];
|
||||
|
||||
let response =
|
||||
post_validator_duties_ptc(duties_service, epoch, initial_indices_to_request).await?;
|
||||
let dependent_root = response.dependent_root;
|
||||
|
||||
// Check if we need to update duties for this epoch and collect validators to update.
|
||||
// We update if we have no epoch data OR if the dependent_root changed.
|
||||
let validators_to_update = {
|
||||
// Avoid holding the read-lock for any longer than required.
|
||||
let ptc_duties = duties_service.ptc_duties.read();
|
||||
let needs_update = ptc_duties.get(&epoch).is_none_or(|(prior_root, _duties)| {
|
||||
// Update if dependent_root changed
|
||||
*prior_root != dependent_root
|
||||
});
|
||||
|
||||
if needs_update {
|
||||
local_pubkeys.iter().collect::<Vec<_>>()
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
if validators_to_update.is_empty() {
|
||||
// No validators have conflicting (epoch, dependent_root) values for this epoch.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Make a request for all indices that require updating which we have not already made a request for.
|
||||
let indices_to_request = validators_to_update
|
||||
.iter()
|
||||
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
|
||||
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Filter the initial duties by their relevance so that we don't hit warnings about
|
||||
// overwriting duties.
|
||||
let new_initial_duties = response
|
||||
.data
|
||||
.into_iter()
|
||||
.filter(|duty| validators_to_update.contains(&&duty.pubkey));
|
||||
|
||||
let mut new_duties = if !indices_to_request.is_empty() {
|
||||
post_validator_duties_ptc(duties_service, epoch, indices_to_request.as_slice())
|
||||
.await?
|
||||
.data
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
new_duties.extend(new_initial_duties);
|
||||
|
||||
drop(fetch_timer);
|
||||
|
||||
let _store_timer = validator_metrics::start_timer_vec(
|
||||
&validator_metrics::DUTIES_SERVICE_TIMES,
|
||||
&[validator_metrics::UPDATE_PTC_STORE],
|
||||
);
|
||||
|
||||
debug!(
|
||||
%dependent_root,
|
||||
num_new_duties = new_duties.len(),
|
||||
"Downloaded PTC duties"
|
||||
);
|
||||
|
||||
// Update duties - we only reach here if dependent_root changed or epoch is missing
|
||||
let mut ptc_duties = duties_service.ptc_duties.write();
|
||||
|
||||
match ptc_duties.entry(epoch) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
// Dependent root must have changed, so we do complete replacement.
|
||||
// We cannot support partial updates for the same dependent_root.
|
||||
// The beacon node may return incomplete duty lists and we cannot distinguish between "no duties" and
|
||||
// "duties not included in this response". We could query all local validators in each
|
||||
// `post_validator_duties_ptc` call regardless of dependent_root changes, but the bandwidth
|
||||
// cost is likely not justified since PTC assignments are sparse.
|
||||
let (existing_root, _existing_duties) = entry.get();
|
||||
debug!(
|
||||
old_root = %existing_root,
|
||||
new_root = %dependent_root,
|
||||
"PTC dependent root changed, replacing all duties"
|
||||
);
|
||||
|
||||
*entry.get_mut() = (dependent_root, new_duties);
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
// No existing duties for this epoch
|
||||
entry.insert((dependent_root, new_duties));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notify the block service if it should produce a block.
|
||||
async fn notify_block_production_service<S: ValidatorStore>(
|
||||
current_slot: Slot,
|
||||
|
||||
@@ -109,6 +109,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
|
||||
let total_validators = duties_service.total_validator_count();
|
||||
let proposing_validators = duties_service.proposer_count(epoch);
|
||||
let attesting_validators = duties_service.attester_count(epoch);
|
||||
let ptc_validators = duties_service.ptc_count(epoch);
|
||||
let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count();
|
||||
|
||||
if doppelganger_detecting_validators > 0 {
|
||||
@@ -126,6 +127,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
|
||||
} else if total_validators == attesting_validators {
|
||||
info!(
|
||||
current_epoch_proposers = proposing_validators,
|
||||
current_epoch_ptc = ptc_validators,
|
||||
active_validators = attesting_validators,
|
||||
total_validators = total_validators,
|
||||
%epoch,
|
||||
@@ -135,6 +137,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
|
||||
} else if attesting_validators > 0 {
|
||||
info!(
|
||||
current_epoch_proposers = proposing_validators,
|
||||
current_epoch_ptc = ptc_validators,
|
||||
active_validators = attesting_validators,
|
||||
total_validators = total_validators,
|
||||
%epoch,
|
||||
|
||||
@@ -214,7 +214,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
|
||||
.map(|_| ())
|
||||
.await
|
||||
}
|
||||
.instrument(info_span!("sync_committee_signature_publish", %slot)),
|
||||
.instrument(info_span!("lh_sync_committee_signature_publish", %slot)),
|
||||
"sync_committee_signature_publish",
|
||||
);
|
||||
|
||||
@@ -232,7 +232,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
|
||||
.map(|_| ())
|
||||
.await
|
||||
}
|
||||
.instrument(info_span!("sync_committee_aggregate_publish", %slot)),
|
||||
.instrument(info_span!("lh_sync_committee_aggregate_publish", %slot)),
|
||||
"sync_committee_aggregate_publish",
|
||||
);
|
||||
|
||||
@@ -324,7 +324,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
|
||||
.map(|_| ())
|
||||
.await
|
||||
}
|
||||
.instrument(info_span!("publish_sync_committee_aggregate_for_subnet", %slot, ?beacon_block_root, %subnet_id)),
|
||||
.instrument(info_span!("lh_publish_sync_committee_aggregate_for_subnet", %slot, ?beacon_block_root, %subnet_id)),
|
||||
"sync_committee_aggregate_publish_subnet",
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user