diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index da9037a180..5d5186315d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2044,6 +2044,85 @@ impl BeaconChain { )?) } + /// Produce an `InclusionList` that is valid for the given `slot`. + /// + /// The produced `InclusionList` will not be valid until it has been signed by exactly one + /// validator that is in the inclusion list committee for `slot` in the canonical chain. + /// + /// ## Errors + /// + /// May return an error if the `request_slot` is too far behind the head state. + pub async fn produce_inclusion_list( + self: &Arc, + request_slot: Slot, + ) -> Result>, Error> { + let execution_layer = self + .execution_layer + .clone() + .ok_or(Error::ExecutionLayerMissing)?; + + // Load the cached head and the associated block hash and slot. + // + // Use a blocking task since blocking the core executor on the canonical head read lock can + // block the core tokio executor. + let chain = self.clone(); + let (head_slot, head_hash) = self + .spawn_blocking_handle( + move || { + let cached_head = chain.canonical_head.cached_head(); + let head_slot = cached_head.head_slot(); + let head_hash = cached_head.head_hash(); + (head_slot, head_hash) + }, + "produce_inclusion_list_head_read", + ) + .await?; + + // NOTE: not sure how to handle scenario where head hash is `None` i.e. pre-bellatrix, which + // is pre-electra. + let Some(head_hash) = head_hash else { + debug!( + self.log, + "Attempted to produce inclusion list pre-bellatrix" + ); + return Ok(None); + }; + + let current_slot = self.slot()?; + let next_slot = current_slot.safe_add(1)?; + + // Don't bother with the inclusion list if the head is not the current slot. + // + // This prevents the routine from running during sync. + if head_slot != current_slot { + debug!( + self.log, + "Head too old for inclusion list"; + "head_slot" => %head_slot, + "current_slot" => %current_slot, + ); + return Ok(None); + } + + // Don't bother with the inclusion list if the request slot is not the next + // slot. + // + // NOTE: does this represent a critical error? should we return an error here or log crit? + // is this check redundant? + if request_slot != next_slot { + debug!(self.log, "Inclusion list request slot not equal to next slot"; "request_slot" => %request_slot, "next_slot" => %next_slot); + return Ok(None); + } + + // Retrieve the inclusion list from the execution layer. + let inclusion_list = execution_layer + .get_inclusion_list(head_hash.into_root()) + .await + .map_err(|e| Error::ExecutionLayerGetInclusionListFailed(Box::new(e)))?; + + Ok(Some(inclusion_list)) + } + /// Performs the same validation as `Self::verify_unaggregated_attestation_for_gossip`, but for /// multiple attestations using batch BLS verification. Batch verification can provide /// significant CPU-time savings compared to individual verification. diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 4f92f5ec8f..9cbeb03246 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -139,6 +139,11 @@ impl CachedHead { self.snapshot.beacon_block.slot() } + /// Returns the `execution_payload.block_hash` of the block at the head of the beacon chain. + pub fn head_hash(&self) -> Option { + self.head_hash + } + /// Returns the `Fork` from the `BeaconState` at the head of the chain. pub fn head_fork(&self) -> Fork { self.snapshot.beacon_state.fork() diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index c3e31c23da..0fc6a32435 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -149,6 +149,7 @@ pub enum BeaconChainError { EngineGetCapabilititesFailed(Box), ExecutionLayerGetBlockByNumberFailed(Box), ExecutionLayerGetBlockByHashFailed(Box), + ExecutionLayerGetInclusionListFailed(Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 08a00d7bf8..03305c2231 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -54,8 +54,8 @@ use types::{ }; use types::{ BeaconStateError, BlindedPayload, ChainSpec, Epoch, ExecPayload, ExecutionPayloadBellatrix, - ExecutionPayloadCapella, ExecutionPayloadElectra, FullPayload, ProposerPreparationData, - PublicKeyBytes, Signature, Slot, + ExecutionPayloadCapella, ExecutionPayloadElectra, FullPayload, InclusionListTransactions, + ProposerPreparationData, PublicKeyBytes, Signature, Slot, }; mod block_hash; @@ -1951,6 +1951,19 @@ impl ExecutionLayer { Err(Error::NoPayloadBuilder) } } + + pub async fn get_inclusion_list( + &self, + parent_hash: Hash256, + ) -> Result, Error> { + debug!(self.log(), "Requesting inclusion list from EL"; "parent_hash" => %parent_hash); + let transactions = self + .engine() + .api + .get_inclusion_list::(parent_hash) + .await?; + Ok(transactions) + } } #[derive(AsRefStr)] diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index cc3b01db87..42f11265ac 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -86,8 +86,8 @@ use types::{ AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + SignedContributionAndProof, SignedInclusionList, SignedValidatorRegistrationData, + SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -265,6 +265,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( }, ); + // POST beacon/pool/inclusion_lists + let post_beacon_pool_inclusion_lists = beacon_pool_path + .clone() + .and(warp::path("inclusion_lists")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + inclusion_lists: Vec>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.blocking_json_task(Priority::P0, move || { + // TODO: actually gossip the inclusion lists + info!( + log, + "Posting signed inclusion lists for gossip"; + "num_inclusion_lists" => inclusion_lists.len(), + ); + + Ok(()) + }) + }, + ); + // GET beacon/deposit_snapshot let get_beacon_deposit_snapshot = eth_v1 .and(warp::path("beacon")) @@ -3496,6 +3524,47 @@ pub fn serve( }, ); + // GET validator/inclusion_list?slot + let get_validator_inclusion_list = eth_v1 + .and(warp::path("validator")) + .and(warp::path("inclusion_list")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ValidatorInclusionListQuery, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + not_synced_filter?; + + let current_slot = chain + .slot() + .map_err(warp_utils::reject::beacon_chain_error)?; + + // allow a tolerance of one slot to account for clock skew + // + // TODO: make sure tolerance is consistent with inner logic + if query.slot > current_slot + 1 { + return Err(warp_utils::reject::custom_bad_request(format!( + "request slot {} is more than one slot past the current slot {}", + query.slot, current_slot + ))); + } + + let data = chain + .produce_inclusion_list(query.slot) + .await + .map(|il| il.clone()) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::beacon_chain_error)?; + Ok::<_, warp::reject::Rejection>(warp::reply::json(&data).into_response()) + }) + }, + ); // POST validator/aggregate_and_proofs let post_validator_aggregate_and_proofs = any_version .and(warp::path("validator")) @@ -4706,6 +4775,7 @@ pub fn serve( .uor(get_validator_attestation_data) .uor(get_validator_aggregate_attestation) .uor(get_validator_sync_committee_contribution) + .uor(get_validator_inclusion_list) .uor(get_lighthouse_health) .uor(get_lighthouse_ui_health) .uor(get_lighthouse_ui_validator_count) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 18433ff9a0..16fad3e512 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -128,6 +128,7 @@ pub struct Timeouts { pub proposer_duties: Duration, pub sync_committee_contribution: Duration, pub sync_duties: Duration, + pub inclusion_list: Duration, pub inclusion_list_duties: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, @@ -146,6 +147,7 @@ impl Timeouts { proposer_duties: timeout, sync_committee_contribution: timeout, sync_duties: timeout, + inclusion_list: timeout, inclusion_list_duties: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, @@ -1564,6 +1566,25 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/pool/inclusion_lists` + pub async fn post_beacon_pool_inclusion_lists( + &self, + inclusion_lists: &[SignedInclusionList], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("inclusion_lists"); + + self.post_with_timeout(path, &inclusion_lists, self.timeouts.inclusion_list) + .await?; + + Ok(()) + } + /// `GET beacon/deposit_snapshot` pub async fn get_deposit_snapshot(&self) -> Result, Error> { let mut path = self.eth_path(V1)?; @@ -2427,6 +2448,25 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET validator/inclusion_list?slot` + pub async fn get_validator_inclusion_list( + &self, + slot: Slot, + ) -> Result>>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("inclusion_list"); + + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + + self.get_opt_with_timeout(path, self.timeouts.inclusion_list) + .await + } + /// `POST lighthouse/liveness` pub async fn post_lighthouse_liveness( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index f1b2216336..1344c14416 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -793,6 +793,11 @@ pub struct ValidatorAggregateAttestationQuery { pub committee_index: Option, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ValidatorInclusionListQuery { + pub slot: Slot, +} + #[derive(Clone, Deserialize)] pub struct LightClientUpdatesQuery { pub start_period: u64, diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index b035a39b06..907830009c 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -816,7 +816,7 @@ impl BeaconState { &self, slot: Slot, spec: &ChainSpec, - ) -> Result, Error> { + ) -> Result, Error> { let epoch = slot.epoch(E::slots_per_epoch()); let current_epoch = self.current_epoch(); let next_epoch = current_epoch + 1; @@ -842,11 +842,11 @@ impl BeaconState { spec.shuffle_round_count, ) .ok_or(Error::UnableToShuffle)?; - il_committee_indices.push(shuffled_index); + il_committee_indices.push(shuffled_index as u64); i.safe_add_assign(1)?; } - Ok(il_committee_indices) + Ok(InclusionListCommittee::::from(il_committee_indices)) } /// Compute the seed to use for the beacon inclusion list committee selection at the given @@ -1727,10 +1727,16 @@ impl BeaconState { epoch: Epoch, spec: &ChainSpec, ) -> Result, Error> { + let validator_index = validator_index as u64; for slot in epoch.slot_iter(E::slots_per_epoch()) { let committee = self.get_inclusion_list_committee(slot, spec)?; + let committee_root = committee.tree_hash_root(); if committee.contains(&validator_index) { - return Ok(Some(InclusionListDuty { slot })); + return Ok(Some(InclusionListDuty { + slot, + validator_index, + committee_root, + })); } } Ok(None) diff --git a/consensus/types/src/inclusion_list.rs b/consensus/types/src/inclusion_list.rs index 016d0bc2bb..9283bb5799 100644 --- a/consensus/types/src/inclusion_list.rs +++ b/consensus/types/src/inclusion_list.rs @@ -9,6 +9,11 @@ use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; +pub type InclusionListTransactions = VariableList< + Transaction<::MaxBytesPerTransaction>, + ::MaxTransactionsPerInclusionList, +>; + #[derive( Debug, Clone, @@ -29,8 +34,7 @@ pub struct InclusionList { #[serde(with = "serde_utils::quoted_u64")] pub validator_index: u64, pub inclusion_list_committee_root: Hash256, - pub transactions: - VariableList, E::MaxTransactionsPerInclusionList>, + pub transactions: InclusionListTransactions, } impl SignedRoot for InclusionList {} diff --git a/consensus/types/src/inclusion_list_committee.rs b/consensus/types/src/inclusion_list_committee.rs new file mode 100644 index 0000000000..2aa928e02e --- /dev/null +++ b/consensus/types/src/inclusion_list_committee.rs @@ -0,0 +1,3 @@ +use crate::*; + +pub type InclusionListCommittee = FixedVector::InclusionListCommitteeSize>; diff --git a/consensus/types/src/inclusion_list_duty.rs b/consensus/types/src/inclusion_list_duty.rs index bf578b4623..4317abd5eb 100644 --- a/consensus/types/src/inclusion_list_duty.rs +++ b/consensus/types/src/inclusion_list_duty.rs @@ -5,4 +5,9 @@ use serde::{Deserialize, Serialize}; pub struct InclusionListDuty { /// The slot during which the validator must produce an inclusion list. pub slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] + /// The index of the validator. + pub validator_index: u64, + /// The hash tree root of the inclusion list committee. + pub committee_root: Hash256, } diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 8a1eab3d75..f24ba9908f 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -49,6 +49,7 @@ pub mod graffiti; pub mod historical_batch; pub mod historical_summary; pub mod inclusion_list; +pub mod inclusion_list_committee; pub mod inclusion_list_duty; pub mod indexed_attestation; pub mod light_client_bootstrap; @@ -180,7 +181,8 @@ pub use crate::fork_name::{ForkName, InconsistentFork}; pub use crate::fork_versioned_response::{ForkVersionDeserialize, ForkVersionedResponse}; pub use crate::graffiti::{Graffiti, GRAFFITI_BYTES_LEN}; pub use crate::historical_batch::HistoricalBatch; -pub use crate::inclusion_list::{InclusionList, SignedInclusionList}; +pub use crate::inclusion_list::{InclusionList, InclusionListTransactions, SignedInclusionList}; +pub use crate::inclusion_list_committee::InclusionListCommittee; pub use crate::inclusion_list_duty::InclusionListDuty; pub use crate::indexed_attestation::{ IndexedAttestation, IndexedAttestationBase, IndexedAttestationElectra, IndexedAttestationRef, diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 95a221f189..daa0802f9b 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -724,6 +724,7 @@ async fn sort_nodes_by_health(nodes: &mut Vec pub enum ApiTopic { Attestations, Blocks, + InclusionLists, Subscriptions, SyncCommittee, } @@ -731,7 +732,13 @@ pub enum ApiTopic { impl ApiTopic { pub fn all() -> Vec { use ApiTopic::*; - vec![Attestations, Blocks, Subscriptions, SyncCommittee] + vec![ + Attestations, + Blocks, + InclusionLists, + Subscriptions, + SyncCommittee, + ] } } diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index 2fe4af39d3..d30d84bbb6 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -47,6 +47,7 @@ pub enum SignableMessage<'a, E: EthSpec, Payload: AbstractExecPayload = FullP SignedContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), VoluntaryExit(&'a VoluntaryExit), + InclusionList(&'a InclusionList), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, E, Payload> { @@ -68,6 +69,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> SignableMessage<'a, E, Pay SignableMessage::SignedContributionAndProof(c) => c.signing_root(domain), SignableMessage::ValidatorRegistration(v) => v.signing_root(domain), SignableMessage::VoluntaryExit(exit) => exit.signing_root(domain), + SignableMessage::InclusionList(il) => il.signing_root(domain), } } } @@ -223,6 +225,7 @@ impl SigningMethod { Web3SignerObject::ValidatorRegistration(v) } SignableMessage::VoluntaryExit(e) => Web3SignerObject::VoluntaryExit(e), + SignableMessage::InclusionList(il) => Web3SignerObject::InclusionList(il), }; // Determine the Web3Signer message type. diff --git a/validator_client/signing_method/src/web3signer.rs b/validator_client/signing_method/src/web3signer.rs index 86e7015ad3..a04922c8c0 100644 --- a/validator_client/signing_method/src/web3signer.rs +++ b/validator_client/signing_method/src/web3signer.rs @@ -18,6 +18,7 @@ pub enum MessageType { SyncCommitteeSelectionProof, SyncCommitteeContributionAndProof, ValidatorRegistration, + InclusionList, } #[derive(Debug, PartialEq, Copy, Clone, Serialize)] @@ -72,6 +73,7 @@ pub enum Web3SignerObject<'a, E: EthSpec, Payload: AbstractExecPayload> { SyncAggregatorSelectionData(&'a SyncAggregatorSelectionData), ContributionAndProof(&'a ContributionAndProof), ValidatorRegistration(&'a ValidatorRegistrationData), + InclusionList(&'a InclusionList), } impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Payload> { @@ -127,6 +129,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> Web3SignerObject<'a, E, Pa MessageType::SyncCommitteeContributionAndProof } Web3SignerObject::ValidatorRegistration(_) => MessageType::ValidatorRegistration, + Web3SignerObject::InclusionList(_) => MessageType::InclusionList, } } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index b0dc651f27..928e5c7672 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -44,6 +44,7 @@ use validator_services::{ attestation_service::{AttestationService, AttestationServiceBuilder}, block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService}, + inclusion_list_service::InclusionListService, preparation_service::{PreparationService, PreparationServiceBuilder}, sync::SyncDutiesMap, sync_committee_service::SyncCommitteeService, @@ -66,6 +67,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; @@ -81,6 +83,7 @@ pub struct ProductionValidatorClient { block_service: BlockService, attestation_service: AttestationService, sync_committee_service: SyncCommitteeService, + inclusion_list_service: InclusionListService, doppelganger_service: Option>, preparation_service: PreparationService, validator_store: Arc>, @@ -319,6 +322,7 @@ impl ProductionValidatorClient { sync_committee_contribution: slot_duration / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, + inclusion_list: slot_duration / HTTP_INCLUSION_LIST_TIMEOUT_QUOTIENT, inclusion_list_duties: slot_duration / HTTP_INCLUSION_LIST_DUTIES_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: slot_duration @@ -530,12 +534,21 @@ impl ProductionValidatorClient { context.service_context("sync_committee".into()), ); + let inclusion_list_service = InclusionListService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock.clone(), + beacon_nodes.clone(), + context.service_context("inclusion_list".into()), + ); + Ok(Self { context, duties_service, block_service, attestation_service, sync_committee_service, + inclusion_list_service, doppelganger_service, preparation_service, validator_store, @@ -611,6 +624,11 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + self.inclusion_list_service + .clone() + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start inclusion list service: {}", e))?; + self.preparation_service .clone() .start_update_service(&self.context.eth2_config.spec) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index f47a117fb5..cad23634f3 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -334,6 +334,25 @@ impl DutiesService { .collect() } + /// Returns all `InclusionListDutyData` for the given `slot`. + pub fn inclusion_list_duties(&self, slot: Slot) -> Vec { + let epoch = slot.epoch(E::slots_per_epoch()); + + // Only collect validators that are considered safe in terms of doppelganger protection. + let signing_pubkeys: HashSet<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::only_safe); + + self.inclusion_list_duties + .read() + .iter() + .filter_map(|(_, map)| map.get(&epoch)) + .map(|(_, duty)| duty) + .filter(|duty| duty.slot == slot && signing_pubkeys.contains(&duty.pubkey)) + .cloned() + .collect() + } + /// Returns `true` if we should collect per validator metrics and `false` otherwise. pub fn per_validator_metrics(&self) -> bool { self.enable_high_validator_count_metrics diff --git a/validator_client/validator_services/src/inclusion_list_service.rs b/validator_client/validator_services/src/inclusion_list_service.rs new file mode 100644 index 0000000000..f3abdb82b0 --- /dev/null +++ b/validator_client/validator_services/src/inclusion_list_service.rs @@ -0,0 +1,285 @@ +use crate::duties_service::{DutiesService, DutyAndProof}; +use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use environment::RuntimeContext; +use eth2::types::InclusionListDutyData; +use futures::future::join_all; +use slog::{crit, debug, error, info, trace, warn}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::Arc; +use tokio::time::{sleep, sleep_until, Duration, Instant}; +use tree_hash::TreeHash; +use types::{ + Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, InclusionList, + SignedInclusionList, Slot, +}; +use validator_store::{Error as ValidatorStoreError, ValidatorStore}; + +/// Helper to minimise `Arc` usage. +pub struct Inner { + duties_service: Arc>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, +} + +/// Attempts to produce inclusion lists for all known validators 3/4 of the way through each slot. +pub struct InclusionListService { + inner: Arc>, +} + +impl Clone for InclusionListService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for InclusionListService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl InclusionListService { + pub fn new( + duties_service: Arc>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + context, + }), + } + } + + /// Starts the service which periodically produces inclusion lists. + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + info!( + log, + "Inclusion list production service started"; + "next_update_millis" => duration_to_next_slot.as_millis() + ); + + let executor = self.context.executor.clone(); + + let interval_fut = async move { + loop { + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + // 3/4 of the way into the slot + sleep(duration_to_next_slot + (slot_duration * 3 / 4)).await; + let log = self.context.log(); + + if let Err(e) = self.spawn_inclusion_list_task(slot_duration) { + crit!( + log, + "Failed to spawn inclusion list task"; + "error" => e + ) + } else { + trace!( + log, + "Spawned inclusion list task"; + ) + } + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + } + }; + + executor.spawn(interval_fut, "inclusion_list_service"); + Ok(()) + } + + /// Spawn a new task that downloads, signs and uploads the inclusion lists to the beacon node. + fn spawn_inclusion_list_task(&self, slot_duration: Duration) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + let inclusion_list_duties = self.duties_service.inclusion_list_duties(slot); + self.inner.context.executor.spawn_ignoring_error( + self.clone() + .produce_and_publish_inclusion_lists(slot, inclusion_list_duties), + "inclusion list publish", + ); + + Ok(()) + } + + /// Downloads inclusion list objects, signs them, and returns them to the validator. + /// + /// ## Detail + /// + /// The given `validator_duties` should already be filtered to only contain those that match + /// `slot`. Critical errors will be logged if this is not the case. + /// + /// Only one `InclusionList` is downloaded from the BN. It is then cloned and signed by each + /// validator and the list of individually-signed `SignedInclusionList` objects is returned to + /// the BN. + async fn produce_and_publish_inclusion_lists( + self, + slot: Slot, + validator_duties: Vec, + ) -> Result<(), ()> { + let log = self.context.log(); + let validator_store = self.validator_store.clone(); + + if validator_duties.is_empty() { + return Ok(()); + } + + let current_epoch = self + .slot_clock + .now() + .ok_or("Unable to determine current slot from clock") + .map(|slot| slot.epoch(E::slots_per_epoch())); + let current_epoch = current_epoch.map_err(|e| { + crit!( + log, + "Error during inclusion list routine"; + "error" => format!("{:?}", e), + "slot" => slot.as_u64(), + ) + })?; + + let inclusion_list = self + .beacon_nodes + .first_success(|beacon_node| async move { + // TODO: add timer metric + beacon_node + .get_validator_inclusion_list(slot) + .await + .map_err(|e| format!("Failed to produce inclusion list: {:?}", e)) + .map(|result| result.ok_or(format!("Inclusion list unavailable")))? + .map(|result| result.data) + }) + .await + .map_err(|e| { + crit!( + log, + "Error during inclusion list routine"; + "error" => format!("{}", e), + "slot" => slot.as_u64(), + ) + })?; + + // Create futures to produce signed `InclusionList` objects. + let signing_futures = validator_duties.iter().map(|duty| { + let inclusion_list = inclusion_list.clone(); + let validator_store = Arc::clone(&validator_store); + async move { + // Ensure that the inclusion list matches the duties. + // + // TODO: do we need to check any other fields here? + if inclusion_list.slot != duty.slot { + crit!( + log, + "Inconsistent validator duties during signing"; + "validator" => ?duty.pubkey, + "duty_slot" => duty.slot, + "inclusion_list_slot" => inclusion_list.slot, + ); + return None; + } + + match validator_store + .sign_inclusion_list(duty.pubkey, inclusion_list) + .await + { + Ok(il) => Some((il, duty.validator_index)), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + log, + "Missing pubkey for inclusion list"; + "info" => "a validator may have recently been removed from this VC", + "pubkey" => ?pubkey, + "validator" => ?duty.pubkey, + "slot" => slot.as_u64(), + ); + None + } + Err(e) => { + crit!( + log, + "Failed to sign inclusion list"; + "error" => ?e, + "validator" => ?duty.pubkey, + "slot" => slot.as_u64(), + ); + None + } + } + } + }); + + // Execute all the futures in parallel, collecting any successful results. + let (ref inclusion_lists, ref validator_indices): (Vec<_>, Vec<_>) = + join_all(signing_futures) + .await + .into_iter() + .flatten() + .unzip(); + + if inclusion_lists.is_empty() { + warn!(log, "No inclusion lists were published"); + return Ok(()); + } + + // Post the inclusion lists to the BN. + match self + .beacon_nodes + .request(ApiTopic::InclusionLists, |beacon_node| async move { + // TODO: add timer metric + beacon_node + .post_beacon_pool_inclusion_lists(inclusion_lists) + .await + }) + .await + { + Ok(()) => info!( + log, + "Successfully published inclusion lists"; + "count" => inclusion_lists.len(), + "validator_indices" => ?validator_indices, + "slot" => slot.as_u64(), + ), + Err(e) => error!( + log, + "Unable to publish inclusion lists"; + "error" => %e, + "slot" => slot.as_u64(), + ), + } + + Ok(()) + } +} diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index abf8fab3cb..e3ddbf7ae7 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -1,6 +1,7 @@ pub mod attestation_service; pub mod block_service; pub mod duties_service; +pub mod inclusion_list_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 837af5b51d..15377fada9 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -16,11 +16,11 @@ use task_executor::TaskExecutor; use types::{ attestation::Error as AttestationError, graffiti::GraffitiString, AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, - Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, PublicKeyBytes, SelectionProof, Signature, - SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedRoot, - SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncAggregatorSelectionData, - SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, - ValidatorRegistrationData, VoluntaryExit, + Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, InclusionList, PublicKeyBytes, SelectionProof, + Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, + SignedInclusionList, SignedRoot, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncAggregatorSelectionData, SyncCommitteeContribution, SyncCommitteeMessage, + SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData, VoluntaryExit, }; #[derive(Debug, PartialEq)] @@ -1009,6 +1009,30 @@ impl ValidatorStore { Ok(SignedContributionAndProof { message, signature }) } + pub async fn sign_inclusion_list( + &self, + pubkey: PublicKeyBytes, + inclusion_list: InclusionList, + ) -> Result, Error> { + let signing_epoch = inclusion_list.slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::InclusionListCommittee, signing_epoch); + let signing_method = self.doppelganger_bypassed_signing_method(pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::InclusionList(&inclusion_list), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + Ok(SignedInclusionList { + message: inclusion_list, + signature, + }) + } + pub fn import_slashing_protection( &self, interchange: Interchange,