Anchor pre-PR: Modularize validator store (#6771)

* pass slots_per_epoch at runtime

* remove generic E from unrequired types

* move `validator_store` to `lighthouse_validator_store`

* make validator_store into a trait

* further reduce dependencies

* remove `environment` dependency on `beacon_node_fallback`

* Manually pull in some changes from tracing-integration (thanks sayan!)

Co-authored-by: ThreeHrSleep <threehrsleep@gmail.com>

* remove `environment` from `validator_services`

* unify boost factor accessors

* add builder for DutiesService

* Manually merge tracing PR for beacon_node_fallback

Co-authored-by: ThreeHrSleep <threehrsleep@gmail.com>

* Fix chain_spec for BlockService

* address review

* remove PhantomData from SyncDutiesMap

* fix tests

* correct test

* Add `E` to `ValidatorStore` as associated type

* fix tests

* derive Clone for ValidatorStore's Error and required sub-errors

* switch to enum for block signing to allow differing types

---------

Co-authored-by: João Oliveira <hello@jxs.pt>
Co-authored-by: ThreeHrSleep <threehrsleep@gmail.com>
Co-authored-by: Jimmy Chen <jimmy@sigmaprime.io>
This commit is contained in:
Daniel Knopik
2025-01-09 13:56:29 +01:00
committed by GitHub
parent 9a4768a771
commit 64eb84800a
46 changed files with 2305 additions and 2195 deletions

View File

@@ -6,17 +6,17 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
[dependencies]
beacon_node_fallback = { workspace = true }
bls = { workspace = true }
doppelganger_service = { workspace = true }
environment = { workspace = true }
bls = { workspace = true }
eth2 = { workspace = true }
futures = { workspace = true }
graffiti_file = { workspace = true }
logging = { workspace = true }
parking_lot = { workspace = true }
safe_arith = { workspace = true }
slog = { workspace = true }
slot_clock = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tree_hash = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }

View File

