add IL service to download IL from EL, sign in VC, and publish via BN

add HTTP API to the beacon node to retrieve IL from the EL.
add IL service in the validator client to download the IL from the beacon node.
add logic to the beacon node to package the IL for the validator client.
add HTTP API to the beacon node to gossip signed ILs.
the validator client will sign the ILs from the beacon node and resubmit to the beacon node to gossip.
This commit is contained in:
jacobkaufmann
2024-12-19 14:23:36 -07:00
parent 7125f25f3a
commit 985382e3e5
20 changed files with 610 additions and 17 deletions

View File

@@ -724,6 +724,7 @@ async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>
pub enum ApiTopic {
Attestations,
Blocks,
InclusionLists,
Subscriptions,
SyncCommittee,
}
@@ -731,7 +732,13 @@ pub enum ApiTopic {
impl ApiTopic {
pub fn all() -> Vec<ApiTopic> {
use ApiTopic::*;
vec![Attestations, Blocks, Subscriptions, SyncCommittee]
vec![
Attestations,
Blocks,
InclusionLists,
Subscriptions,
SyncCommittee,
]
}
}

View File

@@ -47,6 +47,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload<E> = FullP
SignedContributionAndProof(&'a ContributionAndProof<E>),
ValidatorRegistration(&'a ValidatorRegistrationData),
VoluntaryExit(&'a VoluntaryExit),
InclusionList(&'a InclusionList<E>),
}
impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> SignableMessage<'a, E, Payload> {
@@ -68,6 +69,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> SignableMessage<'a, E, Pay
SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain),
SignableMessage::ValidatorRegistration(v) => v.signing_root(domain),
SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain),
SignableMessage::InclusionList(il) => il.signing_root(domain),
}
}
}
@@ -223,6 +225,7 @@ impl SigningMethod {
Web3SignerObject::ValidatorRegistration(v)
}
SignableMessage::VoluntaryExit(e) => Web3SignerObject::VoluntaryExit(e),
SignableMessage::InclusionList(il) => Web3SignerObject::InclusionList(il),
};
// Determine the Web3Signer message type.

View File

@@ -18,6 +18,7 @@ pub enum MessageType {
SyncCommitteeSelectionProof,
SyncCommitteeContributionAndProof,
ValidatorRegistration,
InclusionList,
}
#[derive(Debug, PartialEq, Copy, Clone, Serialize)]
@@ -72,6 +73,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload<E>> {
SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData),
ContributionAndProof(&'a ContributionAndProof<E>),
ValidatorRegistration(&'a ValidatorRegistrationData),
InclusionList(&'a InclusionList<E>),
}
impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> Web3SignerObject<'a, E, Payload> {
@@ -127,6 +129,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> Web3SignerObject<'a, E, Pa
MessageType::SyncCommitteeContributionAndProof
}
Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration,
Web3SignerObject::InclusionList(_) => MessageType::InclusionList,
}
}
}

View File

