From 985382e3e59c541df7f2497b3c87ebe80a18d8ee Mon Sep 17 00:00:00 2001 From: jacobkaufmann Date: Thu, 19 Dec 2024 14:23:36 -0700 Subject: [PATCH] add IL service to download IL from EL, sign in VC, and publish via BN add HTTP API to the beacon node to retrieve IL from the EL. add IL service in the validator client to download the IL from the beacon node. add logic to the beacon node to package the IL for the validator client. add HTTP API to the beacon node to gossip signed ILs. the validator client will sign the ILs from the beacon node and resubmit to the beacon node to gossip. --- beacon_node/beacon_chain/src/beacon_chain.rs | 79 +++++ .../beacon_chain/src/canonical_head.rs | 5 + beacon_node/beacon_chain/src/errors.rs | 1 + beacon_node/execution_layer/src/lib.rs | 17 +- beacon_node/http_api/src/lib.rs | 74 ++++- common/eth2/src/lib.rs | 40 +++ common/eth2/src/types.rs | 5 + consensus/types/src/beacon_state.rs | 14 +- consensus/types/src/inclusion_list.rs | 8 +- .../types/src/inclusion_list_committee.rs | 3 + consensus/types/src/inclusion_list_duty.rs | 5 + consensus/types/src/lib.rs | 4 +- .../beacon_node_fallback/src/lib.rs | 9 +- validator_client/signing_method/src/lib.rs | 3 + .../signing_method/src/web3signer.rs | 3 + validator_client/src/lib.rs | 18 ++ .../validator_services/src/duties_service.rs | 19 ++ .../src/inclusion_list_service.rs | 285 ++++++++++++++++++ .../validator_services/src/lib.rs | 1 + validator_client/validator_store/src/lib.rs | 34 ++- 20 files changed, 610 insertions(+), 17 deletions(-) create mode 100644 consensus/types/src/inclusion_list_committee.rs create mode 100644 validator_client/validator_services/src/inclusion_list_service.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index da9037a180..5d5186315d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2044,6 +2044,85 @@ impl BeaconChain { )?) } + /// Produce an `InclusionList` that is valid for the given `slot`. + /// + /// The produced `InclusionList` will not be valid until it has been signed by exactly one + /// validator that is in the inclusion list committee for `slot` in the canonical chain. + /// + /// ## Errors + /// + /// May return an error if the `request_slot` is too far behind the head state. + pub async fn produce_inclusion_list( + self: &Arc, + request_slot: Slot, + ) -> Result>, Error> { + let execution_layer = self + .execution_layer + .clone() + .ok_or(Error::ExecutionLayerMissing)?; + + // Load the cached head and the associated block hash and slot. + // + // Use a blocking task since blocking the core executor on the canonical head read lock can + // block the core tokio executor. + let chain = self.clone(); + let (head_slot, head_hash) = self + .spawn_blocking_handle( + move || { + let cached_head = chain.canonical_head.cached_head(); + let head_slot = cached_head.head_slot(); + let head_hash = cached_head.head_hash(); + (head_slot, head_hash) + }, + "produce_inclusion_list_head_read", + ) + .await?; + + // NOTE: not sure how to handle scenario where head hash is `None` i.e. pre-bellatrix, which + // is pre-electra. + let Some(head_hash) = head_hash else { + debug!( + self.log, + "Attempted to produce inclusion list pre-bellatrix" + ); + return Ok(None); + }; + + let current_slot = self.slot()?; + let next_slot = current_slot.safe_add(1)?; + + // Don't bother with the inclusion list if the head is not the current slot. + // + // This prevents the routine from running during sync. + if head_slot != current_slot { + debug!( + self.log, + "Head too old for inclusion list"; + "head_slot" => %head_slot, + "current_slot" => %current_slot, + ); + return Ok(None); + } + + // Don't bother with the inclusion list if the request slot is not the next + // slot. + // + // NOTE: does this represent a critical error? should we return an error here or log crit? + // is this check redundant? + if request_slot != next_slot { + debug!(self.log, "Inclusion list request slot not equal to next slot"; "request_slot" => %request_slot, "next_slot" => %next_slot); + return Ok(None); + } + + // Retrieve the inclusion list from the execution layer. + let inclusion_list = execution_layer + .get_inclusion_list(head_hash.into_root()) + .await + .map_err(|e| Error::ExecutionLayerGetInclusionListFailed(Box::new(e)))?; + + Ok(Some(inclusion_list)) + } + /// Performs the same validation as `Self::verify_unaggregated_attestation_for_gossip`, but for /// multiple attestations using batch BLS verification. Batch verification can provide /// significant CPU-time savings compared to individual verification. diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 4f92f5ec8f..9cbeb03246 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -139,6 +139,11 @@ impl CachedHead { self.snapshot.beacon_block.slot() } + /// Returns the `execution_payload.block_hash` of the block at the head of the beacon chain. + pub fn head_hash(&self) -> Option { + self.head_hash + } + /// Returns the `Fork` from the `BeaconState` at the head of the chain. pub fn head_fork(&self) -> Fork { self.snapshot.beacon_state.fork() diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index c3e31c23da..0fc6a32435 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -149,6 +149,7 @@ pub enum BeaconChainError { EngineGetCapabilititesFailed(Box), ExecutionLayerGetBlockByNumberFailed(Box), ExecutionLayerGetBlockByHashFailed(Box), + ExecutionLayerGetInclusionListFailed(Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 08a00d7bf8..03305c2231 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -54,8 +54,8 @@ use types::{ }; use types::{ BeaconStateError, BlindedPayload, ChainSpec, Epoch, ExecPayload, ExecutionPayloadBellatrix, - ExecutionPayloadCapella, ExecutionPayloadElectra, FullPayload, ProposerPreparationData, - PublicKeyBytes, Signature, Slot, + ExecutionPayloadCapella, ExecutionPayloadElectra, FullPayload, InclusionListTransactions, + ProposerPreparationData, PublicKeyBytes, Signature, Slot, }; mod block_hash; @@ -1951,6 +1951,19 @@ impl ExecutionLayer { Err(Error::NoPayloadBuilder) } } + + pub async fn get_inclusion_list( + &self, + parent_hash: Hash256, + ) -> Result, Error> { + debug!(self.log(), "Requesting inclusion list from EL"; "parent_hash" => %parent_hash); + let transactions = self + .engine() + .api + .get_inclusion_list::(parent_hash) + .await?; + Ok(transactions) + } } #[derive(AsRefStr)] diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index cc3b01db87..42f11265ac 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -86,8 +86,8 @@ use types::{ AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + SignedContributionAndProof, SignedInclusionList, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -265,6 +265,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( }, ); + // POST beacon/pool/inclusion_lists + let post_beacon_pool_inclusion_lists = beacon_pool_path + .clone() + .and(warp::path("inclusion_lists")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + inclusion_lists: Vec>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.blocking_json_task(Priority::P0, move || { + // TODO: actually gossip the inclusion lists + info!( + log, + "Posting signed inclusion lists for gossip"; + "num_inclusion_lists" => inclusion_lists.len(), + ); + + Ok(()) + }) + }, + ); + // GET beacon/deposit_snapshot let get_beacon_deposit_snapshot = eth_v1 .and(warp::path("beacon")) @@ -3496,6 +3524,47 @@ pub fn serve( }, ); + // GET validator/inclusion_list?slot + let get_validator_inclusion_list = eth_v1 + .and(warp::path("validator")) + .and(warp::path("inclusion_list")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ValidatorInclusionListQuery, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + not_synced_filter?; + + let current_slot = chain + .slot() + .map_err(warp_utils::reject::beacon_chain_error)?; + + // allow a tolerance of one slot to account for clock skew + // + // TODO: make sure tolerance is consistent with inner logic + if query.slot > current_slot + 1 { + return Err(warp_utils::reject::custom_bad_request(format!( + "request slot {} is more than one slot past the current slot {}", + query.slot, current_slot + ))); + } + + let data = chain + .produce_inclusion_list(query.slot) + .await + .map(|il| il.clone()) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok::<_, warp::reject::Rejection>(warp::reply::json(&data).into_response()) + }) + }, + ); // POST validator/aggregate_and_proofs let post_validator_aggregate_and_proofs = any_version .and(warp::path("validator")) @@ -4706,6 +4775,7 @@ pub fn serve( .uor(get_validator_attestation_data) .uor(get_validator_aggregate_attestation) .uor(get_validator_sync_committee_contribution) + .uor(get_validator_inclusion_list) .uor(get_lighthouse_health) .uor(get_lighthouse_ui_health) .uor(get_lighthouse_ui_validator_count) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 18433ff9a0..16fad3e512 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -128,6 +128,7 @@ pub struct Timeouts { pub proposer_duties: Duration, pub sync_committee_contribution: Duration, pub sync_duties: Duration, + pub inclusion_list: Duration, pub inclusion_list_duties: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, @@ -146,6 +147,7 @@ impl Timeouts { proposer_duties: timeout, sync_committee_contribution: timeout, sync_duties: timeout, + inclusion_list: timeout, inclusion_list_duties: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, @@ -1564,6 +1566,25 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/inclusion_lists` + pub async fn post_beacon_pool_inclusion_lists( + &self, + inclusion_lists: &[SignedInclusionList], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("inclusion_lists"); + + self.post_with_timeout(path, &inclusion_lists, self.timeouts.inclusion_list) + .await?; + + Ok(()) + } + /// `GET beacon/deposit_snapshot` pub async fn get_deposit_snapshot(&self) -> Result, Error> { let mut path = self.eth_path(V1)?; @@ -2427,6 +2448,25 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET validator/inclusion_list?slot` + pub async fn get_validator_inclusion_list( + &self, + slot: Slot, + ) -> Result>>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("inclusion_list"); + + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + + self.get_opt_with_timeout(path, self.timeouts.inclusion_list) + .await + } + /// `POST lighthouse/liveness` pub async fn post_lighthouse_liveness( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index f1b2216336..1344c14416 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -793,6 +793,11 @@ pub struct ValidatorAggregateAttestationQuery { pub committee_index: Option, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ValidatorInclusionListQuery { + pub slot: Slot, +} + #[derive(Clone, Deserialize)] pub struct LightClientUpdatesQuery { pub start_period: u64, diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index b035a39b06..907830009c 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -816,7 +816,7 @@ impl BeaconState { &self, slot: Slot, spec: &ChainSpec, - ) -> Result, Error> { + ) -> Result, Error> { let epoch = slot.epoch(E::slots_per_epoch()); let current_epoch = self.current_epoch(); let next_epoch = current_epoch + 1; @@ -842,11 +842,11 @@ impl BeaconState { spec.shuffle_round_count, ) .ok_or(Error::UnableToShuffle)?; - il_committee_indices.push(shuffled_index); + il_committee_indices.push(shuffled_index as u64); i.safe_add_assign(1)?; } - Ok(il_committee_indices) + Ok(InclusionListCommittee::::from(il_committee_indices)) } /// Compute the seed to use for the beacon inclusion list committee selection at the given @@ -1727,10 +1727,16 @@ impl BeaconState { epoch: Epoch, spec: &ChainSpec, ) -> Result, Error> { + let validator_index = validator_index as u64; for slot in epoch.slot_iter(E::slots_per_epoch()) { let committee = self.get_inclusion_list_committee(slot, spec)?; + let committee_root = committee.tree_hash_root(); if committee.contains(&validator_index) { - return Ok(Some(InclusionListDuty { slot })); + return Ok(Some(InclusionListDuty { + slot, + validator_index, + committee_root, + })); } } Ok(None) diff --git a/consensus/types/src/inclusion_list.rs b/consensus/types/src/inclusion_list.rs index 016d0bc2bb..9283bb5799 100644 --- a/consensus/types/src/inclusion_list.rs +++ b/consensus/types/src/inclusion_list.rs @@ -9,6 +9,11 @@ use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; +pub type InclusionListTransactions = VariableList< + Transaction<::MaxBytesPerTransaction>, + ::MaxTransactionsPerInclusionList, +>; + #[derive( Debug, Clone, @@ -29,8 +34,7 @@ pub struct InclusionList { #[serde(with = "serde_utils::quoted_u64")] pub validator_index: u64, pub inclusion_list_committee_root: Hash256, - pub transactions: - VariableList, E::MaxTransactionsPerInclusionList>, + pub transactions: InclusionListTransactions, } impl SignedRoot for InclusionList {} diff --git a/consensus/types/src/inclusion_list_committee.rs b/consensus/types/src/inclusion_list_committee.rs new file mode 100644 index 0000000000..2aa928e02e --- /dev/null +++ b/consensus/types/src/inclusion_list_committee.rs @@ -0,0 +1,3 @@ +use crate::*; + +pub type InclusionListCommittee = FixedVector::InclusionListCommitteeSize>; diff --git a/consensus/types/src/inclusion_list_duty.rs b/consensus/types/src/inclusion_list_duty.rs index bf578b4623..4317abd5eb 100644 --- a/consensus/types/src/inclusion_list_duty.rs +++ b/consensus/types/src/inclusion_list_duty.rs @@ -5,4 +5,9 @@ use serde::{Deserialize, Serialize}; pub struct InclusionListDuty { /// The slot during which the validator must produce an inclusion list. pub slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] + /// The index of the validator. + pub validator_index: u64, + /// The hash tree root of the inclusion list committee. + pub committee_root: Hash256, } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 8a1eab3d75..f24ba9908f 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -49,6 +49,7 @@ pub mod graffiti; pub mod historical_batch; pub mod historical_summary; pub mod inclusion_list; +pub mod inclusion_list_committee; pub mod inclusion_list_duty; pub mod indexed_attestation; pub mod light_client_bootstrap; @@ -180,7 +181,8 @@ pub use crate::fork_name::{ForkName, InconsistentFork}; pub use crate::fork_versioned_response::{ForkVersionDeserialize, ForkVersionedResponse}; pub use crate::graffiti::{Graffiti, GRAFFITI_BYTES_LEN}; pub use crate::historical_batch::HistoricalBatch; -pub use crate::inclusion_list::{InclusionList, SignedInclusionList}; +pub use crate::inclusion_list::{InclusionList, InclusionListTransactions, SignedInclusionList}; +pub use crate::inclusion_list_committee::InclusionListCommittee; pub use crate::inclusion_list_duty::InclusionListDuty; pub use crate::indexed_attestation::{ IndexedAttestation, IndexedAttestationBase, IndexedAttestationElectra, IndexedAttestationRef, diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 95a221f189..daa0802f9b 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -724,6 +724,7 @@ async fn sort_nodes_by_health(nodes: &mut Vec pub enum ApiTopic { Attestations, Blocks, + InclusionLists, Subscriptions, SyncCommittee, } @@ -731,7 +732,13 @@ pub enum ApiTopic { impl ApiTopic { pub fn all() -> Vec { use ApiTopic::*; - vec![Attestations, Blocks, Subscriptions, SyncCommittee] + vec![ + Attestations, + Blocks, + InclusionLists, + Subscriptions, + SyncCommittee, + ] } } diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index 2fe4af39d3..d30d84bbb6 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -47,6 +47,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP SignedContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), + InclusionList(&'a InclusionList), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, E, Payload> { @@ -68,6 +69,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, E, Pay SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), + SignableMessage::InclusionList(il) => il.signing_root(domain), } } } @@ -223,6 +225,7 @@ impl SigningMethod { Web3SignerObject::ValidatorRegistration(v) } SignableMessage::VoluntaryExit(e) => Web3SignerObject::VoluntaryExit(e), + SignableMessage::InclusionList(il) => Web3SignerObject::InclusionList(il), }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index 86e7015ad3..a04922c8c0 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -18,6 +18,7 @@ pub enum MessageType { SyncCommitteeSelectionProof, SyncCommitteeContributionAndProof, ValidatorRegistration, + InclusionList, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -72,6 +73,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData), ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), + InclusionList(&'a InclusionList), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -127,6 +129,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa MessageType::SyncCommitteeContributionAndProof } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, + Web3SignerObject::InclusionList(_) => MessageType::InclusionList, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index b0dc651f27..928e5c7672 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -44,6 +44,7 @@ use validator_services::{ attestation_service::{AttestationService, AttestationServiceBuilder}, block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService}, + inclusion_list_service::InclusionListService, preparation_service::{PreparationService, PreparationServiceBuilder}, sync::SyncDutiesMap, sync_committee_service::SyncCommitteeService, @@ -66,6 +67,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; @@ -81,6 +83,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, attestation_service: AttestationService, sync_committee_service: SyncCommitteeService, + inclusion_list_service: InclusionListService, doppelganger_service: Option>, preparation_service: PreparationService, validator_store: Arc>, @@ -319,6 +322,7 @@ impl ProductionValidatorClient { sync_committee_contribution: slot_duration / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, + inclusion_list: slot_duration / HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT, inclusion_list_duties: slot_duration / HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: slot_duration @@ -530,12 +534,21 @@ impl ProductionValidatorClient { context.service_context("sync_committee".into()), ); + let inclusion_list_service = InclusionListService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.service_context("inclusion_list".into()), + ); + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + inclusion_list_service, doppelganger_service, preparation_service, validator_store, @@ -611,6 +624,11 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + self.inclusion_list_service + .clone() + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start inclusion list service: {}", e))?; + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index f47a117fb5..cad23634f3 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -334,6 +334,25 @@ impl DutiesService { .collect() } + /// Returns all `InclusionListDutyData` for the given `slot`. + pub fn inclusion_list_duties(&self, slot: Slot) -> Vec { + let epoch = slot.epoch(E::slots_per_epoch()); + + // Only collect validators that are considered safe in terms of doppelganger protection. + let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); + + self.inclusion_list_duties + .read() + .iter() + .filter_map(|(_, map)| map.get(&epoch)) + .map(|(_, duty)| duty) + .filter(|duty| duty.slot == slot && signing_pubkeys.contains(&duty.pubkey)) + .cloned() + .collect() + } + /// Returns `true` if we should collect per validator metrics and `false` otherwise. pub fn per_validator_metrics(&self) -> bool { self.enable_high_validator_count_metrics diff --git a/validator_client/validator_services/src/inclusion_list_service.rs b/validator_client/validator_services/src/inclusion_list_service.rs new file mode 100644 index 0000000000..f3abdb82b0 --- /dev/null +++ b/validator_client/validator_services/src/inclusion_list_service.rs @@ -0,0 +1,285 @@ +use crate::duties_service::{DutiesService, DutyAndProof}; +use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use environment::RuntimeContext; +use eth2::types::InclusionListDutyData; +use futures::future::join_all; +use slog::{crit, debug, error, info, trace, warn}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::Arc; +use tokio::time::{sleep, sleep_until, Duration, Instant}; +use tree_hash::TreeHash; +use types::{ + Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, InclusionList, + SignedInclusionList, Slot, +}; +use validator_store::{Error as ValidatorStoreError, ValidatorStore}; + +/// Helper to minimise `Arc` usage. +pub struct Inner { + duties_service: Arc>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, +} + +/// Attempts to produce inclusion lists for all known validators 3/4 of the way through each slot. +pub struct InclusionListService { + inner: Arc>, +} + +impl Clone for InclusionListService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for InclusionListService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl InclusionListService { + pub fn new( + duties_service: Arc>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + context, + }), + } + } + + /// Starts the service which periodically produces inclusion lists. + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + info!( + log, + "Inclusion list production service started"; + "next_update_millis" => duration_to_next_slot.as_millis() + ); + + let executor = self.context.executor.clone(); + + let interval_fut = async move { + loop { + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + // 3/4 of the way into the slot + sleep(duration_to_next_slot + (slot_duration * 3 / 4)).await; + let log = self.context.log(); + + if let Err(e) = self.spawn_inclusion_list_task(slot_duration) { + crit!( + log, + "Failed to spawn inclusion list task"; + "error" => e + ) + } else { + trace!( + log, + "Spawned inclusion list task"; + ) + } + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + } + }; + + executor.spawn(interval_fut, "inclusion_list_service"); + Ok(()) + } + + /// Spawn a new task that downloads, signs and uploads the inclusion lists to the beacon node. + fn spawn_inclusion_list_task(&self, slot_duration: Duration) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + let inclusion_list_duties = self.duties_service.inclusion_list_duties(slot); + self.inner.context.executor.spawn_ignoring_error( + self.clone() + .produce_and_publish_inclusion_lists(slot, inclusion_list_duties), + "inclusion list publish", + ); + + Ok(()) + } + + /// Downloads inclusion list objects, signs them, and returns them to the validator. + /// + /// ## Detail + /// + /// The given `validator_duties` should already be filtered to only contain those that match + /// `slot`. Critical errors will be logged if this is not the case. + /// + /// Only one `InclusionList` is downloaded from the BN. It is then cloned and signed by each + /// validator and the list of individually-signed `SignedInclusionList` objects is returned to + /// the BN. + async fn produce_and_publish_inclusion_lists( + self, + slot: Slot, + validator_duties: Vec, + ) -> Result<(), ()> { + let log = self.context.log(); + let validator_store = self.validator_store.clone(); + + if validator_duties.is_empty() { + return Ok(()); + } + + let current_epoch = self + .slot_clock + .now() + .ok_or("Unable to determine current slot from clock") + .map(|slot| slot.epoch(E::slots_per_epoch())); + let current_epoch = current_epoch.map_err(|e| { + crit!( + log, + "Error during inclusion list routine"; + "error" => format!("{:?}", e), + "slot" => slot.as_u64(), + ) + })?; + + let inclusion_list = self + .beacon_nodes + .first_success(|beacon_node| async move { + // TODO: add timer metric + beacon_node + .get_validator_inclusion_list(slot) + .await + .map_err(|e| format!("Failed to produce inclusion list: {:?}", e)) + .map(|result| result.ok_or(format!("Inclusion list unavailable")))? + .map(|result| result.data) + }) + .await + .map_err(|e| { + crit!( + log, + "Error during inclusion list routine"; + "error" => format!("{}", e), + "slot" => slot.as_u64(), + ) + })?; + + // Create futures to produce signed `InclusionList` objects. + let signing_futures = validator_duties.iter().map(|duty| { + let inclusion_list = inclusion_list.clone(); + let validator_store = Arc::clone(&validator_store); + async move { + // Ensure that the inclusion list matches the duties. + // + // TODO: do we need to check any other fields here? + if inclusion_list.slot != duty.slot { + crit!( + log, + "Inconsistent validator duties during signing"; + "validator" => ?duty.pubkey, + "duty_slot" => duty.slot, + "inclusion_list_slot" => inclusion_list.slot, + ); + return None; + } + + match validator_store + .sign_inclusion_list(duty.pubkey, inclusion_list) + .await + { + Ok(il) => Some((il, duty.validator_index)), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + log, + "Missing pubkey for inclusion list"; + "info" => "a validator may have recently been removed from this VC", + "pubkey" => ?pubkey, + "validator" => ?duty.pubkey, + "slot" => slot.as_u64(), + ); + None + } + Err(e) => { + crit!( + log, + "Failed to sign inclusion list"; + "error" => ?e, + "validator" => ?duty.pubkey, + "slot" => slot.as_u64(), + ); + None + } + } + } + }); + + // Execute all the futures in parallel, collecting any successful results. + let (ref inclusion_lists, ref validator_indices): (Vec<_>, Vec<_>) = + join_all(signing_futures) + .await + .into_iter() + .flatten() + .unzip(); + + if inclusion_lists.is_empty() { + warn!(log, "No inclusion lists were published"); + return Ok(()); + } + + // Post the inclusion lists to the BN. + match self + .beacon_nodes + .request(ApiTopic::InclusionLists, |beacon_node| async move { + // TODO: add timer metric + beacon_node + .post_beacon_pool_inclusion_lists(inclusion_lists) + .await + }) + .await + { + Ok(()) => info!( + log, + "Successfully published inclusion lists"; + "count" => inclusion_lists.len(), + "validator_indices" => ?validator_indices, + "slot" => slot.as_u64(), + ), + Err(e) => error!( + log, + "Unable to publish inclusion lists"; + "error" => %e, + "slot" => slot.as_u64(), + ), + } + + Ok(()) + } +} diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index abf8fab3cb..e3ddbf7ae7 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -1,6 +1,7 @@ pub mod attestation_service; pub mod block_service; pub mod duties_service; +pub mod inclusion_list_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 837af5b51d..15377fada9 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -16,11 +16,11 @@ use task_executor::TaskExecutor; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, - Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, PublicKeyBytes, SelectionProof, Signature, - SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, + Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, InclusionList, PublicKeyBytes, SelectionProof, + Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedInclusionList, SignedRoot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, VoluntaryExit, }; #[derive(Debug, PartialEq)] @@ -1009,6 +1009,30 @@ impl ValidatorStore { Ok(SignedContributionAndProof { message, signature }) } + pub async fn sign_inclusion_list( + &self, + pubkey: PublicKeyBytes, + inclusion_list: InclusionList, + ) -> Result, Error> { + let signing_epoch = inclusion_list.slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::InclusionListCommittee, signing_epoch); + let signing_method = self.doppelganger_bypassed_signing_method(pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::InclusionList(&inclusion_list), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + Ok(SignedInclusionList { + message: inclusion_list, + signature, + }) + } + pub fn import_slashing_protection( &self, interchange: Interchange,