@@ -1,44 +1,47 @@
use crate::duties_service::{DutiesService, DutyAndProof};
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use environment::RuntimeContext;
use futures::future::join_all;
use slog::{crit, debug, error, info, trace, warn};
use logging::crit;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
/// Builds an `AttestationService`.
#[derive(Default)]
pub struct AttestationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
duties_service: Option<Arc<DutiesService<T, E>>>,
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct AttestationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
duties_service: Option<Arc<DutiesService<S, T>>>,
validator_store: Option<Arc<S>>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
}
impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
duties_service: None,
validator_store: None,
slot_clock: None,
beacon_nodes: None,
context: None,
executor: None,
chain_spec: None,
}
}
pub fn duties_service(mut self, service: Arc<DutiesService<T, E>>) -> Self {
pub fn duties_service(mut self, service: Arc<DutiesService<S, T>>) -> Self {
self.duties_service = Some(service);
self
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -48,17 +51,22 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn build(self) -> Result<AttestationService<T, E>, String> {
pub fn chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);
self
}
pub fn build(self) -> Result<AttestationService<S, T>, String> {
Ok(AttestationService {
inner: Arc::new(Inner {
duties_service: self
@@ -73,21 +81,25 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build AttestationService without beacon_nodes")?,
context: self
.context
.ok_or("Cannot build AttestationService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build AttestationService without executor")?,
chain_spec: self
.chain_spec
.ok_or("Cannot build AttestationService without chain_spec")?,
}),
})
}
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
}
/// Attempts to produce attestations for all known validators 1/3rd of the way through each slot.
@@ -95,11 +107,11 @@ pub struct Inner<T, E: EthSpec> {
/// If any validators are on the same committee, a single attestation will be downloaded and
/// returned to the beacon node. This attestation will have a signature from each of the
/// validators.
pub struct AttestationService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct AttestationService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for AttestationService<T, E> {
impl<S, T> Clone for AttestationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -107,19 +119,17 @@ impl<T, E: EthSpec> Clone for AttestationService<T, E> {
}
}
impl<T, E: EthSpec> Deref for AttestationService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for AttestationService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, T> {
/// Starts the service which periodically produces attestations.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let duration_to_next_slot = self
.slot_clock
@@ -127,33 +137,24 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.ok_or("Unable to determine duration to next slot")?;
info!(
log,
"Attestation production service started";
"next_update_millis" => duration_to_next_slot.as_millis()
next_update_millis = duration_to_next_slot.as_millis(),
"Attestation production service started"
);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let interval_fut = async move {
loop {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot + slot_duration / 3).await;
let log = self.context.log();
if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
crit!(
log,
"Failed to spawn attestation tasks";
"error" => e
)
crit!(error = e, "Failed to spawn attestation tasks")
} else {
trace!(
log,
"Spawned attestation tasks";
)
trace!("Spawned attestation tasks");
}
} else {
error!(log, "Failed to read slot clock");
error!("Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
continue;
@@ -200,7 +201,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
self.inner.context.executor.spawn_ignoring_error(
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
slot,
committee_index,
@@ -235,7 +236,6 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
) -> Result<(), ()> {
let log = self.context.log();
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
@@ -255,11 +255,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.await
.map_err(move |e| {
crit!(
log,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
@@ -292,11 +291,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.await
.map_err(move |e| {
crit!(
log,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
}
@@ -322,8 +320,6 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> {
let log = self.context.log();
if validator_duties.is_empty() {
return Ok(None);
}
@@ -332,7 +328,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.slot_clock
.now()
.ok_or("Unable to determine current slot from clock")?
.epoch(E::slots_per_epoch());
.epoch(S::E::slots_per_epoch());
let attestation_data = self
.beacon_nodes
@@ -357,36 +353,34 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let attestation_data = attestation_data_ref;
// Ensure that the attestation matches the duties.
if !duty.match_attestation_data::<E>(attestation_data, &self.context.eth2_config.spec) {
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => ?duty.pubkey,
"duty_slot" => duty.slot,
"attestation_slot" => attestation_data.slot,
"duty_index" => duty.committee_index,
"attestation_index" => attestation_data.index,
validator = ?duty.pubkey,
duty_slot = ?duty.slot,
attestation_slot = %attestation_data.slot,
duty_index = duty.committee_index,
attestation_index = attestation_data.index,
"Inconsistent validator duties during signing"
);
return None;
}
let mut attestation = match Attestation::<E>::empty_for_signing(
let mut attestation = match Attestation::empty_for_signing(
duty.committee_index,
duty.committee_length as usize,
attestation_data.slot,
attestation_data.beacon_block_root,
attestation_data.source,
attestation_data.target,
&self.context.eth2_config.spec,
&self.chain_spec,
) {
Ok(attestation) => attestation,
Err(err) => {
crit!(
log,
"Invalid validator duties during signing";
"validator" => ?duty.pubkey,
"duty" => ?duty,
"err" => ?err,
validator = ?duty.pubkey,
?duty,
?err,
"Invalid validator duties during signing"
);
return None;
}
@@ -407,24 +401,22 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
log,
"Missing pubkey for attestation";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
"validator" => ?duty.pubkey,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
None
}
Err(e) => {
crit!(
log,
"Failed to sign attestation";
"error" => ?e,
"validator" => ?duty.pubkey,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
error = ?e,
validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(),
"Failed to sign attestation"
);
None
}
@@ -439,14 +431,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.unzip();
if attestations.is_empty() {
warn!(log, "No attestations were published");
warn!("No attestations were published");
return Ok(None);
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
// Post the attestations to the BN.
match self
@@ -469,22 +459,20 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.await
{
Ok(()) => info!(
log,
"Successfully published attestations";
"count" => attestations.len(),
"validator_indices" => ?validator_indices,
"head_block" => ?attestation_data.beacon_block_root,
"committee_index" => attestation_data.index,
"slot" => attestation_data.slot.as_u64(),
"type" => "unaggregated",
count = attestations.len(),
validator_indices = ?validator_indices,
head_block = ?attestation_data.beacon_block_root,
committee_index = attestation_data.index,
slot = attestation_data.slot.as_u64(),
"type" = "unaggregated",
"Successfully published attestations"
),
Err(e) => error!(
log,
"Unable to publish attestations";
"error" => %e,
"committee_index" => attestation_data.index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
error = %e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to publish attestations"
),
}
@@ -510,8 +498,6 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<(), String> {
let log = self.context.log();
if !validator_duties
.iter()
.any(|duty_and_proof| duty_and_proof.selection_proof.is_some())
@@ -521,10 +507,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
}
let fork_name = self
.context
.eth2_config
.spec
.fork_name_at_slot::<E>(attestation_data.slot);
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);
let aggregated_attestation = &self
.beacon_nodes
@@ -568,8 +552,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let duty = &duty_and_proof.duty;
let selection_proof = duty_and_proof.selection_proof.as_ref()?;
if !duty.match_attestation_data::<E>(attestation_data, &self.context.eth2_config.spec) {
crit!(log, "Inconsistent validator duties during signing");
if !duty.match_attestation_data::<S::E>(attestation_data, &self.chain_spec) {
crit!("Inconsistent validator duties during signing");
return None;
}
@@ -587,19 +571,14 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for aggregate";
"pubkey" => ?pubkey,
);
debug!(?pubkey, "Missing pubkey for aggregate");
None
}
Err(e) => {
crit!(
log,
"Failed to sign aggregate";
"error" => ?e,
"pubkey" => ?duty.pubkey,
error = ?e,
pubkey = ?duty.pubkey,
"Failed to sign aggregate"
);
None
}
@@ -643,14 +622,13 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = signed_aggregate_and_proof.message().aggregate();
info!(
log,
"Successfully published attestation";
"aggregator" => signed_aggregate_and_proof.message().aggregator_index(),
"signatures" => attestation.num_set_aggregation_bits(),
"head_block" => format!("{:?}", attestation.data().beacon_block_root),
"committee_index" => attestation.committee_index(),
"slot" => attestation.data().slot.as_u64(),
"type" => "aggregated",
aggregator = signed_aggregate_and_proof.message().aggregator_index(),
signatures = attestation.num_set_aggregation_bits(),
head_block = format!("{:?}", attestation.data().beacon_block_root),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Successfully published attestation"
);
}
}
@@ -658,13 +636,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = &signed_aggregate_and_proof.message().aggregate();
crit!(
log,
"Failed to publish attestation";
"error" => %e,
"aggregator" => signed_aggregate_and_proof.message().aggregator_index(),
"committee_index" => attestation.committee_index(),
"slot" => attestation.data().slot.as_u64(),
"type" => "aggregated",
error = %e,
aggregator = signed_aggregate_and_proof.message().aggregator_index(),
committee_index = attestation.committee_index(),
slot = attestation.data().slot.as_u64(),
"type" = "aggregated",
"Failed to publish attestation"
);
}
}
@@ -679,11 +656,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// Start the task at `pruning_instant` to avoid interference with other tasks.
fn spawn_slashing_protection_pruning_task(&self, slot: Slot, pruning_instant: Instant) {
let attestation_service = self.clone();
let executor = self.inner.context.executor.clone();
let current_epoch = slot.epoch(E::slots_per_epoch());
let executor = self.inner.executor.clone();
let current_epoch = slot.epoch(S::E::slots_per_epoch());
// Wait for `pruning_instant` in a regular task, and then switch to a blocking one.
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
sleep_until(pruning_instant).await;

View File

@@ -1,20 +1,21 @@
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, Error as FallbackError, Errors};
use bls::SignatureBytes;
use environment::RuntimeContext;
use eth2::types::{FullBlockContents, PublishBlockRequest};
use eth2::{BeaconNodeHttpClient, StatusCode};
use graffiti_file::{determine_graffiti, GraffitiFile};
use slog::{crit, debug, error, info, trace, warn, Logger};
use logging::crit;
use slot_clock::SlotClock;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use types::{
BlindedBeaconBlock, BlockType, EthSpec, Graffiti, PublicKeyBytes, SignedBlindedBeaconBlock,
Slot,
BlindedBeaconBlock, BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes,
SignedBlindedBeaconBlock, Slot,
};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
@@ -44,30 +45,32 @@ impl From<Errors<BlockError>> for BlockError {
/// Builds a `BlockService`.
#[derive(Default)]
pub struct BlockServiceBuilder<T, E: EthSpec> {
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct BlockServiceBuilder<S, T> {
validator_store: Option<Arc<S>>,
slot_clock: Option<Arc<T>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> BlockServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
validator_store: None,
slot_clock: None,
beacon_nodes: None,
proposer_nodes: None,
context: None,
executor: None,
chain_spec: None,
graffiti: None,
graffiti_file: None,
}
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -77,18 +80,23 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn proposer_nodes(mut self, proposer_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn proposer_nodes(mut self, proposer_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.proposer_nodes = Some(proposer_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn chain_spec(mut self, chain_spec: Arc<ChainSpec>) -> Self {
self.chain_spec = Some(chain_spec);
self
}
@@ -102,7 +110,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn build(self) -> Result<BlockService<T, E>, String> {
pub fn build(self) -> Result<BlockService<S, T>, String> {
Ok(BlockService {
inner: Arc::new(Inner {
validator_store: self
@@ -114,9 +122,12 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build BlockService without beacon_node")?,
context: self
.context
.ok_or("Cannot build BlockService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build BlockService without executor")?,
chain_spec: self
.chain_spec
.ok_or("Cannot build BlockService without chain_spec")?,
proposer_nodes: self.proposer_nodes,
graffiti: self.graffiti,
graffiti_file: self.graffiti_file,
@@ -127,12 +138,12 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
// Combines a set of non-block-proposing `beacon_nodes` and only-block-proposing
// `proposer_nodes`.
pub struct ProposerFallback<T, E: EthSpec> {
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
pub struct ProposerFallback<T> {
beacon_nodes: Arc<BeaconNodeFallback<T>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
}
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
impl<T: SlotClock> ProposerFallback<T> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
where
@@ -177,22 +188,23 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
validator_store: Arc<S>,
slot_clock: Arc<T>,
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: RuntimeContext<E>,
pub beacon_nodes: Arc<BeaconNodeFallback<T>>,
pub proposer_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
}
/// Attempts to produce attestations for any block producer(s) at the start of the epoch.
pub struct BlockService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct BlockService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for BlockService<T, E> {
impl<S, T> Clone for BlockService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -200,8 +212,8 @@ impl<T, E: EthSpec> Clone for BlockService<T, E> {
}
}
impl<T, E: EthSpec> Deref for BlockService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for BlockService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
@@ -214,23 +226,21 @@ pub struct BlockServiceNotification {
pub block_proposers: Vec<PublicKeyBytes>,
}
impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
pub fn start_update_service(
self,
mut notification_rx: mpsc::Receiver<BlockServiceNotification>,
) -> Result<(), String> {
let log = self.context.log().clone();
info!("Block production service started");
info!(log, "Block production service started");
let executor = self.inner.context.executor.clone();
let executor = self.inner.executor.clone();
executor.spawn(
async move {
while let Some(notif) = notification_rx.recv().await {
self.do_update(notif).await.ok();
}
debug!(log, "Block service shutting down");
debug!("Block service shutting down");
},
"block_service",
);
@@ -240,65 +250,55 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> {
let log = self.context.log();
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::BLOCK_SERVICE_TIMES,
&[validator_metrics::FULL_UPDATE],
);
let slot = self.slot_clock.now().ok_or_else(move || {
crit!(log, "Duties manager failed to read slot clock");
crit!("Duties manager failed to read slot clock");
})?;
if notification.slot != slot {
warn!(
log,
"Skipping block production for expired slot";
"current_slot" => slot.as_u64(),
"notification_slot" => notification.slot.as_u64(),
"info" => "Your machine could be overloaded"
current_slot = slot.as_u64(),
notification_slot = notification.slot.as_u64(),
info = "Your machine could be overloaded",
"Skipping block production for expired slot"
);
return Ok(());
}
if slot == self.context.eth2_config.spec.genesis_slot {
if slot == self.chain_spec.genesis_slot {
debug!(
log,
"Not producing block at genesis slot";
"proposers" => format!("{:?}", notification.block_proposers),
proposers = format!("{:?}", notification.block_proposers),
"Not producing block at genesis slot"
);
return Ok(());
}
trace!(
log,
"Block service update started";
"slot" => slot.as_u64()
);
trace!(slot = slot.as_u64(), "Block service update started");
let proposers = notification.block_proposers;
if proposers.is_empty() {
trace!(
log,
"No local block proposers for this slot";
"slot" => slot.as_u64()
slot = slot.as_u64(),
"No local block proposers for this slot"
)
} else if proposers.len() > 1 {
error!(
log,
"Multiple block proposers for this slot";
"action" => "producing blocks for all proposers",
"num_proposers" => proposers.len(),
"slot" => slot.as_u64(),
action = "producing blocks for all proposers",
num_proposers = proposers.len(),
slot = slot.as_u64(),
"Multiple block proposers for this slot"
)
}
for validator_pubkey in proposers {
let builder_boost_factor = self.get_builder_boost_factor(&validator_pubkey);
let service = self.clone();
let log = log.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
let result = service
.publish_block(slot, validator_pubkey, builder_boost_factor)
@@ -308,11 +308,10 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
Ok(_) => {}
Err(BlockError::Recoverable(e)) | Err(BlockError::Irrecoverable(e)) => {
error!(
log,
"Error whilst producing block";
"error" => ?e,
"block_slot" => ?slot,
"info" => "block v3 proposal failed, this error may or may not result in a missed block"
error = ?e,
block_slot = ?slot,
info = "block v3 proposal failed, this error may or may not result in a missed block",
"Error whilst producing block"
);
}
}
@@ -326,42 +325,45 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
#[allow(clippy::too_many_arguments)]
async fn sign_and_publish_block(
&self,
proposer_fallback: ProposerFallback<T, E>,
proposer_fallback: ProposerFallback<T>,
slot: Slot,
graffiti: Option<Graffiti>,
validator_pubkey: &PublicKeyBytes,
unsigned_block: UnsignedBlock<E>,
unsigned_block: UnsignedBlock<S::E>,
) -> Result<(), BlockError> {
let log = self.context.log();
let signing_timer = validator_metrics::start_timer(&validator_metrics::BLOCK_SIGNING_TIMES);
let res = match unsigned_block {
let (block, maybe_blobs) = match unsigned_block {
UnsignedBlock::Full(block_contents) => {
let (block, maybe_blobs) = block_contents.deconstruct();
self.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(|b| SignedBlock::Full(PublishBlockRequest::new(Arc::new(b), maybe_blobs)))
(block.into(), maybe_blobs)
}
UnsignedBlock::Blinded(block) => self
.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(Arc::new)
.map(SignedBlock::Blinded),
UnsignedBlock::Blinded(block) => (block.into(), None),
};
let res = self
.validator_store
.sign_block(*validator_pubkey, block, slot)
.await
.map(|block| match block {
validator_store::SignedBlock::Full(block) => {
SignedBlock::Full(PublishBlockRequest::new(Arc::new(block), maybe_blobs))
}
validator_store::SignedBlock::Blinded(block) => {
SignedBlock::Blinded(Arc::new(block))
}
});
let signed_block = match res {
Ok(block) => block,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently removed
// via the API.
warn!(
log,
"Missing pubkey for block";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
"slot" => ?slot
info = "a validator may have recently been removed from this VC",
?pubkey,
?slot,
"Missing pubkey for block"
);
return Ok(());
}
@@ -377,10 +379,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
info!(
log,
"Publishing signed block";
"slot" => slot.as_u64(),
"signing_time_ms" => signing_time_ms,
slot = slot.as_u64(),
signing_time_ms = signing_time_ms,
"Publishing signed block"
);
// Publish block with first available beacon node.
@@ -396,13 +397,12 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
.await?;
info!(
log,
"Successfully published block";
"block_type" => ?signed_block.block_type(),
"deposits" => signed_block.num_deposits(),
"attestations" => signed_block.num_attestations(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block.slot().as_u64(),
block_type = ?signed_block.block_type(),
deposits = signed_block.num_deposits(),
attestations = signed_block.num_attestations(),
graffiti = ?graffiti.map(|g| g.as_utf8_lossy()),
slot = signed_block.slot().as_u64(),
"Successfully published block"
);
Ok(())
}
@@ -413,7 +413,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
validator_pubkey: PublicKeyBytes,
builder_boost_factor: Option<u64>,
) -> Result<(), BlockError> {
let log = self.context.log();
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::BLOCK_SERVICE_TIMES,
&[validator_metrics::BEACON_BLOCK],
@@ -421,7 +420,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let randao_reveal = match self
.validator_store
.randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch()))
.randao_reveal(validator_pubkey, slot.epoch(S::E::slots_per_epoch()))
.await
{
Ok(signature) => signature.into(),
@@ -429,11 +428,10 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// A pubkey can be missing when a validator was recently removed
// via the API.
warn!(
log,
"Missing pubkey for block randao";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
"slot" => ?slot
info = "a validator may have recently been removed from this VC",
?pubkey,
?slot,
"Missing pubkey for block randao"
);
return Ok(());
}
@@ -447,7 +445,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let graffiti = determine_graffiti(
&validator_pubkey,
log,
self.graffiti_file.clone(),
self.validator_store.graffiti(&validator_pubkey),
self.graffiti,
@@ -461,11 +458,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
proposer_nodes: self.proposer_nodes.clone(),
};
info!(
log,
"Requesting unsigned block";
"slot" => slot.as_u64(),
);
info!(slot = slot.as_u64(), "Requesting unsigned block");
// Request block from first responsive beacon node.
//
@@ -484,7 +477,6 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti,
proposer_index,
builder_boost_factor,
log,
)
.await
.map_err(|e| {
@@ -511,10 +503,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
async fn publish_signed_block_contents(
&self,
signed_block: &SignedBlock<E>,
signed_block: &SignedBlock<S::E>,
beacon_node: BeaconNodeHttpClient,
) -> Result<(), BlockError> {
let log = self.context.log();
let slot = signed_block.slot();
match signed_block {
SignedBlock::Full(signed_block) => {
@@ -525,7 +516,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
beacon_node
.post_beacon_blocks_v2_ssz(signed_block, None)
.await
.or_else(|e| handle_block_post_error(e, slot, log))?
.or_else(|e| handle_block_post_error(e, slot))?
}
SignedBlock::Blinded(signed_block) => {
let _post_timer = validator_metrics::start_timer_vec(
@@ -535,7 +526,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
beacon_node
.post_beacon_blinded_blocks_v2_ssz(signed_block, None)
.await
.or_else(|e| handle_block_post_error(e, slot, log))?
.or_else(|e| handle_block_post_error(e, slot))?
}
}
Ok::<_, BlockError>(())
@@ -548,10 +539,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti: Option<Graffiti>,
proposer_index: Option<u64>,
builder_boost_factor: Option<u64>,
log: &Logger,
) -> Result<UnsignedBlock<E>, BlockError> {
) -> Result<UnsignedBlock<S::E>, BlockError> {
let (block_response, _) = beacon_node
.get_validator_blocks_v3::<E>(
.get_validator_blocks_v3::<S::E>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
@@ -570,11 +560,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
eth2::types::ProduceBlockV3Response::Blinded(block) => UnsignedBlock::Blinded(block),
};
info!(
log,
"Received unsigned block";
"slot" => slot.as_u64(),
);
info!(slot = slot.as_u64(), "Received unsigned block");
if proposer_index != Some(unsigned_block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged".to_string(),
@@ -593,15 +579,9 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Apply per validator configuration first.
let validator_builder_boost_factor = self
.validator_store
.determine_validator_builder_boost_factor(validator_pubkey);
.determine_builder_boost_factor(validator_pubkey);
// Fallback to process-wide configuration if needed.
let maybe_builder_boost_factor = validator_builder_boost_factor.or_else(|| {
self.validator_store
.determine_default_builder_boost_factor()
});
if let Some(builder_boost_factor) = maybe_builder_boost_factor {
if let Some(builder_boost_factor) = validator_builder_boost_factor {
// if builder boost factor is set to 100 it should be treated
// as None to prevent unnecessary calculations that could
// lead to loss of information.
@@ -662,23 +642,21 @@ impl<E: EthSpec> SignedBlock<E> {
}
}
fn handle_block_post_error(err: eth2::Error, slot: Slot, log: &Logger) -> Result<(), BlockError> {
fn handle_block_post_error(err: eth2::Error, slot: Slot) -> Result<(), BlockError> {
// Handle non-200 success codes.
if let Some(status) = err.status() {
if status == StatusCode::ACCEPTED {
info!(
log,
"Block is already known to BN or might be invalid";
"slot" => slot,
"status_code" => status.as_u16(),
%slot,
status_code = status.as_u16(),
"Block is already known to BN or might be invalid"
);
return Ok(());
} else if status.is_success() {
debug!(
log,
"Block published with non-standard success code";
"slot" => slot,
"status_code" => status.as_u16(),
%slot,
status_code = status.as_u16(),
"Block published with non-standard success code"
);
return Ok(());
}

View File

@@ -10,25 +10,24 @@ use crate::block_service::BlockServiceNotification;
use crate::sync::poll_sync_committee_duties;
use crate::sync::SyncDutiesMap;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::{ArithError, SafeArith};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::{sync::mpsc::Sender, time::sleep};
use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};
use validator_metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
@@ -87,16 +86,16 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
#[derive(Debug)]
pub enum Error {
pub enum Error<T> {
UnableToReadSlotClock,
FailedToDownloadAttesters(#[allow(dead_code)] String),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError<T>),
InvalidModulo(#[allow(dead_code)] ArithError),
Arith(#[allow(dead_code)] ArithError),
SyncDutiesNotFound(#[allow(dead_code)] u64),
}
impl From<ArithError> for Error {
impl<T> From<ArithError> for Error<T> {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
@@ -125,11 +124,11 @@ pub struct SubscriptionSlots {
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
async fn make_selection_proof<S: ValidatorStore + 'static>(
duty: &AttesterData,
validator_store: &ValidatorStore<T, E>,
validator_store: &S,
spec: &ChainSpec,
) -> Result<Option<SelectionProof>, Error> {
) -> Result<Option<SelectionProof>, Error<S::Error>> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
@@ -205,25 +204,124 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
pub struct DutiesServiceBuilder<S, T> {
/// Provides the canonical list of locally-managed validators.
validator_store: Option<Arc<S>>,
/// Tracks the current slot.
slot_clock: Option<T>,
/// Provides HTTP access to remote beacon nodes.
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
/// The runtime for spawning tasks.
executor: Option<TaskExecutor>,
/// The current chain spec.
spec: Option<Arc<ChainSpec>>,
//// Whether we permit large validator counts in the metrics.
enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode.
distributed: bool,
}
impl<S, T> Default for DutiesServiceBuilder<S, T> {
fn default() -> Self {
Self::new()
}
}
impl<S, T> DutiesServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
validator_store: None,
slot_clock: None,
beacon_nodes: None,
executor: None,
spec: None,
enable_high_validator_count_metrics: false,
distributed: false,
}
}
pub fn validator_store(mut self, validator_store: Arc<S>) -> Self {
self.validator_store = Some(validator_store);
self
}
pub fn slot_clock(mut self, slot_clock: T) -> Self {
self.slot_clock = Some(slot_clock);
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
pub fn spec(mut self, spec: Arc<ChainSpec>) -> Self {
self.spec = Some(spec);
self
}
pub fn enable_high_validator_count_metrics(
mut self,
enable_high_validator_count_metrics: bool,
) -> Self {
self.enable_high_validator_count_metrics = enable_high_validator_count_metrics;
self
}
pub fn distributed(mut self, distributed: bool) -> Self {
self.distributed = distributed;
self
}
pub fn build(self) -> Result<DutiesService<S, T>, String> {
Ok(DutiesService {
attesters: Default::default(),
proposers: Default::default(),
sync_duties: SyncDutiesMap::new(self.distributed),
validator_store: self
.validator_store
.ok_or("Cannot build DutiesService without validator_store")?,
unknown_validator_next_poll_slots: Default::default(),
slot_clock: self
.slot_clock
.ok_or("Cannot build DutiesService without slot_clock")?,
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build DutiesService without beacon_nodes")?,
executor: self
.executor
.ok_or("Cannot build DutiesService without executor")?,
spec: self.spec.ok_or("Cannot build DutiesService without spec")?,
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
distributed: self.distributed,
})
}
}
/// See the module-level documentation.
pub struct DutiesService<T, E: EthSpec> {
pub struct DutiesService<S, T> {
/// Maps a validator public key to their duties for each epoch.
pub attesters: RwLock<AttesterMap>,
/// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain
/// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap<E>,
pub sync_duties: SyncDutiesMap,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
pub validator_store: Arc<S>,
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
pub unknown_validator_next_poll_slots: RwLock<HashMap<PublicKeyBytes, Slot>>,
/// Tracks the current slot.
pub slot_clock: T,
/// Provides HTTP access to remote beacon nodes.
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub beacon_nodes: Arc<BeaconNodeFallback<T>>,
/// The runtime for spawning tasks.
pub context: RuntimeContext<E>,
pub executor: TaskExecutor,
/// The current chain spec.
pub spec: Arc<ChainSpec>,
//// Whether we permit large validator counts in the metrics.
@@ -232,7 +330,7 @@ pub struct DutiesService<T, E: EthSpec> {
pub distributed: bool,
}
impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
/// Returns the total number of validators known to the duties service.
pub fn total_validator_count(&self) -> usize {
self.validator_store.num_voting_validators()
@@ -282,7 +380,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
///
/// It is possible that multiple validators have an identical proposal slot, however that is
/// likely the result of heavy forking (lol) or inconsistent beacon node connections.
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
pub fn block_proposers<E: EthSpec>(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
@@ -308,7 +406,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// Returns all `ValidatorDuty` for the given `slot`.
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(E::slots_per_epoch());
let epoch = slot.epoch(S::E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
@@ -346,15 +444,15 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// process every slot, which has the chance of creating a theoretically unlimited backlog of tasks.
/// It was a conscious decision to choose to drop tasks on an overloaded/latent system rather than
/// overload it even more.
pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
core_duties_service: Arc<DutiesService<T, E>>,
pub fn start_update_service<S: ValidatorStore + 'static, T: SlotClock + 'static>(
core_duties_service: Arc<DutiesService<S, T>>,
mut block_service_tx: Sender<BlockServiceNotification>,
) {
/*
* Spawn the task which updates the map of pubkey to validator index.
*/
let duties_service = core_duties_service.clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
// Run this poll before the wait, this should hopefully download all the indices
@@ -377,8 +475,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local block proposal duties.
*/
let duties_service = core_duties_service.clone();
let log = core_duties_service.context.log().clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -393,9 +490,8 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
if let Err(e) = poll_beacon_proposers(&duties_service, &mut block_service_tx).await
{
error!(
log,
"Failed to poll beacon proposers";
"error" => ?e
error = ?e,
"Failed to poll beacon proposers"
)
}
}
@@ -407,8 +503,7 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
* Spawn the task which keeps track of local attestation duties.
*/
let duties_service = core_duties_service.clone();
let log = core_duties_service.context.log().clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
@@ -422,9 +517,8 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
if let Err(e) = poll_beacon_attesters(&duties_service).await {
error!(
log,
"Failed to poll beacon attesters";
"error" => ?e
error = ?e,
"Failed to poll beacon attesters"
);
}
}
@@ -434,15 +528,13 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
// Spawn the task which keeps track of local sync committee duties.
let duties_service = core_duties_service.clone();
let log = core_duties_service.context.log().clone();
core_duties_service.context.executor.spawn(
core_duties_service.executor.spawn(
async move {
loop {
if let Err(e) = poll_sync_committee_duties(&duties_service).await {
error!(
log,
"Failed to poll sync committee duties";
"error" => ?e
error = ?e,
"Failed to poll sync committee duties"
);
}
@@ -466,16 +558,14 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
/// validator indices.
async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_validator_indices<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
) {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_INDICES],
);
let log = duties_service.context.log();
// Collect *all* pubkeys for resolving indices, even those undergoing doppelganger protection.
//
// Since doppelganger protection queries rely on validator indices it is important to ensure we
@@ -488,16 +578,14 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service
.validator_store
.initialized_validators()
.read()
.get_index(&pubkey)
.validator_index(&pubkey)
.is_some();
if !is_known {
let current_slot_opt = duties_service.slot_clock.now();
if let Some(current_slot) = current_slot_opt {
let is_first_slot_of_epoch = current_slot % E::slots_per_epoch() == 0;
let is_first_slot_of_epoch = current_slot % S::E::slots_per_epoch() == 0;
// Query an unknown validator later if it was queried within the last epoch, or if
// the current slot is the first slot of an epoch.
@@ -541,17 +629,14 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
match download_result {
Ok(Some(response)) => {
info!(
log,
"Validator exists in beacon chain";
"pubkey" => ?pubkey,
"validator_index" => response.data.index,
"fee_recipient" => fee_recipient
?pubkey,
validator_index = response.data.index,
fee_recipient,
"Validator exists in beacon chain"
);
duties_service
.validator_store
.initialized_validators()
.write()
.set_index(&pubkey, response.data.index);
.set_validator_index(&pubkey, response.data.index);
duties_service
.unknown_validator_next_poll_slots
@@ -562,28 +647,22 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// the beacon chain.
Ok(None) => {
if let Some(current_slot) = current_slot_opt {
let next_poll_slot = current_slot.saturating_add(E::slots_per_epoch());
let next_poll_slot = current_slot.saturating_add(S::E::slots_per_epoch());
duties_service
.unknown_validator_next_poll_slots
.write()
.insert(pubkey, next_poll_slot);
}
debug!(
log,
"Validator without index";
"pubkey" => ?pubkey,
"fee_recipient" => fee_recipient
)
debug!(?pubkey, fee_recipient, "Validator without index")
}
// Don't exit early on an error, keep attempting to resolve other indices.
Err(e) => {
error!(
log,
"Failed to resolve pubkey to index";
"error" => %e,
"pubkey" => ?pubkey,
"fee_recipient" => fee_recipient
error = %e,
?pubkey,
fee_recipient,
"Failed to resolve pubkey to index"
)
}
}
@@ -599,21 +678,19 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
/// 2. As above, but for the next-epoch.
/// 3. Push out any attestation subnet subscriptions to the BN.
/// 4. Prune old entries from `duties_service.attesters`.
async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
async fn poll_beacon_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let current_epoch_timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_ATTESTERS_CURRENT_EPOCH],
);
let log = duties_service.context.log();
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let next_epoch = current_epoch + 1;
// Collect *all* pubkeys, even those undergoing doppelganger protection.
@@ -627,10 +704,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -647,15 +722,14 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.await
{
error!(
log,
"Failed to download attester duties";
"current_epoch" => current_epoch,
"request_epoch" => current_epoch,
"err" => ?e,
%current_epoch,
request_epoch = %current_epoch,
err = ?e,
"Failed to download attester duties"
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, current_epoch, current_slot);
drop(current_epoch_timer);
let next_epoch_timer = validator_metrics::start_timer_vec(
@@ -669,15 +743,14 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.await
{
error!(
log,
"Failed to download attester duties";
"current_epoch" => current_epoch,
"request_epoch" => next_epoch,
"err" => ?e,
%current_epoch,
request_epoch = %next_epoch,
err = ?e,
"Failed to download attester duties"
)
}
update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
update_per_validator_duty_metrics(duties_service, next_epoch, current_slot);
drop(next_epoch_timer);
let subscriptions_timer = validator_metrics::start_timer_vec(
@@ -698,7 +771,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
/ S::E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
@@ -752,9 +825,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.await;
if subscription_result.as_ref().is_ok() {
debug!(
log,
"Broadcast attestation subscriptions";
"count" => subscriptions.len(),
count = subscriptions.len(),
"Broadcast attestation subscriptions"
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
@@ -762,9 +834,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
} else if let Err(e) = subscription_result {
if e.num_errors() < duties_service.beacon_nodes.num_total().await {
warn!(
log,
"Some subscriptions failed";
"error" => %e,
error = %e,
"Some subscriptions failed"
);
// If subscriptions were sent to at least one node, regard that as a success.
// There is some redundancy built into the subscription schedule to handle failures.
@@ -773,9 +844,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
}
} else {
error!(
log,
"All subscriptions failed";
"error" => %e
error = %e,
"All subscriptions failed"
);
}
}
@@ -797,20 +867,17 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
/// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and
/// store them in `duties_service.attesters`.
async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Result<(), Error> {
let log = duties_service.context.log();
) -> Result<(), Error<S::Error>> {
// No need to bother the BN if we don't have any validators.
if local_indices.is_empty() {
debug!(
duties_service.context.log(),
"No validators, not downloading duties";
"epoch" => epoch,
%epoch,
"No validators, not downloading duties"
);
return Ok(());
}
@@ -889,10 +956,9 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
);
debug!(
log,
"Downloaded attester duties";
"dependent_root" => %dependent_root,
"num_new_duties" => new_duties.len(),
%dependent_root,
num_new_duties = new_duties.len(),
"Downloaded attester duties"
);
// Update the duties service with the new `DutyAndProof` messages.
@@ -923,10 +989,9 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
&& prior_duty_and_proof.duty == duty_and_proof.duty
{
warn!(
log,
"Redundant attester duty update";
"dependent_root" => %dependent_root,
"validator_index" => duty.validator_index,
%dependent_root,
validator_index = duty.validator_index,
"Redundant attester duty update"
);
continue;
}
@@ -934,11 +999,10 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Using `already_warned` avoids excessive logs.
if dependent_root != *prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"note" => "this may happen from time to time"
%prior_dependent_root,
%dependent_root,
note = "this may happen from time to time",
"Attester duties re-org"
)
}
*mut_value = (dependent_root, duty_and_proof);
@@ -952,7 +1016,7 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Spawn the background task to compute selection proofs.
let subservice = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
},
@@ -963,8 +1027,8 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
}
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn get_uninitialized_validators<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
@@ -980,8 +1044,8 @@ fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
.collect::<Vec<_>>()
}
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
fn update_per_validator_duty_metrics<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
current_slot: Slot,
) {
@@ -996,14 +1060,14 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
let existing_epoch = existing_slot.epoch(S::E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
|| (duty_slot.epoch(S::E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
@@ -1021,11 +1085,11 @@ fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
}
}
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
async fn post_validator_duties_attester<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
) -> Result<DutiesResponse<Vec<AttesterData>>, Error<S::Error>> {
duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -1045,13 +1109,11 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
async fn fill_in_selection_proofs<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
duties: Vec<AttesterData>,
dependent_root: Hash256,
) {
let log = duties_service.context.log();
// Sort duties by slot in a BTreeMap.
let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new();
@@ -1099,7 +1161,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
.then(|duty| async {
let opt_selection_proof = make_selection_proof(
&duty,
&duties_service.validator_store,
duties_service.validator_store.as_ref(),
&duties_service.spec,
)
.await?;
@@ -1119,20 +1181,18 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
log,
"Missing pubkey for duty and proof";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
Err(e) => {
error!(
log,
"Failed to produce duty and proof";
"error" => ?e,
"msg" => "may impair attestation duties"
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
@@ -1140,7 +1200,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
let epoch = duty.slot.epoch(S::E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
@@ -1157,9 +1217,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Our selection proofs are no longer relevant due to a reorg, abandon
// this entire background process.
debug!(
log,
"Stopping selection proof background task";
"reason" => "re-org"
reason = "re-org",
"Stopping selection proof background task"
);
return;
}
@@ -1182,11 +1241,10 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
let time_taken_ms =
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
debug!(
log,
"Computed attestation selection proofs";
"batch_size" => batch_size,
"lookahead_slot" => lookahead_slot,
"time_taken_ms" => time_taken_ms
batch_size,
%lookahead_slot,
time_taken_ms,
"Computed attestation selection proofs"
);
} else {
// Just sleep for one slot if we are unable to read the system clock, this gives
@@ -1219,33 +1277,30 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
/// through the slow path every time. I.e., the proposal will only happen after we've been able to
/// download and process the duties from the BN. This means it is very important to ensure this
/// function is as fast as possible.
async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
block_service_tx: &mut Sender<BlockServiceNotification>,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_PROPOSERS],
);
let log = duties_service.context.log();
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// Notify the block proposal service for any proposals that we have in our cache.
//
// See the function-level documentation for more information.
let initial_block_proposers = duties_service.block_proposers(current_slot);
notify_block_production_service(
let initial_block_proposers = duties_service.block_proposers::<S::E>(current_slot);
notify_block_production_service::<S>(
current_slot,
&initial_block_proposers,
block_service_tx,
&duties_service.validator_store,
log,
duties_service.validator_store.as_ref(),
)
.await;
@@ -1284,10 +1339,9 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
.collect::<Vec<_>>();
debug!(
log,
"Downloaded proposer duties";
"dependent_root" => %dependent_root,
"num_relevant_duties" => relevant_duties.len(),
%dependent_root,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);
if let Some((prior_dependent_root, _)) = duties_service
@@ -1297,20 +1351,18 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
{
if dependent_root != prior_dependent_root {
warn!(
log,
"Proposer duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
%prior_dependent_root,
%dependent_root,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
}
// Don't return early here, we still want to try and produce blocks using the cached values.
Err(e) => error!(
log,
"Failed to download proposer duties";
"err" => %e,
err = %e,
"Failed to download proposer duties"
),
}
@@ -1320,7 +1372,7 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
// Then, compute the difference between these two sets to obtain a set of block proposers
// which were not included in the initial notification to the `BlockService`.
let additional_block_producers = duties_service
.block_proposers(current_slot)
.block_proposers::<S::E>(current_slot)
.difference(&initial_block_proposers)
.copied()
.collect::<HashSet<PublicKeyBytes>>();
@@ -1330,18 +1382,16 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
//
// See the function-level documentation for more reasoning about this behaviour.
if !additional_block_producers.is_empty() {
notify_block_production_service(
notify_block_production_service::<S>(
current_slot,
&additional_block_producers,
block_service_tx,
&duties_service.validator_store,
log,
duties_service.validator_store.as_ref(),
)
.await;
debug!(
log,
"Detected new block proposer";
"current_slot" => current_slot,
%current_slot,
"Detected new block proposer"
);
validator_metrics::inc_counter(&validator_metrics::PROPOSAL_CHANGED);
}
@@ -1357,12 +1407,11 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
}
/// Notify the block service if it should produce a block.
async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
async fn notify_block_production_service<S: ValidatorStore>(
current_slot: Slot,
block_proposers: &HashSet<PublicKeyBytes>,
block_service_tx: &mut Sender<BlockServiceNotification>,
validator_store: &ValidatorStore<T, E>,
log: &Logger,
validator_store: &S,
) {
let non_doppelganger_proposers = block_proposers
.iter()
@@ -1379,10 +1428,9 @@ async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
.await
{
error!(
log,
"Failed to notify block service";
"current_slot" => current_slot,
"error" => %e
%current_slot,
error = %e,
"Failed to notify block service"
);
};
}

View File

@@ -1,21 +1,22 @@
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use bls::PublicKeyBytes;
use doppelganger_service::DoppelgangerStatus;
use environment::RuntimeContext;
use parking_lot::RwLock;
use slog::{debug, error, info, warn};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use task_executor::TaskExecutor;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info, warn};
use types::{
Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData,
ValidatorRegistrationData,
};
use validator_store::{Error as ValidatorStoreError, ProposalData, ValidatorStore};
use validator_store::{
DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, ValidatorStore,
};
/// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations.
const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2;
@@ -25,28 +26,28 @@ const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1;
/// Builds an `PreparationService`.
#[derive(Default)]
pub struct PreparationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub struct PreparationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
validator_store: Option<Arc<S>>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
builder_registration_timestamp_override: Option<u64>,
validator_registration_batch_size: Option<usize>,
}
impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> PreparationServiceBuilder<S, T> {
pub fn new() -> Self {
Self {
validator_store: None,
slot_clock: None,
beacon_nodes: None,
context: None,
executor: None,
builder_registration_timestamp_override: None,
validator_registration_batch_size: None,
}
}
pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
pub fn validator_store(mut self, store: Arc<S>) -> Self {
self.validator_store = Some(store);
self
}
@@ -56,13 +57,13 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn runtime_context(mut self, context: RuntimeContext<E>) -> Self {
self.context = Some(context);
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
@@ -82,7 +83,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
self
}
pub fn build(self) -> Result<PreparationService<T, E>, String> {
pub fn build(self) -> Result<PreparationService<S, T>, String> {
Ok(PreparationService {
inner: Arc::new(Inner {
validator_store: self
@@ -94,9 +95,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build PreparationService without beacon_nodes")?,
context: self
.context
.ok_or("Cannot build PreparationService without runtime_context")?,
executor: self
.executor
.ok_or("Cannot build PreparationService without executor")?,
builder_registration_timestamp_override: self
.builder_registration_timestamp_override,
validator_registration_batch_size: self.validator_registration_batch_size.ok_or(
@@ -109,11 +110,11 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationServiceBuilder<T, E> {
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S, T> {
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
builder_registration_timestamp_override: Option<u64>,
// Used to track unpublished validator registration changes.
validator_registration_cache:
@@ -145,11 +146,11 @@ impl From<ValidatorRegistrationData> for ValidatorRegistrationKey {
}
/// Attempts to produce proposer preparations for all known validators at the beginning of each epoch.
pub struct PreparationService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct PreparationService<S, T> {
inner: Arc<Inner<S, T>>,
}
impl<T, E: EthSpec> Clone for PreparationService<T, E> {
impl<S, T> Clone for PreparationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -157,15 +158,15 @@ impl<T, E: EthSpec> Clone for PreparationService<T, E> {
}
}
impl<T, E: EthSpec> Deref for PreparationService<T, E> {
type Target = Inner<T, E>;
impl<S, T> Deref for PreparationService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S, T> {
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
self.clone().start_validator_registration_service(spec)?;
self.start_proposer_prepare_service(spec)
@@ -173,15 +174,10 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
/// Starts the service which periodically produces proposer preparations.
pub fn start_proposer_prepare_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
info!(
log,
"Proposer preparation service started";
);
info!("Proposer preparation service started");
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let spec = spec.clone();
let interval_fut = async move {
@@ -192,9 +188,8 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
.await
.map_err(|e| {
error!(
log,
"Error during proposer preparation";
"error" => ?e,
error = ?e,
"Error during proposer preparation"
)
})
.unwrap_or(());
@@ -203,7 +198,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot).await;
} else {
error!(log, "Failed to read slot clock");
error!("Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
@@ -216,30 +211,25 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
/// Starts the service which periodically sends connected beacon nodes validator registration information.
pub fn start_validator_registration_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
info!(
log,
"Validator registration service started";
);
info!("Validator registration service started");
let spec = spec.clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let validator_registration_fut = async move {
loop {
// Poll the endpoint immediately to ensure fee recipients are received.
if let Err(e) = self.register_validators().await {
error!(log,"Error during validator registration";"error" => ?e);
error!(error = ?e,"Error during validator registration");
}
// Wait one slot if the register validator request fails or if we should not publish at the current slot.
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot).await;
} else {
error!(log, "Failed to read slot clock");
error!("Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
@@ -254,10 +244,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
/// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may
/// cause errors if it doesn't support the preparation API.
fn should_publish_at_current_slot(&self, spec: &ChainSpec) -> bool {
let current_epoch = self
.slot_clock
.now()
.map_or(E::genesis_epoch(), |slot| slot.epoch(E::slots_per_epoch()));
let current_epoch = self.slot_clock.now().map_or(S::E::genesis_epoch(), |slot| {
slot.epoch(S::E::slots_per_epoch())
});
spec.bellatrix_fork_epoch.map_or(false, |fork_epoch| {
current_epoch + PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS >= fork_epoch
})
@@ -274,7 +263,6 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
}
fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec<ProposerPreparationData> {
let log = self.context.log();
self.collect_proposal_data(|pubkey, proposal_data| {
if let Some(fee_recipient) = proposal_data.fee_recipient {
Some(ProposerPreparationData {
@@ -285,10 +273,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
} else {
if spec.bellatrix_fork_epoch.is_some() {
error!(
log,
"Validator is missing fee recipient";
"msg" => "update validator_definitions.yml",
"pubkey" => ?pubkey
msg = "update validator_definitions.yml",
?pubkey,
"Validator is missing fee recipient"
);
}
None
@@ -336,8 +323,6 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
&self,
preparation_data: Vec<ProposerPreparationData>,
) -> Result<(), String> {
let log = self.context.log();
// Post the proposer preparations to the BN.
let preparation_data_len = preparation_data.len();
let preparation_entries = preparation_data.as_slice();
@@ -351,14 +336,12 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
.await
{
Ok(()) => debug!(
log,
"Published proposer preparation";
"count" => preparation_data_len,
count = preparation_data_len,
"Published proposer preparation"
),
Err(e) => error!(
log,
"Unable to publish proposer preparation to all beacon nodes";
"error" => %e,
error = %e,
"Unable to publish proposer preparation to all beacon nodes"
),
}
Ok(())
@@ -384,7 +367,8 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
// Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`.
if let Some(slot) = self.slot_clock.now() {
if slot % (E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0 {
if slot % (S::E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0
{
self.publish_validator_registration_data(registration_keys)
.await?;
} else if !changed_keys.is_empty() {
@@ -400,8 +384,6 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
&self,
registration_keys: Vec<ValidatorRegistrationKey>,
) -> Result<(), String> {
let log = self.context.log();
let registration_data_len = registration_keys.len();
let mut signed = Vec::with_capacity(registration_data_len);
@@ -442,19 +424,14 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for registration data";
"pubkey" => ?pubkey,
);
debug!(?pubkey, "Missing pubkey for registration data");
continue;
}
Err(e) => {
error!(
log,
"Unable to sign validator registration data";
"error" => ?e,
"pubkey" => ?pubkey
error = ?e,
?pubkey,
"Unable to sign validator registration data"
);
continue;
}
@@ -479,14 +456,12 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
.await
{
Ok(()) => info!(
log,
"Published validator registrations to the builder network";
"count" => batch.len(),
count = batch.len(),
"Published validator registrations to the builder network"
),
Err(e) => warn!(
log,
"Unable to publish validator registrations to the builder network";
"error" => %e,
error = %e,
"Unable to publish validator registrations to the builder network"
),
}
}

View File

@@ -1,14 +1,13 @@
use crate::duties_service::{DutiesService, Error};
use doppelganger_service::DoppelgangerStatus;
use futures::future::join_all;
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, info, warn};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::{debug, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError;
use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
@@ -27,12 +26,11 @@ pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
/// in this file take care to only lock one validator at a time. We never hold a lock while
/// trying to obtain another one (hence no lock ordering issues).
pub struct SyncDutiesMap<E: EthSpec> {
pub struct SyncDutiesMap {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool,
_phantom: PhantomData<E>,
}
/// Duties for a single sync committee period.
@@ -80,12 +78,11 @@ pub struct SlotDuties {
pub aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
}
impl<E: EthSpec> SyncDutiesMap<E> {
impl SyncDutiesMap {
pub fn new(distributed: bool) -> Self {
Self {
committees: RwLock::new(HashMap::new()),
distributed,
_phantom: PhantomData,
}
}
@@ -103,7 +100,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
}
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 {
fn aggregation_pre_compute_slots<E: EthSpec>(&self) -> u64 {
if self.distributed {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
@@ -116,7 +113,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_slot, sync_duty)` pairs for all validators which need to have proofs
/// computed. See `fill_in_aggregation_proofs` for the actual calculation.
fn prepare_for_aggregator_pre_compute(
fn prepare_for_aggregator_pre_compute<E: EthSpec>(
&self,
committee_period: u64,
current_slot: Slot,
@@ -126,7 +123,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
current_slot,
first_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots();
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::<E>();
let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec),
@@ -186,7 +183,7 @@ impl<E: EthSpec> SyncDutiesMap<E> {
/// Get duties for all validators for the given `wall_clock_slot`.
///
/// This is the entry-point for the sync committee service.
pub fn get_duties_for_slot(
pub fn get_duties_for_slot<E: EthSpec>(
&self,
wall_clock_slot: Slot,
spec: &ChainSpec,
@@ -283,16 +280,16 @@ fn last_slot_of_period<E: EthSpec>(sync_committee_period: u64, spec: &ChainSpec)
first_slot_of_period::<E>(sync_committee_period + 1, spec) - 1
}
pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let sync_duties = &duties_service.sync_duties;
let spec = &duties_service.spec;
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// If the Altair fork is yet to be activated, do not attempt to poll for duties.
if spec
@@ -316,10 +313,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
@@ -341,11 +336,15 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
// Pre-compute aggregator selection proofs for the current period.
let (current_pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
current_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -378,18 +377,22 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
}
// Pre-compute aggregator selection proofs for the next period.
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots();
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots::<S::E>();
if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(E::slots_per_epoch())
.epoch(S::E::slots_per_epoch())
.sync_committee_period(spec)?
== next_sync_committee_period
{
let (pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_slot, spec);
.prepare_for_aggregator_pre_compute::<S::E>(
next_sync_committee_period,
current_slot,
spec,
);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
duties_service.executor.spawn(
async move {
fill_in_aggregation_proofs(
sub_duties_service,
@@ -408,29 +411,26 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
Ok(())
}
pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
pub async fn poll_sync_committee_duties_for_period<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
local_indices: &[u64],
sync_committee_period: u64,
) -> Result<(), Error> {
) -> Result<(), Error<S::Error>> {
let spec = &duties_service.spec;
let log = duties_service.context.log();
// no local validators don't need to poll for sync committee
if local_indices.is_empty() {
debug!(
duties_service.context.log(),
"No validators, not polling for sync committee duties";
"sync_committee_period" => sync_committee_period,
sync_committee_period,
"No validators, not polling for sync committee duties"
);
return Ok(());
}
debug!(
log,
"Fetching sync committee duties";
"sync_committee_period" => sync_committee_period,
"num_validators" => local_indices.len(),
sync_committee_period,
num_validators = local_indices.len(),
"Fetching sync committee duties"
);
let period_start_epoch = spec.epochs_per_sync_committee_period * sync_committee_period;
@@ -452,16 +452,15 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
Ok(res) => res.data,
Err(e) => {
warn!(
log,
"Failed to download sync committee duties";
"sync_committee_period" => sync_committee_period,
"error" => %e,
sync_committee_period,
error = %e,
"Failed to download sync committee duties"
);
return Ok(());
}
};
debug!(log, "Fetched sync duties from BN"; "count" => duties.len());
debug!(count = duties.len(), "Fetched sync duties from BN");
// Add duties to map.
let committee_duties = duties_service
@@ -479,9 +478,8 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
!= duty.validator_sync_committee_indices;
if updated_due_to_reorg {
warn!(
log,
"Sync committee duties changed";
"message" => "this could be due to a really long re-org, or a bug"
message = "this could be due to a really long re-org, or a bug",
"Sync committee duties changed"
);
}
updated_due_to_reorg
@@ -489,10 +487,8 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
if updated {
info!(
log,
"Validator in sync committee";
"validator_index" => duty.validator_index,
"sync_committee_period" => sync_committee_period,
validator_index = duty.validator_index,
sync_committee_period, "Validator in sync committee"
);
*validator_duties = Some(ValidatorDuties::new(duty));
@@ -502,21 +498,18 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
Ok(())
}
pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: Arc<DutiesService<S, T>>,
pre_compute_duties: &[(Slot, SyncDuty)],
sync_committee_period: u64,
current_slot: Slot,
pre_compute_slot: Slot,
) {
let log = duties_service.context.log();
debug!(
log,
"Calculating sync selection proofs";
"period" => sync_committee_period,
"current_slot" => current_slot,
"pre_compute_slot" => pre_compute_slot
period = sync_committee_period,
%current_slot,
%pre_compute_slot,
"Calculating sync selection proofs"
);
// Generate selection proofs for each validator at each slot, one slot at a time.
@@ -528,13 +521,12 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
continue;
}
let subnet_ids = match duty.subnet_ids::<E>() {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
error = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
@@ -556,45 +548,41 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
?pubkey,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Missing pubkey for sync selection proof"
);
return None;
}
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
error = ?e,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Unable to sign selection proof"
);
return None;
}
};
match proof.is_aggregator::<E>() {
match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
log,
"Validator is sync aggregator";
"validator_index" => duty.validator_index,
"slot" => proof_slot,
"subnet_id" => %subnet_id,
validator_index = duty.validator_index,
slot = %proof_slot,
%subnet_id,
"Validator is sync aggregator"
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
"error" => ?e,
pubkey = ?duty.pubkey,
slot = %proof_slot,
error = ?e,
"Error determining is_aggregator"
);
None
}
@@ -614,11 +602,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(
log,
"Missing sync duties";
"period" => sync_committee_period,
);
debug!(period = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
@@ -629,20 +613,18 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
log,
"Missing sync duty to update";
"validator_index" => validator_index,
"period" => sync_committee_period,
validator_index,
period = sync_committee_period,
"Missing sync duty to update"
);
}
}
if num_validators_updated > 0 {
debug!(
log,
"Finished computing sync selection proofs";
"slot" => slot,
"updated_validators" => num_validators_updated,
%slot,
updated_validators = num_validators_updated,
"Finished computing sync selection proofs"
);
}
}

View File

@@ -1,16 +1,17 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use environment::RuntimeContext;
use eth2::types::BlockId;
use futures::future::join_all;
use futures::future::FutureExt;
use slog::{crit, debug, error, info, trace, warn};
use logging::crit;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use types::{
ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription,
SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId,
@@ -19,11 +20,11 @@ use validator_store::{Error as ValidatorStoreError, ValidatorStore};
pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4;
pub struct SyncCommitteeService<T: SlotClock + 'static, E: EthSpec> {
inner: Arc<Inner<T, E>>,
pub struct SyncCommitteeService<S: ValidatorStore, T: SlotClock + 'static> {
inner: Arc<Inner<S, T>>,
}
impl<T: SlotClock + 'static, E: EthSpec> Clone for SyncCommitteeService<T, E> {
impl<S: ValidatorStore, T: SlotClock + 'static> Clone for SyncCommitteeService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
@@ -31,33 +32,33 @@ impl<T: SlotClock + 'static, E: EthSpec> Clone for SyncCommitteeService<T, E> {
}
}
impl<T: SlotClock + 'static, E: EthSpec> Deref for SyncCommitteeService<T, E> {
type Target = Inner<T, E>;
impl<S: ValidatorStore, T: SlotClock + 'static> Deref for SyncCommitteeService<S, T> {
type Target = Inner<S, T>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
pub struct Inner<T: SlotClock + 'static, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
pub struct Inner<S: ValidatorStore, T: SlotClock + 'static> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
/// Boolean to track whether the service has posted subscriptions to the BN at least once.
///
/// This acts as a latch that fires once upon start-up, and then never again.
first_subscription_done: AtomicBool,
}
impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S, T> {
pub fn new(
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
) -> Self {
Self {
inner: Arc::new(Inner {
@@ -65,7 +66,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
validator_store,
slot_clock,
beacon_nodes,
context,
executor,
first_subscription_done: AtomicBool::new(false),
}),
}
@@ -79,14 +80,13 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.spec
.altair_fork_epoch
.and_then(|fork_epoch| {
let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch());
let current_epoch = self.slot_clock.now()?.epoch(S::E::slots_per_epoch());
Some(current_epoch >= fork_epoch)
})
.unwrap_or(false)
}
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let duration_to_next_slot = self
.slot_clock
@@ -94,18 +94,16 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.ok_or("Unable to determine duration to next slot")?;
info!(
log,
"Sync committee service started";
"next_update_millis" => duration_to_next_slot.as_millis()
next_update_millis = duration_to_next_slot.as_millis(),
"Sync committee service started"
);
let executor = self.context.executor.clone();
let executor = self.executor.clone();
let interval_fut = async move {
loop {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
// Wait for contribution broadcast interval 1/3 of the way through the slot.
let log = self.context.log();
sleep(duration_to_next_slot + slot_duration / 3).await;
// Do nothing if the Altair fork has not yet occurred.
@@ -115,21 +113,17 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
if let Err(e) = self.spawn_contribution_tasks(slot_duration).await {
crit!(
log,
"Failed to spawn sync contribution tasks";
"error" => e
error = ?e,
"Failed to spawn sync contribution tasks"
)
} else {
trace!(
log,
"Spawned sync contribution tasks";
)
trace!("Spawned sync contribution tasks")
}
// Do subscriptions for future slots/epochs.
self.spawn_subscription_tasks();
} else {
error!(log, "Failed to read slot clock");
error!("Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
@@ -141,7 +135,6 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
}
async fn spawn_contribution_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let log = self.context.log().clone();
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
@@ -158,18 +151,14 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let Some(slot_duties) = self
.duties_service
.sync_duties
.get_duties_for_slot(slot, &self.duties_service.spec)
.get_duties_for_slot::<S::E>(slot, &self.duties_service.spec)
else {
debug!(log, "No duties known for slot {}", slot);
debug!("No duties known for slot {}", slot);
return Ok(());
};
if slot_duties.duties.is_empty() {
debug!(
log,
"No local validators in current sync committee";
"slot" => slot,
);
debug!(%slot, "No local validators in current sync committee");
return Ok(());
}
@@ -196,11 +185,10 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
Ok(block) => block.data.root,
Err(errs) => {
warn!(
log,
errors = errs.to_string(),
%slot,
"Refusing to sign sync committee messages for an optimistic head block or \
a block head with unknown optimistic status";
"errors" => errs.to_string(),
"slot" => slot,
a block head with unknown optimistic status"
);
return Ok(());
}
@@ -209,7 +197,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Spawn one task to publish all of the sync committee signatures.
let validator_duties = slot_duties.duties;
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_signatures(slot, block_root, validator_duties)
@@ -221,7 +209,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let aggregators = slot_duties.aggregators;
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_aggregates(
@@ -246,8 +234,6 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
beacon_block_root: Hash256,
validator_duties: Vec<SyncDuty>,
) -> Result<(), ()> {
let log = self.context.log();
// Create futures to produce sync committee signatures.
let signature_futures = validator_duties.iter().map(|duty| async move {
match self
@@ -265,21 +251,19 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync committee signature";
"pubkey" => ?pubkey,
"validator_index" => duty.validator_index,
"slot" => slot,
?pubkey,
validator_index = duty.validator_index,
%slot,
"Missing pubkey for sync committee signature"
);
None
}
Err(e) => {
crit!(
log,
"Failed to sign sync committee signature";
"validator_index" => duty.validator_index,
"slot" => slot,
"error" => ?e,
validator_index = duty.validator_index,
%slot,
error = ?e,
"Failed to sign sync committee signature"
);
None
}
@@ -302,19 +286,17 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.await
.map_err(|e| {
error!(
log,
"Unable to publish sync committee messages";
"slot" => slot,
"error" => %e,
%slot,
error = %e,
"Unable to publish sync committee messages"
);
})?;
info!(
log,
"Successfully published sync committee messages";
"count" => committee_signatures.len(),
"head_block" => ?beacon_block_root,
"slot" => slot,
count = committee_signatures.len(),
head_block = ?beacon_block_root,
%slot,
"Successfully published sync committee messages"
);
Ok(())
@@ -329,7 +311,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
) {
for (subnet_id, subnet_aggregators) in aggregators {
let service = self.clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service
.publish_sync_committee_aggregate_for_subnet(
@@ -357,8 +339,6 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
) -> Result<(), ()> {
sleep_until(aggregate_instant).await;
let log = self.context.log();
let contribution = &self
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -369,26 +349,20 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
};
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.get_validator_sync_committee_contribution(&sync_contribution_data)
.await
})
.await
.map_err(|e| {
crit!(
log,
"Failed to produce sync contribution";
"slot" => slot,
"beacon_block_root" => ?beacon_block_root,
"error" => %e,
%slot,
?beacon_block_root,
error = %e,
"Failed to produce sync contribution"
)
})?
.ok_or_else(|| {
crit!(
log,
"No aggregate contribution found";
"slot" => slot,
"beacon_block_root" => ?beacon_block_root,
);
crit!(%slot, ?beacon_block_root, "No aggregate contribution found");
})?
.data;
@@ -409,20 +383,14 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync contribution";
"pubkey" => ?pubkey,
"slot" => slot,
);
debug!(?pubkey, %slot, "Missing pubkey for sync contribution");
None
}
Err(e) => {
crit!(
log,
"Unable to sign sync committee contribution";
"slot" => slot,
"error" => ?e,
%slot,
error = ?e,
"Unable to sign sync committee contribution"
);
None
}
@@ -447,20 +415,18 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.await
.map_err(|e| {
error!(
log,
"Unable to publish signed contributions and proofs";
"slot" => slot,
"error" => %e,
%slot,
error = %e,
"Unable to publish signed contributions and proofs"
);
})?;
info!(
log,
"Successfully published sync contributions";
"subnet" => %subnet_id,
"beacon_block_root" => %beacon_block_root,
"num_signers" => contribution.aggregation_bits.num_set_bits(),
"slot" => slot,
subnet = %subnet_id,
beacon_block_root = %beacon_block_root,
num_signers = contribution.aggregation_bits.num_set_bits(),
%slot,
"Successfully published sync contributions"
);
Ok(())
@@ -468,14 +434,13 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
fn spawn_subscription_tasks(&self) {
let service = self.clone();
let log = self.context.log().clone();
self.inner.context.executor.spawn(
self.inner.executor.spawn(
async move {
service.publish_subscriptions().await.unwrap_or_else(|e| {
error!(
log,
"Error publishing subscriptions";
"error" => ?e,
error = ?e,
"Error publishing subscriptions"
)
});
},
@@ -484,7 +449,6 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
}
async fn publish_subscriptions(self) -> Result<(), String> {
let log = self.context.log().clone();
let spec = &self.duties_service.spec;
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
@@ -494,10 +458,10 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// At the start of every epoch during the current period, re-post the subscriptions
// to the beacon node. This covers the case where the BN has forgotten the subscriptions
// due to a restart, or where the VC has switched to a fallback BN.
let current_period = sync_period_of_slot::<E>(slot, spec)?;
let current_period = sync_period_of_slot::<S::E>(slot, spec)?;
if !self.first_subscription_done.load(Ordering::Relaxed)
|| slot.as_u64() % E::slots_per_epoch() == 0
|| slot.as_u64() % S::E::slots_per_epoch() == 0
{
duty_slots.push((slot, current_period));
}
@@ -505,9 +469,9 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Near the end of the current period, push subscriptions for the next period to the
// beacon node. We aggressively push every slot in the lead-up, as this is the main way
// that we want to ensure that the BN is subscribed (well in advance).
let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * E::slots_per_epoch();
let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * S::E::slots_per_epoch();
let lookahead_period = sync_period_of_slot::<E>(lookahead_slot, spec)?;
let lookahead_period = sync_period_of_slot::<S::E>(lookahead_slot, spec)?;
if lookahead_period > current_period {
duty_slots.push((lookahead_slot, lookahead_period));
@@ -521,16 +485,11 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let mut subscriptions = vec![];
for (duty_slot, sync_committee_period) in duty_slots {
debug!(
log,
"Fetching subscription duties";
"duty_slot" => duty_slot,
"current_slot" => slot,
);
debug!(%duty_slot, %slot, "Fetching subscription duties");
match self
.duties_service
.sync_duties
.get_duties_for_slot(duty_slot, spec)
.get_duties_for_slot::<S::E>(duty_slot, spec)
{
Some(duties) => subscriptions.extend(subscriptions_from_sync_duties(
duties.duties,
@@ -539,9 +498,8 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
)),
None => {
debug!(
log,
"No duties for subscription";
"slot" => duty_slot,
slot = %duty_slot,
"No duties for subscription"
);
all_succeeded = false;
}
@@ -549,29 +507,23 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
}
if subscriptions.is_empty() {
debug!(
log,
"No sync subscriptions to send";
"slot" => slot,
);
debug!(%slot, "No sync subscriptions to send");
return Ok(());
}
// Post subscriptions to BN.
debug!(
log,
"Posting sync subscriptions to BN";
"count" => subscriptions.len(),
count = subscriptions.len(),
"Posting sync subscriptions to BN"
);
let subscriptions_slice = &subscriptions;
for subscription in subscriptions_slice {
debug!(
log,
"Subscription";
"validator_index" => subscription.validator_index,
"validator_sync_committee_indices" => ?subscription.sync_committee_indices,
"until_epoch" => subscription.until_epoch,
validator_index = subscription.validator_index,
validator_sync_committee_indices = ?subscription.sync_committee_indices,
until_epoch = %subscription.until_epoch,
"Subscription"
);
}
@@ -585,10 +537,9 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.await
{
error!(
log,
"Unable to post sync committee subscriptions";
"slot" => slot,
"error" => %e,
%slot,
error = %e,
"Unable to post sync committee subscriptions"
);
all_succeeded = false;
}