@@ -44,6 +44,7 @@ use validator_services::{
attestation_service::{AttestationService, AttestationServiceBuilder},
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService},
inclusion_list_service::InclusionListService,
preparation_service::{PreparationService, PreparationServiceBuilder},
sync::SyncDutiesMap,
sync_committee_service::SyncCommitteeService,
@@ -66,6 +67,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
@@ -81,6 +83,7 @@ pub struct ProductionValidatorClient<E: EthSpec> {
block_service: BlockService<SystemTimeSlotClock, E>,
attestation_service: AttestationService<SystemTimeSlotClock, E>,
sync_committee_service: SyncCommitteeService<SystemTimeSlotClock, E>,
inclusion_list_service: InclusionListService<SystemTimeSlotClock, E>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
preparation_service: PreparationService<SystemTimeSlotClock, E>,
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, E>>,
@@ -319,6 +322,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
sync_committee_contribution: slot_duration
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
inclusion_list: slot_duration / HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT,
inclusion_list_duties: slot_duration
/ HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT,
get_beacon_blocks_ssz: slot_duration
@@ -530,12 +534,21 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
context.service_context("sync_committee".into()),
);
let inclusion_list_service = InclusionListService::new(
duties_service.clone(),
validator_store.clone(),
slot_clock.clone(),
beacon_nodes.clone(),
context.service_context("inclusion_list".into()),
);
Ok(Self {
context,
duties_service,
block_service,
attestation_service,
sync_committee_service,
inclusion_list_service,
doppelganger_service,
preparation_service,
validator_store,
@@ -611,6 +624,11 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start sync committee service: {}", e))?;
self.inclusion_list_service
.clone()
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start inclusion list service: {}", e))?;
self.preparation_service
.clone()
.start_update_service(&self.context.eth2_config.spec)

View File

@@ -334,6 +334,25 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
.collect()
}
/// Returns all `InclusionListDutyData` for the given `slot`.
pub fn inclusion_list_duties(&self, slot: Slot) -> Vec<InclusionListDutyData> {
let epoch = slot.epoch(E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
self.inclusion_list_duties
.read()
.iter()
.filter_map(|(_, map)| map.get(&epoch))
.map(|(_, duty)| duty)
.filter(|duty| duty.slot == slot && signing_pubkeys.contains(&duty.pubkey))
.cloned()
.collect()
}
/// Returns `true` if we should collect per validator metrics and `false` otherwise.
pub fn per_validator_metrics(&self) -> bool {
self.enable_high_validator_count_metrics

View File

@@ -0,0 +1,285 @@
use crate::duties_service::{DutiesService, DutyAndProof};
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use environment::RuntimeContext;
use eth2::types::InclusionListDutyData;
use futures::future::join_all;
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tree_hash::TreeHash;
use types::{
Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, InclusionList,
SignedInclusionList, Slot,
};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
}
/// Attempts to produce inclusion lists for all known validators 3/4 of the way through each slot.
pub struct InclusionListService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
}
impl<T, E: EthSpec> Clone for InclusionListService<T, E> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T, E: EthSpec> Deref for InclusionListService<T, E> {
type Target = Inner<T, E>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> InclusionListService<T, E> {
pub fn new(
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
) -> Self {
Self {
inner: Arc::new(Inner {
duties_service,
validator_store,
slot_clock,
beacon_nodes,
context,
}),
}
}
/// Starts the service which periodically produces inclusion lists.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;
info!(
log,
"Inclusion list production service started";
"next_update_millis" => duration_to_next_slot.as_millis()
);
let executor = self.context.executor.clone();
let interval_fut = async move {
loop {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
// 3/4 of the way into the slot
sleep(duration_to_next_slot + (slot_duration * 3 / 4)).await;
let log = self.context.log();
if let Err(e) = self.spawn_inclusion_list_task(slot_duration) {
crit!(
log,
"Failed to spawn inclusion list task";
"error" => e
)
} else {
trace!(
log,
"Spawned inclusion list task";
)
}
} else {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
continue;
}
}
};
executor.spawn(interval_fut, "inclusion_list_service");
Ok(())
}
/// Spawn a new task that downloads, signs and uploads the inclusion lists to the beacon node.
fn spawn_inclusion_list_task(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;
let inclusion_list_duties = self.duties_service.inclusion_list_duties(slot);
self.inner.context.executor.spawn_ignoring_error(
self.clone()
.produce_and_publish_inclusion_lists(slot, inclusion_list_duties),
"inclusion list publish",
);
Ok(())
}
/// Downloads inclusion list objects, signs them, and returns them to the validator.
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot`. Critical errors will be logged if this is not the case.
///
/// Only one `InclusionList` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `SignedInclusionList` objects is returned to
/// the BN.
async fn produce_and_publish_inclusion_lists(
self,
slot: Slot,
validator_duties: Vec<InclusionListDutyData>,
) -> Result<(), ()> {
let log = self.context.log();
let validator_store = self.validator_store.clone();
if validator_duties.is_empty() {
return Ok(());
}
let current_epoch = self
.slot_clock
.now()
.ok_or("Unable to determine current slot from clock")
.map(|slot| slot.epoch(E::slots_per_epoch()));
let current_epoch = current_epoch.map_err(|e| {
crit!(
log,
"Error during inclusion list routine";
"error" => format!("{:?}", e),
"slot" => slot.as_u64(),
)
})?;
let inclusion_list = self
.beacon_nodes
.first_success(|beacon_node| async move {
// TODO: add timer metric
beacon_node
.get_validator_inclusion_list(slot)
.await
.map_err(|e| format!("Failed to produce inclusion list: {:?}", e))
.map(|result| result.ok_or(format!("Inclusion list unavailable")))?
.map(|result| result.data)
})
.await
.map_err(|e| {
crit!(
log,
"Error during inclusion list routine";
"error" => format!("{}", e),
"slot" => slot.as_u64(),
)
})?;
// Create futures to produce signed `InclusionList` objects.
let signing_futures = validator_duties.iter().map(|duty| {
let inclusion_list = inclusion_list.clone();
let validator_store = Arc::clone(&validator_store);
async move {
// Ensure that the inclusion list matches the duties.
//
// TODO: do we need to check any other fields here?
if inclusion_list.slot != duty.slot {
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => ?duty.pubkey,
"duty_slot" => duty.slot,
"inclusion_list_slot" => inclusion_list.slot,
);
return None;
}
match validator_store
.sign_inclusion_list(duty.pubkey, inclusion_list)
.await
{
Ok(il) => Some((il, duty.validator_index)),
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
log,
"Missing pubkey for inclusion list";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
"validator" => ?duty.pubkey,
"slot" => slot.as_u64(),
);
None
}
Err(e) => {
crit!(
log,
"Failed to sign inclusion list";
"error" => ?e,
"validator" => ?duty.pubkey,
"slot" => slot.as_u64(),
);
None
}
}
}
});
// Execute all the futures in parallel, collecting any successful results.
let (ref inclusion_lists, ref validator_indices): (Vec<_>, Vec<_>) =
join_all(signing_futures)
.await
.into_iter()
.flatten()
.unzip();
if inclusion_lists.is_empty() {
warn!(log, "No inclusion lists were published");
return Ok(());
}
// Post the inclusion lists to the BN.
match self
.beacon_nodes
.request(ApiTopic::InclusionLists, |beacon_node| async move {
// TODO: add timer metric
beacon_node
.post_beacon_pool_inclusion_lists(inclusion_lists)
.await
})
.await
{
Ok(()) => info!(
log,
"Successfully published inclusion lists";
"count" => inclusion_lists.len(),
"validator_indices" => ?validator_indices,
"slot" => slot.as_u64(),
),
Err(e) => error!(
log,
"Unable to publish inclusion lists";
"error" => %e,
"slot" => slot.as_u64(),
),
}
Ok(())
}
}

View File

@@ -1,6 +1,7 @@
pub mod attestation_service;
pub mod block_service;
pub mod duties_service;
pub mod inclusion_list_service;
pub mod preparation_service;
pub mod sync;
pub mod sync_committee_service;

View File

@@ -16,11 +16,11 @@ use task_executor::TaskExecutor;
use types::{
attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address,
AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof,
Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, PublicKeyBytes, SelectionProof, Signature,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData,
SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId,
ValidatorRegistrationData, VoluntaryExit,
Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, InclusionList, PublicKeyBytes, SelectionProof,
Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof,
SignedInclusionList, SignedRoot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage,
SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, VoluntaryExit,
};
#[derive(Debug, PartialEq)]
@@ -1009,6 +1009,30 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
Ok(SignedContributionAndProof { message, signature })
}
pub async fn sign_inclusion_list(
&self,
pubkey: PublicKeyBytes,
inclusion_list: InclusionList<E>,
) -> Result<SignedInclusionList<E>, Error> {
let signing_epoch = inclusion_list.slot.epoch(E::slots_per_epoch());
let signing_context = self.signing_context(Domain::InclusionListCommittee, signing_epoch);
let signing_method = self.doppelganger_bypassed_signing_method(pubkey)?;
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
SignableMessage::InclusionList(&inclusion_list),
signing_context,
&self.spec,
&self.task_executor,
)
.await?;
Ok(SignedInclusionList {
message: inclusion_list,
signature,
})
}
pub fn import_slashing_protection(
&self,
interchange: Interchange,