Register validator api (#3194)

## Issue Addressed

Lays the groundwork for builder API changes by implementing the beacon-API's new `register_validator` endpoint

## Proposed Changes

- Add a routine in the VC that runs on startup (retrying until success), once per epoch, or whenever `suggested_fee_recipient` is updated, signing `ValidatorRegistrationData` and sending it to the BN.
  -  TODO: `gas_limit` config options https://github.com/ethereum/builder-specs/issues/17
-  BN only sends VC registration data to builders on demand, but VC registration data *does update* the BN's prepare proposer cache and send an updated fcU to a local EE. This is necessary for fee recipient consistency between the blinded and full block flow in the event of fallback. Having the BN only send registration data to builders on demand gives feedback directly to the VC about relay status. Also, since the BN has no ability to sign these messages anyway (so it couldn't refresh them if it wanted to), and validator registration is independent of the BN head, I think this approach makes sense.
- Adds upcoming consensus spec changes for this PR https://github.com/ethereum/consensus-specs/pull/2884
  -  I initially applied the bit mask based on a configured application domain, but I ended up just hard-coding it here instead because that's how it's spec'd in the builder repo.
  -  Should application mask appear in the api?



Co-authored-by: realbigsean <sean@sigmaprime.io>
This commit is contained in:
realbigsean
2022-06-30 00:49:21 +00:00
parent 5de00b7ee8
commit f6ec44f0dd
18 changed files with 603 additions and 27 deletions

View File

@@ -85,6 +85,11 @@ lazy_static::lazy_static! {
"Total count of attempted SyncSelectionProof signings",
&["status"]
);
// Counts `ValidatorRegistrationData` signing attempts, partitioned by outcome
// via the `status` label (the success path increments with `metrics::SUCCESS`).
pub static ref SIGNED_VALIDATOR_REGISTRATIONS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"builder_validator_registrations_total",
"Total count of ValidatorRegistrationData signings",
&["status"]
);
pub static ref DUTIES_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"vc_duties_service_task_times_seconds",
"Duration to perform duties service tasks",

View File

@@ -485,7 +485,10 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
self.preparation_service
.clone()
.start_update_service(&self.context.eth2_config.spec)
.start_update_service(
self.config.private_tx_proposals,
&self.context.eth2_config.spec,
)
.map_err(|e| format!("Unable to start preparation service: {}", e))?;
if let Some(doppelganger_service) = self.doppelganger_service.clone() {

View File

@@ -3,17 +3,28 @@ use crate::{
fee_recipient_file::FeeRecipientFile,
validator_store::{DoppelgangerStatus, ValidatorStore},
};
use bls::PublicKeyBytes;
use environment::RuntimeContext;
use parking_lot::RwLock;
use slog::{debug, error, info};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration};
use types::{Address, ChainSpec, EthSpec, ProposerPreparationData};
use types::{
Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData,
ValidatorRegistrationData,
};
/// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations.
const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2;
/// Number of epochs to wait before re-submitting validator registration.
///
/// `register_validators` re-publishes *all* registrations on the first slot of
/// every such period; in between, only new/changed registrations are sent.
const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1;
/// Builds a `PreparationService`.
pub struct PreparationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
validator_store: Option<Arc<ValidatorStore<T, E>>>,
@@ -83,6 +94,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
.ok_or("Cannot build PreparationService without runtime_context")?,
fee_recipient: self.fee_recipient,
fee_recipient_file: self.fee_recipient_file,
validator_registration_cache: RwLock::new(HashMap::new()),
}),
})
}
@@ -96,6 +108,32 @@ pub struct Inner<T, E: EthSpec> {
context: RuntimeContext<E>,
fee_recipient: Option<Address>,
fee_recipient_file: Option<FeeRecipientFile>,
// Used to track unpublished validator registration changes.
validator_registration_cache:
RwLock<HashMap<ValidatorRegistrationKey, SignedValidatorRegistrationData>>,
}
/// Cache key identifying a validator registration.
///
/// Only the fields that require a fresh signature when they change are part of
/// the key; the timestamp is deliberately excluded (see the
/// `From<ValidatorRegistrationData>` impl), so re-signing happens only when
/// the fee recipient, gas limit, or pubkey changes.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub struct ValidatorRegistrationKey {
pub fee_recipient: Address,
pub gas_limit: u64,
pub pubkey: PublicKeyBytes,
}
impl From<ValidatorRegistrationData> for ValidatorRegistrationKey {
fn from(data: ValidatorRegistrationData) -> Self {
let ValidatorRegistrationData {
fee_recipient,
gas_limit,
timestamp: _,
pubkey,
} = data;
Self {
fee_recipient,
gas_limit,
pubkey,
}
}
}
/// Attempts to produce proposer preparations for all known validators at the beginning of each epoch.
@@ -120,8 +158,19 @@ impl<T, E: EthSpec> Deref for PreparationService<T, E> {
}
impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
/// Starts the proposer preparation service and, when
/// `start_registration_service` is `true`, the validator registration
/// service alongside it.
pub fn start_update_service(
self,
start_registration_service: bool,
spec: &ChainSpec,
) -> Result<(), String> {
// Spawn the registration loop on a clone so `self` is still available to
// be consumed by the preparation service below.
if start_registration_service {
self.clone().start_validator_registration_service(spec)?;
}
self.start_proposer_prepare_service(spec)
}
/// Starts the service which periodically produces proposer preparations.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
pub fn start_proposer_prepare_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
@@ -163,6 +212,41 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
Ok(())
}
/// Starts the service which periodically sends connected beacon nodes validator registration information.
///
/// The spawned loop attempts registration immediately on startup and then once
/// per slot; `register_validators` itself decides whether anything is actually
/// published (a full re-publish once per submission period, or immediately
/// when a registration key changes).
pub fn start_validator_registration_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
info!(
log,
"Validator registration service started";
);
let spec = spec.clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let executor = self.context.executor.clone();
let validator_registration_fut = async move {
loop {
// Poll the endpoint immediately to ensure fee recipients are received.
if let Err(e) = self.register_validators(&spec).await {
error!(log,"Error during validator registration";"error" => ?e);
}
// Always wait until the next slot before the next attempt; this also
// rate-limits retries after a failed registration request.
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot).await;
} else {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
}
};
executor.spawn(validator_registration_fut, "validator_registration_service");
Ok(())
}
/// Return `true` if the current slot is close to or past the Bellatrix fork epoch.
///
/// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may
@@ -188,6 +272,33 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
}
/// Map every eligible validator to a `ProposerPreparationData` entry
/// (validator index plus its resolved fee recipient); the pubkey supplied by
/// `collect_data` is not needed here.
fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec<ProposerPreparationData> {
    let to_preparation_data = |_pubkey, validator_index, fee_recipient| ProposerPreparationData {
        validator_index,
        fee_recipient,
    };
    self.collect_data(spec, to_preparation_data)
}
/// Build a registration cache key for every eligible validator, pairing each
/// pubkey with its resolved fee recipient and a fixed gas limit.
fn collect_validator_registration_keys(
&self,
spec: &ChainSpec,
) -> Vec<ValidatorRegistrationKey> {
self.collect_data(spec, |pubkey, _, fee_recipient| {
ValidatorRegistrationKey {
fee_recipient,
//TODO(sean) this is geth's default, we should make this configurable and maybe have the default be dynamic.
// Discussion here: https://github.com/ethereum/builder-specs/issues/17
gas_limit: 30_000_000,
pubkey,
}
})
}
fn collect_data<G, U>(&self, spec: &ChainSpec, map_fn: G) -> Vec<U>
where
G: Fn(PublicKeyBytes, u64, Address) -> U,
{
let log = self.context.log();
let fee_recipient_file = self
@@ -234,10 +345,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
.or(self.fee_recipient);
if let Some(fee_recipient) = fee_recipient {
Some(ProposerPreparationData {
validator_index,
fee_recipient,
})
Some(map_fn(pubkey, validator_index, fee_recipient))
} else {
if spec.bellatrix_fork_epoch.is_some() {
error!(
@@ -284,4 +392,116 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
}
Ok(())
}
/// Register validators with builders, used in the blinded block proposal flow.
async fn register_validators(&self, spec: &ChainSpec) -> Result<(), String> {
let registration_keys = self.collect_validator_registration_keys(spec);
let mut changed_keys = vec![];
// Need to scope this so the read lock is not held across an await point (I don't know why
// but the explicit `drop` is not enough).
{
let guard = self.validator_registration_cache.read();
for key in registration_keys.iter() {
if !guard.contains_key(key) {
changed_keys.push(key.clone());
}
}
drop(guard);
}
// Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`.
if let Some(slot) = self.slot_clock.now() {
if slot % (E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0 {
self.publish_validator_registration_data(registration_keys)
.await?;
} else if !changed_keys.is_empty() {
self.publish_validator_registration_data(changed_keys)
.await?;
}
}
Ok(())
}
/// Sign and publish validator registrations for the given keys.
///
/// Signatures are cached per `ValidatorRegistrationKey`; a key is only
/// re-signed when absent from the cache. A validator whose signing fails is
/// logged and skipped rather than aborting the whole batch. Fix: the success
/// log now reports the number of registrations actually published
/// (`signed.len()`), which can be fewer than the number of input keys when
/// some signings fail.
async fn publish_validator_registration_data(
    &self,
    registration_keys: Vec<ValidatorRegistrationKey>,
) -> Result<(), String> {
    let log = self.context.log();

    let mut signed = Vec::with_capacity(registration_keys.len());

    for key in registration_keys {
        // Hold the read lock only long enough to clone the cached entry;
        // signing below is an await point.
        let cached_registration_opt =
            self.validator_registration_cache.read().get(&key).cloned();

        let signed_data = if let Some(signed_data) = cached_registration_opt {
            signed_data
        } else {
            // Registrations carry a wall-clock timestamp so consumers can
            // distinguish fresh registrations from stale ones.
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| format!("{e:?}"))?
                .as_secs();

            let ValidatorRegistrationKey {
                fee_recipient,
                gas_limit,
                pubkey,
            } = key.clone();

            let signed_data = match self
                .validator_store
                .sign_validator_registration_data(ValidatorRegistrationData {
                    fee_recipient,
                    gas_limit,
                    timestamp,
                    pubkey,
                })
                .await
            {
                Ok(data) => data,
                Err(e) => {
                    // Best-effort: skip this validator, keep going for the rest.
                    error!(log, "Unable to sign validator registration data"; "error" => ?e, "pubkey" => ?pubkey);
                    continue;
                }
            };

            self.validator_registration_cache
                .write()
                .insert(key, signed_data.clone());

            signed_data
        };
        signed.push(signed_data);
    }

    if !signed.is_empty() {
        let signed_ref = signed.as_slice();

        match self
            .beacon_nodes
            .first_success(RequireSynced::Yes, |beacon_node| async move {
                beacon_node
                    .post_validator_register_validator(signed_ref)
                    .await
            })
            .await
        {
            Ok(()) => debug!(
                log,
                "Published validator registration";
                // Count what was actually sent, not the number of input keys.
                "count" => signed.len(),
            ),
            Err(e) => error!(
                log,
                "Unable to publish validator registration";
                "error" => %e,
            ),
        }
    }
    Ok(())
}
}

View File

@@ -30,6 +30,7 @@ pub enum Error {
ShuttingDown,
TokioJoin(String),
MergeForkNotSupported,
GenesisForkVersionRequired,
}
/// Enumerates all messages that can be signed by a validator.
@@ -45,6 +46,7 @@ pub enum SignableMessage<'a, T: EthSpec, Payload: ExecPayload<T> = FullPayload<T
slot: Slot,
},
SignedContributionAndProof(&'a ContributionAndProof<T>),
ValidatorRegistration(&'a ValidatorRegistrationData),
}
impl<'a, T: EthSpec, Payload: ExecPayload<T>> SignableMessage<'a, T, Payload> {
@@ -64,6 +66,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload<T>> SignableMessage<'a, T, Payload> {
beacon_block_root, ..
} => beacon_block_root.signing_root(domain),
SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain),
SignableMessage::ValidatorRegistration(v) => v.signing_root(domain),
}
}
}
@@ -129,6 +132,22 @@ impl SigningMethod {
let signing_root = signable_message.signing_root(domain_hash);
let fork_info = Some(ForkInfo {
fork,
genesis_validators_root,
});
self.get_signature_from_root(signable_message, signing_root, executor, fork_info)
.await
}
pub async fn get_signature_from_root<T: EthSpec, Payload: ExecPayload<T>>(
&self,
signable_message: SignableMessage<'_, T, Payload>,
signing_root: Hash256,
executor: &TaskExecutor,
fork_info: Option<ForkInfo>,
) -> Result<Signature, Error> {
match self {
SigningMethod::LocalKeystore { voting_keypair, .. } => {
let _timer =
@@ -181,21 +200,21 @@ impl SigningMethod {
SignableMessage::SignedContributionAndProof(c) => {
Web3SignerObject::ContributionAndProof(c)
}
SignableMessage::ValidatorRegistration(v) => {
Web3SignerObject::ValidatorRegistration(v)
}
};
// Determine the Web3Signer message type.
let message_type = object.message_type();
// The `fork_info` field is not required for deposits since they sign across the
// genesis fork version.
let fork_info = if let Web3SignerObject::Deposit { .. } = &object {
None
} else {
Some(ForkInfo {
fork,
genesis_validators_root,
})
};
if matches!(
object,
Web3SignerObject::Deposit { .. } | Web3SignerObject::ValidatorRegistration(_)
) && fork_info.is_some()
{
return Err(Error::GenesisForkVersionRequired);
}
let request = SigningRequest {
message_type,

View File

@@ -17,6 +17,7 @@ pub enum MessageType {
SyncCommitteeMessage,
SyncCommitteeSelectionProof,
SyncCommitteeContributionAndProof,
ValidatorRegistration,
}
#[derive(Debug, PartialEq, Copy, Clone, Serialize)]
@@ -64,6 +65,7 @@ pub enum Web3SignerObject<'a, T: EthSpec, Payload: ExecPayload<T>> {
},
SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData),
ContributionAndProof(&'a ContributionAndProof<T>),
ValidatorRegistration(&'a ValidatorRegistrationData),
}
impl<'a, T: EthSpec, Payload: ExecPayload<T>> Web3SignerObject<'a, T, Payload> {
@@ -93,6 +95,7 @@ impl<'a, T: EthSpec, Payload: ExecPayload<T>> Web3SignerObject<'a, T, Payload> {
Web3SignerObject::ContributionAndProof(_) => {
MessageType::SyncCommitteeContributionAndProof
}
Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration,
}
}
}

View File

@@ -20,9 +20,9 @@ use types::{
attestation::Error as AttestationError, graffiti::GraffitiString, Address, AggregateAndProof,
Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch,
EthSpec, ExecPayload, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof,
Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, Slot,
SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage,
SyncSelectionProof, SyncSubnetId,
Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot,
SignedValidatorRegistrationData, Slot, SyncAggregatorSelectionData, SyncCommitteeContribution,
SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData,
};
use validator_dir::ValidatorDir;
@@ -524,6 +524,35 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
}
}
/// Signs a `ValidatorRegistrationData` message for the builder API.
///
/// Registrations are signed over the builder domain, which is computed
/// without fork info (per the builder spec it uses the genesis fork
/// version), hence `None` is passed for `fork_info` — supplying one would
/// be rejected with `GenesisForkVersionRequired` in the signing method.
pub async fn sign_validator_registration_data(
&self,
validator_registration_data: ValidatorRegistrationData,
) -> Result<SignedValidatorRegistrationData, Error> {
let domain_hash = self.spec.get_builder_domain();
let signing_root = validator_registration_data.signing_root(domain_hash);
// NOTE(review): the doppelganger-bypassed lookup is used here — presumably
// so registration still proceeds while doppelganger protection holds the
// validator back; confirm that intent.
let signing_method =
self.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)?;
let signature = signing_method
.get_signature_from_root::<E, BlindedPayload<E>>(
SignableMessage::ValidatorRegistration(&validator_registration_data),
signing_root,
&self.task_executor,
None,
)
.await?;
// Only successes are counted; signing failures return early via `?`
// without incrementing an error-labelled counter.
metrics::inc_counter_vec(
&metrics::SIGNED_VALIDATOR_REGISTRATIONS_TOTAL,
&[metrics::SUCCESS],
);
Ok(SignedValidatorRegistrationData {
message: validator_registration_data,
signature,
})
}
/// Signs an `AggregateAndProof` for a given validator.
///
/// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be