Altair validator client and HTTP API (#2404)

## Proposed Changes

* Implement the validator client and HTTP API changes necessary to support Altair


Co-authored-by: realbigsean <seananderson33@gmail.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Michael Sproul
2021-08-06 00:47:31 +00:00
parent 350b6f19de
commit 17a2c778e3
44 changed files with 3144 additions and 705 deletions


@@ -64,6 +64,7 @@ scrypt = { version = "0.5.0", default-features = false }
lighthouse_metrics = { path = "../common/lighthouse_metrics" }
lazy_static = "1.4.0"
fallback = { path = "../common/fallback" }
itertools = "0.10.0"
monitoring_api = { path = "../common/monitoring_api" }
sensitive_url = { path = "../common/sensitive_url" }
task_executor = { path = "../common/task_executor" }


@@ -18,7 +18,7 @@ use types::{
};
/// Builds an `AttestationService`.
pub struct AttestationServiceBuilder<T, E: EthSpec> {
pub struct AttestationServiceBuilder<T: SlotClock + 'static, E: EthSpec> {
duties_service: Option<Arc<DutiesService<T, E>>>,
validator_store: Option<Arc<ValidatorStore<T, E>>>,
slot_clock: Option<T>,


@@ -6,6 +6,8 @@
//! The `DutiesService` is also responsible for sending events to the `BlockService` which trigger
//! block production.
mod sync;
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{
block_service::BlockServiceNotification,
@@ -20,6 +22,8 @@ use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use sync::poll_sync_committee_duties;
use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};
@@ -40,6 +44,14 @@ pub enum Error {
FailedToDownloadAttesters(String),
FailedToProduceSelectionProof(ValidatorStoreError),
InvalidModulo(ArithError),
Arith(ArithError),
SyncDutiesNotFound(u64),
}
impl From<ArithError> for Error {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
}
/// Neatly joins the server-generated `AttesterData` with the locally-generated `selection_proof`.
@@ -94,6 +106,8 @@ pub struct DutiesService<T, E: EthSpec> {
/// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain
/// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
/// Tracks the current slot.
@@ -302,6 +316,37 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
},
"duties_service_attesters",
);
// Spawn the task which keeps track of local sync committee duties.
let duties_service = core_duties_service.clone();
let log = core_duties_service.context.log().clone();
core_duties_service.context.executor.spawn(
async move {
loop {
if let Err(e) = poll_sync_committee_duties(&duties_service).await {
error!(
log,
"Failed to poll sync committee duties";
"error" => ?e
);
}
// Wait until the next slot before polling again.
// This doesn't mean that the beacon node will get polled every slot,
// as the sync duties service returns early if it deems it already has
// enough information.
if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() {
sleep(duration).await;
} else {
// If we are unable to read the system clock, just sleep for one slot; this
// gives the clock an opportunity to eventually come good.
sleep(duties_service.slot_clock.slot_duration()).await;
continue;
}
}
},
"duties_service_sync_committee",
);
}
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown


@@ -0,0 +1,599 @@
use crate::{
doppelganger_service::DoppelgangerStatus,
duties_service::{DutiesService, Error},
};
use itertools::Itertools;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, info, warn};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use types::{
ChainSpec, Epoch, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId,
};
/// Number of epochs in advance to compute selection proofs.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Top-level data-structure containing sync duty information.
///
/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
/// locking is used to provide maximum concurrency for the different services reading and writing.
///
/// Deadlocks are prevented by:
///
/// 1. Hierarchical locking. It is impossible to lock an inner lock (e.g. `validators`) without
/// first locking its parent.
/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
/// in this file take care to only lock one validator at a time. We never hold a lock while
/// trying to obtain another one (hence no lock ordering issues).
pub struct SyncDutiesMap {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
}
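As a minimal sketch of the locking discipline described above (illustrative types only, not the real `SyncDutiesMap`), the outer lock is always taken before the inner one, and a write guard on the outer map is downgraded to a read guard before any inner lock is touched:

```rust
use parking_lot::{RwLock, RwLockWriteGuard};
use std::collections::HashMap;

// Hypothetical two-level structure mirroring the map above.
struct Outer {
    inner: RwLock<HashMap<u64, Inner>>,
}

struct Inner {
    values: RwLock<Vec<u64>>,
}

impl Outer {
    // Hierarchical locking: acquire the outer lock first, downgrade it to a
    // read guard, then take exactly one inner lock at a time.
    fn push(&self, key: u64, value: u64) {
        let mut outer = self.inner.write();
        outer.entry(key).or_insert_with(|| Inner {
            values: RwLock::new(Vec::new()),
        });
        // Downgrade so other readers are unblocked; from here on we only
        // ever hold the outer lock as a reader plus one inner lock at a time.
        let outer = RwLockWriteGuard::downgrade(outer);
        outer[&key].values.write().push(value);
    }
}
```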
/// Duties for a single sync committee period.
#[derive(Default)]
pub struct CommitteeDuties {
/// Map from validator index to validator duties.
///
/// A `None` value indicates that the validator index is known *not* to be a member of the sync
/// committee, while a `Some` indicates a known member. An absent value indicates that the
/// validator index was not part of the set of local validators when the duties were fetched.
/// This allows us to track changes to the set of local validators.
validators: RwLock<HashMap<u64, Option<ValidatorDuties>>>,
}
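To make the three cases in that comment concrete, here is a hypothetical lookup helper (not part of the diff) showing how each state of the map should be read:

```rust
use std::collections::HashMap;

// Hypothetical helper: interpret the tri-state `validators` map.
fn membership(map: &HashMap<u64, Option<ValidatorDuties>>, index: u64) -> &'static str {
    match map.get(&index) {
        // Absent: the validator wasn't local when duties were fetched, so
        // duties need to be re-fetched for it.
        None => "unknown (not local at fetch time)",
        // Present-but-`None`: known *not* to be in this sync committee.
        Some(None) => "known non-member",
        // Present-and-`Some`: known member with cached duty data.
        Some(Some(_duty)) => "known member",
    }
}
```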
/// Duties for a single validator.
pub struct ValidatorDuties {
/// The sync duty, including the validator's sync committee indices & pubkey.
duty: SyncDuty,
/// The aggregator duties: cached selection proofs for upcoming epochs.
aggregation_duties: AggregatorDuties,
}
/// Aggregator duties for a single validator.
pub struct AggregatorDuties {
/// The epoch up to which aggregation proofs have already been computed (inclusive).
pre_compute_epoch: RwLock<Option<Epoch>>,
/// Map from slot & subnet ID to proof that this validator is an aggregator.
///
/// The slot is the slot at which the signed contribution and proof should be broadcast,
/// which is 1 less than the slot for which the `duty` was computed.
proofs: RwLock<HashMap<(Slot, SyncSubnetId), SyncSelectionProof>>,
}
/// Duties for multiple validators, for a single slot.
///
/// This type is returned to the sync service.
pub struct SlotDuties {
/// List of duties for all sync committee members at this slot.
///
/// Note: this is intentionally NOT split by subnet so that we only sign
/// one `SyncCommitteeMessage` per validator (recall a validator may be part of multiple
/// subnets).
pub duties: Vec<SyncDuty>,
/// Map from subnet ID to validator index and selection proof of each aggregator.
pub aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
}
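One point worth making concrete is the one-slot lag noted in `AggregatorDuties::proofs` and used by `get_duties_for_slot` below: messages for duty slot `S + 1` are signed and broadcast during wall-clock slot `S`, so proofs are keyed by the broadcast slot. A hedged illustration with made-up numbers:

```rust
use types::Slot;

fn main() {
    // Illustrative only: at wall-clock slot 100 the service serves duties
    // computed for duty slot 101, while an aggregator's selection proof is
    // stored under the broadcast-slot key `(Slot::new(100), subnet_id)`.
    let wall_clock_slot = Slot::new(100);
    let duty_slot = wall_clock_slot + 1; // mirrors `get_duties_for_slot`
    assert_eq!(duty_slot, Slot::new(101));
}
```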
impl Default for SyncDutiesMap {
fn default() -> Self {
Self {
committees: RwLock::new(HashMap::new()),
}
}
}
impl SyncDutiesMap {
/// Check if duties are already known for all of the given validators for `committee_period`.
fn all_duties_known(&self, committee_period: u64, validator_indices: &[u64]) -> bool {
self.committees
.read()
.get(&committee_period)
.map_or(false, |committee_duties| {
let validator_duties = committee_duties.validators.read();
validator_indices
.iter()
.all(|index| validator_duties.contains_key(index))
})
}
/// Prepare for pre-computation of selection proofs for `committee_period`.
///
/// Return the epoch up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_epoch, sync_duty)` pairs for all validators which need to have proofs
/// computed. See `fill_in_aggregation_proofs` for the actual calculation.
fn prepare_for_aggregator_pre_compute(
&self,
committee_period: u64,
current_epoch: Epoch,
spec: &ChainSpec,
) -> (Epoch, Vec<(Epoch, SyncDuty)>) {
let default_start_epoch =
std::cmp::max(current_epoch, first_epoch_of_period(committee_period, spec));
let pre_compute_epoch = std::cmp::min(
current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS,
last_epoch_of_period(committee_period, spec),
);
let pre_compute_duties = self.committees.read().get(&committee_period).map_or_else(
Vec::new,
|committee_duties| {
let validator_duties = committee_duties.validators.read();
validator_duties
.values()
.filter_map(|maybe_duty| {
let duty = maybe_duty.as_ref()?;
let old_pre_compute_epoch = duty
.aggregation_duties
.pre_compute_epoch
.write()
.replace(pre_compute_epoch);
match old_pre_compute_epoch {
// No proofs pre-computed previously, compute all from the start of
// the period or the current epoch (whichever is later).
None => Some((default_start_epoch, duty.duty.clone())),
// Proofs computed up to `prev`, start from the subsequent epoch.
Some(prev) if prev < pre_compute_epoch => {
Some((prev + 1, duty.duty.clone()))
}
// Proofs already known, no need to compute.
_ => None,
}
})
.collect()
},
);
(pre_compute_epoch, pre_compute_duties)
}
fn get_or_create_committee_duties<'a, 'b>(
&'a self,
committee_period: u64,
validator_indices: impl IntoIterator<Item = &'b u64>,
) -> MappedRwLockReadGuard<'a, CommitteeDuties> {
let mut committees_writer = self.committees.write();
committees_writer
.entry(committee_period)
.or_insert_with(CommitteeDuties::default)
.init(validator_indices);
// Return shared reference
RwLockReadGuard::map(
RwLockWriteGuard::downgrade(committees_writer),
|committees_reader| &committees_reader[&committee_period],
)
}
/// Get duties for all validators for the given `wall_clock_slot`.
///
/// This is the entry-point for the sync committee service.
pub fn get_duties_for_slot<E: EthSpec>(
&self,
wall_clock_slot: Slot,
spec: &ChainSpec,
) -> Option<SlotDuties> {
// Sync duties lag their assigned slot by 1
let duty_slot = wall_clock_slot + 1;
let sync_committee_period = duty_slot
.epoch(E::slots_per_epoch())
.sync_committee_period(spec)
.ok()?;
let committees_reader = self.committees.read();
let committee_duties = committees_reader.get(&sync_committee_period)?;
let mut duties = vec![];
let mut aggregators = HashMap::new();
committee_duties
.validators
.read()
.values()
// Filter out non-members & failed subnet IDs.
.filter_map(|opt_duties| {
let duty = opt_duties.as_ref()?;
let subnet_ids = duty.duty.subnet_ids::<E>().ok()?;
Some((duty, subnet_ids))
})
// Add duties for members to the vec of all duties, and aggregators to the
// aggregators map.
.for_each(|(validator_duty, subnet_ids)| {
duties.push(validator_duty.duty.clone());
let proofs = validator_duty.aggregation_duties.proofs.read();
for subnet_id in subnet_ids {
if let Some(proof) = proofs.get(&(wall_clock_slot, subnet_id)) {
aggregators.entry(subnet_id).or_insert_with(Vec::new).push((
validator_duty.duty.validator_index,
validator_duty.duty.pubkey,
proof.clone(),
));
}
}
});
Some(SlotDuties {
duties,
aggregators,
})
}
/// Prune duties for past sync committee periods from the map.
fn prune(&self, current_sync_committee_period: u64) {
self.committees
.write()
.retain(|period, _| *period >= current_sync_committee_period)
}
}
impl CommitteeDuties {
fn init<'b>(&mut self, validator_indices: impl IntoIterator<Item = &'b u64>) {
validator_indices.into_iter().for_each(|validator_index| {
self.validators
.get_mut()
.entry(*validator_index)
.or_insert(None);
})
}
}
impl ValidatorDuties {
fn new(duty: SyncDuty) -> Self {
Self {
duty,
aggregation_duties: AggregatorDuties {
pre_compute_epoch: RwLock::new(None),
proofs: RwLock::new(HashMap::new()),
},
}
}
}
/// Number of epochs to wait from the start of the period before actually fetching duties.
fn epoch_offset(spec: &ChainSpec) -> u64 {
spec.epochs_per_sync_committee_period.as_u64() / 2
}
fn first_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
spec.epochs_per_sync_committee_period * sync_committee_period
}
fn last_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
first_epoch_of_period(sync_committee_period + 1, spec) - 1
}
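As a worked example of this period arithmetic (a sketch assuming mainnet's `EPOCHS_PER_SYNC_COMMITTEE_PERIOD = 256`, not the real `ChainSpec` accessors):

```rust
// Sketch assuming 256 epochs per sync committee period (mainnet).
const EPOCHS_PER_PERIOD: u64 = 256;

fn first_epoch(period: u64) -> u64 {
    EPOCHS_PER_PERIOD * period
}

fn last_epoch(period: u64) -> u64 {
    first_epoch(period + 1) - 1
}

fn main() {
    // Period 10 spans epochs 2560..=2815.
    assert_eq!(first_epoch(10), 2560);
    assert_eq!(last_epoch(10), 2815);
    // `epoch_offset` is half a period, so duties for period 11 start being
    // fetched once the current epoch reaches 2560 + 128 = 2688.
    assert_eq!(EPOCHS_PER_PERIOD / 2, 128);
    // Aggregation pre-compute targets current_epoch + 2, clamped to the end
    // of the period (cf. `prepare_for_aggregator_pre_compute`).
    let current_epoch = 2814;
    assert_eq!(std::cmp::min(current_epoch + 2, last_epoch(10)), 2815);
}
```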
pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
let sync_duties = &duties_service.sync_duties;
let spec = &duties_service.spec;
let current_epoch = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?
.epoch(E::slots_per_epoch());
// If the Altair fork is yet to be activated, do not attempt to poll for duties.
if spec
.altair_fork_epoch
.map_or(true, |altair_epoch| current_epoch < altair_epoch)
{
return Ok(());
}
let current_sync_committee_period = current_epoch.sync_committee_period(spec)?;
let next_sync_committee_period = current_sync_committee_period + 1;
// Collect *all* pubkeys, even those undergoing doppelganger protection.
//
// Sync committee messages are not slashable and are currently excluded from doppelganger
// protection.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys {
if let Some(validator_index) = vals.get_index(&pubkey) {
local_indices.push(validator_index)
}
}
local_indices
};
// If duties aren't known for the current period, poll for them.
if !sync_duties.all_duties_known(current_sync_committee_period, &local_indices) {
poll_sync_committee_duties_for_period(
duties_service,
&local_indices,
current_sync_committee_period,
)
.await?;
// Prune previous duties (we avoid doing this too often as it locks the whole map).
sync_duties.prune(current_sync_committee_period);
}
// Pre-compute aggregator selection proofs for the current period.
let (current_pre_compute_epoch, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_epoch, spec);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn_blocking(
move || {
fill_in_aggregation_proofs(
sub_duties_service,
&new_pre_compute_duties,
current_sync_committee_period,
current_epoch,
current_pre_compute_epoch,
)
},
"duties_service_sync_selection_proofs",
);
}
// If we're past the point in the current period where we should determine duties for the next
// period and they are not yet known, then poll.
if current_epoch.as_u64() % spec.epochs_per_sync_committee_period.as_u64() >= epoch_offset(spec)
&& !sync_duties.all_duties_known(next_sync_committee_period, &local_indices)
{
poll_sync_committee_duties_for_period(
duties_service,
&local_indices,
next_sync_committee_period,
)
.await?;
// Prune (this is the main code path for updating duties, so we should almost always hit
// this prune).
sync_duties.prune(current_sync_committee_period);
}
// Pre-compute aggregator selection proofs for the next period.
if (current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS).sync_committee_period(spec)?
== next_sync_committee_period
{
let (pre_compute_epoch, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_epoch, spec);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn_blocking(
move || {
fill_in_aggregation_proofs(
sub_duties_service,
&new_pre_compute_duties,
next_sync_committee_period,
current_epoch,
pre_compute_epoch,
)
},
"duties_service_sync_selection_proofs",
);
}
}
Ok(())
}
pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
local_indices: &[u64],
sync_committee_period: u64,
) -> Result<(), Error> {
let spec = &duties_service.spec;
let log = duties_service.context.log();
debug!(
log,
"Fetching sync committee duties";
"sync_committee_period" => sync_committee_period,
"num_validators" => local_indices.len(),
);
let period_start_epoch = spec.epochs_per_sync_committee_period * sync_committee_period;
let duties_response = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
beacon_node
.post_validator_duties_sync(period_start_epoch, local_indices)
.await
})
.await;
let duties = match duties_response {
Ok(res) => res.data,
Err(e) => {
warn!(
log,
"Failed to download sync committee duties";
"sync_committee_period" => sync_committee_period,
"error" => %e,
);
return Ok(());
}
};
debug!(log, "Fetched sync duties from BN"; "count" => duties.len());
// Add duties to map.
let committee_duties = duties_service
.sync_duties
.get_or_create_committee_duties(sync_committee_period, local_indices);
let mut validator_writer = committee_duties.validators.write();
for duty in duties {
let validator_duties = validator_writer
.get_mut(&duty.validator_index)
.ok_or(Error::SyncDutiesNotFound(duty.validator_index))?;
let updated = validator_duties.as_ref().map_or(true, |existing_duties| {
let updated_due_to_reorg = existing_duties.duty.validator_sync_committee_indices
!= duty.validator_sync_committee_indices;
if updated_due_to_reorg {
warn!(
log,
"Sync committee duties changed";
"message" => "this could be due to a really long re-org, or a bug"
);
}
updated_due_to_reorg
});
if updated {
info!(
log,
"Validator in sync committee";
"validator_index" => duty.validator_index,
"sync_committee_period" => sync_committee_period,
);
*validator_duties = Some(ValidatorDuties::new(duty));
}
}
Ok(())
}
pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
pre_compute_duties: &[(Epoch, SyncDuty)],
sync_committee_period: u64,
current_epoch: Epoch,
pre_compute_epoch: Epoch,
) {
let log = duties_service.context.log();
debug!(
log,
"Calculating sync selection proofs";
"period" => sync_committee_period,
"current_epoch" => current_epoch,
"pre_compute_epoch" => pre_compute_epoch
);
// Generate selection proofs for each validator at each slot, one epoch at a time.
for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) {
// Generate proofs.
let validator_proofs: Vec<(u64, Vec<_>)> = pre_compute_duties
.iter()
.filter_map(|(validator_start_epoch, duty)| {
// Proofs are already known at this epoch for this validator.
if epoch < *validator_start_epoch {
return None;
}
let subnet_ids = duty
.subnet_ids::<E>()
.map_err(|e| {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
);
})
.ok()?;
let proofs = epoch
.slot_iter(E::slots_per_epoch())
.cartesian_product(&subnet_ids)
.filter_map(|(duty_slot, &subnet_id)| {
// Construct proof for prior slot.
let slot = duty_slot - 1;
let proof = duties_service
.validator_store
.produce_sync_selection_proof(&duty.pubkey, slot, subnet_id)
.map_err(|_| {
warn!(
log,
"Pubkey missing when signing selection proof";
"pubkey" => ?duty.pubkey,
"slot" => slot,
);
})
.ok()?;
let is_aggregator = proof
.is_aggregator::<E>()
.map_err(|e| {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => slot,
"error" => ?e,
);
})
.ok()?;
if is_aggregator {
debug!(
log,
"Validator is sync aggregator";
"validator_index" => duty.validator_index,
"slot" => slot,
"subnet_id" => %subnet_id,
);
Some(((slot, subnet_id), proof))
} else {
None
}
})
.collect();
Some((duty.validator_index, proofs))
})
.collect();
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let committee_duties = if let Some(duties) = sync_map.get(&sync_committee_period) {
duties
} else {
debug!(
log,
"Missing sync duties";
"period" => sync_committee_period,
);
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
log,
"Missing sync duty to update";
"validator_index" => validator_index,
"period" => sync_committee_period,
);
}
}
if num_validators_updated > 0 {
debug!(
log,
"Finished computing sync selection proofs";
"epoch" => epoch,
"updated_validators" => num_validators_updated,
);
}
}
}


@@ -1,222 +0,0 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::http_metrics::metrics;
use environment::RuntimeContext;
use eth2::types::StateId;
use parking_lot::RwLock;
use slog::{debug, trace};
use slog::{error, Logger};
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use types::{EthSpec, Fork};
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80);
/// Builds a `ForkService`.
pub struct ForkServiceBuilder<T, E: EthSpec> {
fork: Option<Fork>,
slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
log: Option<Logger>,
}
impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
pub fn new() -> Self {
Self {
fork: None,
slot_clock: None,
beacon_nodes: None,
log: None,
}
}
pub fn fork(mut self, fork: Fork) -> Self {
self.fork = Some(fork);
self
}
pub fn slot_clock(mut self, slot_clock: T) -> Self {
self.slot_clock = Some(slot_clock);
self
}
pub fn beacon_nodes(mut self, beacon_nodes: Arc<BeaconNodeFallback<T, E>>) -> Self {
self.beacon_nodes = Some(beacon_nodes);
self
}
pub fn log(mut self, log: Logger) -> Self {
self.log = Some(log);
self
}
pub fn build(self) -> Result<ForkService<T, E>, String> {
Ok(ForkService {
inner: Arc::new(Inner {
fork: RwLock::new(self.fork.ok_or("Cannot build ForkService without fork")?),
slot_clock: self
.slot_clock
.ok_or("Cannot build ForkService without slot_clock")?,
beacon_nodes: self
.beacon_nodes
.ok_or("Cannot build ForkService without beacon_node")?,
log: self
.log
.ok_or("Cannot build ForkService without logger")?
.clone(),
}),
})
}
}
#[cfg(test)]
#[allow(dead_code)]
impl<E: EthSpec> ForkServiceBuilder<slot_clock::TestingSlotClock, E> {
pub fn testing_only(spec: types::ChainSpec, log: Logger) -> Self {
use crate::beacon_node_fallback::CandidateBeaconNode;
let slot_clock = slot_clock::TestingSlotClock::new(
types::Slot::new(0),
std::time::Duration::from_secs(42),
std::time::Duration::from_secs(42),
);
let candidates = vec![CandidateBeaconNode::new(eth2::BeaconNodeHttpClient::new(
sensitive_url::SensitiveUrl::parse("http://127.0.0.1").unwrap(),
eth2::Timeouts::set_all(Duration::from_secs(12)),
))];
let mut beacon_nodes = BeaconNodeFallback::new(candidates, spec, log.clone());
beacon_nodes.set_slot_clock(slot_clock);
Self {
fork: Some(types::Fork::default()),
slot_clock: Some(slot_clock::TestingSlotClock::new(
types::Slot::new(0),
std::time::Duration::from_secs(42),
std::time::Duration::from_secs(42),
)),
beacon_nodes: Some(Arc::new(beacon_nodes)),
log: Some(log),
}
}
}
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
fork: RwLock<Fork>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
log: Logger,
slot_clock: T,
}
/// Attempts to download the `Fork` struct from the beacon node at the start of each epoch.
pub struct ForkService<T, E: EthSpec> {
inner: Arc<Inner<T, E>>,
}
impl<T, E: EthSpec> Clone for ForkService<T, E> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T, E: EthSpec> Deref for ForkService<T, E> {
type Target = Inner<T, E>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
/// Returns the last fork downloaded from the beacon node, if any.
pub fn fork(&self) -> Fork {
*self.fork.read()
}
/// Returns the slot clock.
pub fn slot_clock(&self) -> T {
self.slot_clock.clone()
}
/// Starts the service that periodically polls for the `Fork`.
pub fn start_update_service(self, context: &RuntimeContext<E>) -> Result<(), String> {
// Run an immediate update before starting the updater service.
context
.executor
.spawn_ignoring_error(self.clone().do_update(), "fork service update");
let executor = context.executor.clone();
let log = context.log().clone();
let spec = E::default_spec();
let interval_fut = async move {
loop {
// Run this poll before the wait; this should hopefully download the fork before the
// other services need it.
self.clone().do_update().await.ok();
if let Some(duration_to_next_epoch) =
self.slot_clock.duration_to_next_epoch(E::slots_per_epoch())
{
sleep(duration_to_next_epoch + TIME_DELAY_FROM_SLOT).await;
} else {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(Duration::from_secs(spec.seconds_per_slot)).await;
continue;
}
}
};
executor.spawn(interval_fut, "fork_service");
Ok(())
}
/// Attempts to download the `Fork` from the server.
async fn do_update(self) -> Result<(), ()> {
let _timer =
metrics::start_timer_vec(&metrics::FORK_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
let log = &self.log;
let fork = self
.inner
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.get_beacon_states_fork(StateId::Head)
.await
.map_err(|e| {
trace!(
log,
"Fork update failed";
"error" => format!("Error retrieving fork: {:?}", e)
)
})?
.ok_or_else(|| {
trace!(
log,
"Fork update failed";
"error" => "The beacon head fork is unknown"
)
})
.map(|result| result.data)
})
.await
.map_err(|_| ())?;
if *(self.fork.read()) != fork {
*(self.fork.write()) = fork;
}
debug!(self.log, "Fork update success");
// Returning an error will stop the interval. This is not desired; a single failure
// should not stop all future attempts.
Ok(())
}
}


@@ -4,7 +4,7 @@
use crate::doppelganger_service::DoppelgangerService;
use crate::{
http_api::{ApiSecret, Config as HttpConfig, Context},
Config, ForkServiceBuilder, InitializedValidators, ValidatorDefinitions, ValidatorStore,
Config, InitializedValidators, ValidatorDefinitions, ValidatorStore,
};
use account_utils::{
eth2_wallet::WalletBuilder, mnemonic_from_phrase, random_mnemonic, random_password,
@@ -17,10 +17,11 @@ use eth2_keystore::KeystoreBuilder;
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use slot_clock::TestingSlotClock;
use slot_clock::{SlotClock, TestingSlotClock};
use std::marker::PhantomData;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;
use tempfile::{tempdir, TempDir};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
@@ -73,20 +74,19 @@ impl ApiTester {
let spec = E::default_spec();
let fork_service = ForkServiceBuilder::testing_only(spec.clone(), log.clone())
.build()
.unwrap();
let slashing_db_path = config.validator_dir.join(SLASHING_PROTECTION_FILENAME);
let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap();
let validator_store: ValidatorStore<TestingSlotClock, E> = ValidatorStore::new(
let slot_clock =
TestingSlotClock::new(Slot::new(0), Duration::from_secs(0), Duration::from_secs(1));
let validator_store = ValidatorStore::<_, E>::new(
initialized_validators,
slashing_protection,
Hash256::repeat_byte(42),
spec,
fork_service.clone(),
Some(Arc::new(DoppelgangerService::new(log.clone()))),
slot_clock,
log.clone(),
);
@@ -96,7 +96,7 @@ impl ApiTester {
let initialized_validators = validator_store.initialized_validators();
let context: Arc<Context<TestingSlotClock, E>> = Arc::new(Context {
let context = Arc::new(Context {
runtime,
api_secret,
validator_dir: Some(validator_dir.path().into()),


@@ -67,16 +67,26 @@ lazy_static::lazy_static! {
"Total count of attempted SelectionProof signings",
&["status"]
);
pub static ref SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"vc_signed_sync_committee_messages_total",
"Total count of attempted SyncCommitteeMessage signings",
&["status"]
);
pub static ref SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"vc_signed_sync_committee_contributions_total",
"Total count of attempted ContributionAndProof signings",
&["status"]
);
pub static ref SIGNED_SYNC_SELECTION_PROOFS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"vc_signed_sync_selection_proofs_total",
"Total count of attempted SyncSelectionProof signings",
&["status"]
);
pub static ref DUTIES_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"vc_duties_service_task_times_seconds",
"Duration to perform duties service tasks",
&["task"]
);
pub static ref FORK_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"vc_fork_service_task_times_seconds",
"Duration to perform fork service tasks",
&["task"]
);
pub static ref ATTESTATION_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
"vc_attestation_service_task_times_seconds",
"Duration to perform attestation service tasks",


@@ -5,12 +5,12 @@ mod check_synced;
mod cli;
mod config;
mod duties_service;
mod fork_service;
mod graffiti_file;
mod http_metrics;
mod initialized_validators;
mod key_cache;
mod notifier;
mod sync_committee_service;
mod validator_store;
mod doppelganger_service;
@@ -31,9 +31,7 @@ use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches;
use duties_service::DutiesService;
use environment::RuntimeContext;
use eth2::types::StateId;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts};
use fork_service::{ForkService, ForkServiceBuilder};
use http_api::ApiSecret;
use initialized_validators::InitializedValidators;
use notifier::spawn_notifier;
@@ -46,11 +44,12 @@ use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use sync_committee_service::SyncCommitteeService;
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
use types::{EthSpec, Fork, Hash256};
use types::{EthSpec, Hash256};
use validator_store::ValidatorStore;
/// The interval between attempts to contact the beacon node during startup.
@@ -66,6 +65,7 @@ const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
@@ -73,9 +73,9 @@ const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
pub struct ProductionValidatorClient<T: EthSpec> {
context: RuntimeContext<T>,
duties_service: Arc<DutiesService<SystemTimeSlotClock, T>>,
fork_service: ForkService<SystemTimeSlotClock, T>,
block_service: BlockService<SystemTimeSlotClock, T>,
attestation_service: AttestationService<SystemTimeSlotClock, T>,
sync_committee_service: SyncCommitteeService<SystemTimeSlotClock, T>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, T>>,
http_api_listen_addr: Option<SocketAddr>,
@@ -263,6 +263,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
}
} else {
Timeouts::set_all(slot_duration)
@@ -293,7 +294,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone());
// Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root, fork) = tokio::select! {
let (genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
@@ -313,13 +314,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let beacon_nodes = Arc::new(beacon_nodes);
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
let fork_service = ForkServiceBuilder::new()
.fork(fork)
.slot_clock(slot_clock.clone())
.beacon_nodes(beacon_nodes.clone())
.log(log.clone())
.build()?;
let doppelganger_service = if config.enable_doppelganger_protection {
Some(Arc::new(DoppelgangerService::new(
context
@@ -331,16 +325,15 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
None
};
let validator_store: Arc<ValidatorStore<SystemTimeSlotClock, T>> =
Arc::new(ValidatorStore::new(
validators,
slashing_protection,
genesis_validators_root,
context.eth2_config.spec.clone(),
fork_service.clone(),
doppelganger_service.clone(),
log.clone(),
));
let validator_store = Arc::new(ValidatorStore::new(
validators,
slashing_protection,
genesis_validators_root,
context.eth2_config.spec.clone(),
doppelganger_service.clone(),
slot_clock.clone(),
log.clone(),
));
info!(
log,
@@ -359,6 +352,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let duties_service = Arc::new(DutiesService {
attesters: <_>::default(),
proposers: <_>::default(),
sync_duties: <_>::default(),
slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(),
@@ -394,6 +388,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.runtime_context(context.service_context("attestation".into()))
.build()?;
let sync_committee_service = SyncCommitteeService::new(
duties_service.clone(),
validator_store.clone(),
slot_clock,
beacon_nodes.clone(),
context.service_context("sync_committee".into()),
);
// Wait until genesis has occurred.
//
// It seems most sensible to move this into the `start_service` function, but I'm cautious
@@ -406,9 +408,9 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
Ok(Self {
context,
duties_service,
fork_service,
block_service,
attestation_service,
sync_committee_service,
doppelganger_service,
validator_store,
config,
@@ -427,11 +429,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
duties_service::start_update_service(self.duties_service.clone(), block_service_tx);
self.fork_service
.clone()
.start_update_service(&self.context)
.map_err(|e| format!("Unable to start fork service: {}", e))?;
self.block_service
.clone()
.start_update_service(block_service_rx)
@@ -442,6 +439,11 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start attestation service: {}", e))?;
self.sync_committee_service
.clone()
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start sync committee service: {}", e))?;
if let Some(doppelganger_service) = self.doppelganger_service.clone() {
DoppelgangerService::start_update_service(
doppelganger_service,
@@ -461,7 +463,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let api_secret = ApiSecret::create_or_open(&self.config.validator_dir)?;
self.http_api_listen_addr = if self.config.http_api.enabled {
let ctx: Arc<http_api::Context<SystemTimeSlotClock, T>> = Arc::new(http_api::Context {
let ctx = Arc::new(http_api::Context {
runtime: self.context.executor.runtime(),
api_secret,
validator_store: Some(self.validator_store.clone()),
@@ -495,7 +497,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
async fn init_from_beacon_node<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
context: &RuntimeContext<E>,
) -> Result<(u64, Hash256, Fork), String> {
) -> Result<(u64, Hash256), String> {
loop {
beacon_nodes.update_unready_candidates().await;
let num_available = beacon_nodes.num_available().await;
@@ -554,33 +556,7 @@ async fn init_from_beacon_node<E: EthSpec>(
sleep(RETRY_DELAY).await;
};
let fork = loop {
match beacon_nodes
.first_success(RequireSynced::No, |node| async move {
node.get_beacon_states_fork(StateId::Head).await
})
.await
{
Ok(Some(fork)) => break fork.data,
Ok(None) => {
info!(
context.log(),
"Failed to get fork, state not found";
);
}
Err(errors) => {
error!(
context.log(),
"Failed to get fork";
"error" => %errors
);
}
}
sleep(RETRY_DELAY).await;
};
Ok((genesis.genesis_time, genesis.genesis_validators_root, fork))
Ok((genesis.genesis_time, genesis.genesis_validators_root))
}
async fn wait_for_genesis<E: EthSpec>(


@@ -0,0 +1,537 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use environment::RuntimeContext;
use eth2::types::BlockId;
use futures::future::FutureExt;
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use types::{
ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription,
SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId,
};
pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4;
pub struct SyncCommitteeService<T: SlotClock + 'static, E: EthSpec> {
inner: Arc<Inner<T, E>>,
}
impl<T: SlotClock + 'static, E: EthSpec> Clone for SyncCommitteeService<T, E> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: SlotClock + 'static, E: EthSpec> Deref for SyncCommitteeService<T, E> {
type Target = Inner<T, E>;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
pub struct Inner<T: SlotClock + 'static, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
/// Boolean to track whether the service has posted subscriptions to the BN at least once.
///
/// This acts as a latch that fires once upon start-up, and then never again.
first_subscription_done: AtomicBool,
}
impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
pub fn new(
duties_service: Arc<DutiesService<T, E>>,
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>,
) -> Self {
Self {
inner: Arc::new(Inner {
duties_service,
validator_store,
slot_clock,
beacon_nodes,
context,
first_subscription_done: AtomicBool::new(false),
}),
}
}
/// Check if the Altair fork has been activated and therefore sync duties should be performed.
///
/// Slot clock errors are mapped to `false`.
fn altair_fork_activated(&self) -> bool {
self.duties_service
.spec
.altair_fork_epoch
.and_then(|fork_epoch| {
let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch());
Some(current_epoch >= fork_epoch)
})
.unwrap_or(false)
}
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
let log = self.context.log().clone();
let slot_duration = Duration::from_secs(spec.seconds_per_slot);
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;
info!(
log,
"Sync committee service started";
"next_update_millis" => duration_to_next_slot.as_millis()
);
let executor = self.context.executor.clone();
let interval_fut = async move {
loop {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
// Wait until 1/3 of the way through the next slot, the contribution broadcast interval.
let log = self.context.log();
sleep(duration_to_next_slot + slot_duration / 3).await;
// Do nothing if the Altair fork has not yet occurred.
if !self.altair_fork_activated() {
continue;
}
if let Err(e) = self.spawn_contribution_tasks(slot_duration).await {
crit!(
log,
"Failed to spawn sync contribution tasks";
"error" => e
)
} else {
trace!(
log,
"Spawned sync contribution tasks";
)
}
// Do subscriptions for future slots/epochs.
self.spawn_subscription_tasks();
} else {
error!(log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
}
};
executor.spawn(interval_fut, "sync_committee_service");
Ok(())
}
async fn spawn_contribution_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let log = self.context.log().clone();
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;
// If a validator needs to publish a sync aggregate, they must do so at 2/3 of the
// way through the slot. This delay triggers at that time.
let aggregate_production_instant = Instant::now()
+ duration_to_next_slot
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));
let slot_duties = self
.duties_service
.sync_duties
.get_duties_for_slot::<E>(slot, &self.duties_service.spec)
.ok_or_else(|| format!("Error fetching duties for slot {}", slot))?;
if slot_duties.duties.is_empty() {
debug!(
log,
"No local validators in current sync committee";
"slot" => slot,
);
return Ok(());
}
// Fetch block root for `SyncCommitteeContribution`.
let block_root = self
.beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
beacon_node.get_beacon_blocks_root(BlockId::Head).await
})
.await
.map_err(|e| e.to_string())?
.ok_or_else(|| format!("No block root found for slot {}", slot))?
.data
.root;
// Spawn one task to publish all of the sync committee signatures.
let validator_duties = slot_duties.duties;
self.inner.context.executor.spawn(
self.clone()
.publish_sync_committee_signatures(slot, block_root, validator_duties)
.map(|_| ()),
"sync_committee_signature_publish",
);
let aggregators = slot_duties.aggregators;
self.inner.context.executor.spawn(
self.clone()
.publish_sync_committee_aggregates(
slot,
block_root,
aggregators,
aggregate_production_instant,
)
.map(|_| ()),
"sync_committee_aggregate_publish",
);
Ok(())
}
/// Publish sync committee signatures.
async fn publish_sync_committee_signatures(
self,
slot: Slot,
beacon_block_root: Hash256,
validator_duties: Vec<SyncDuty>,
) -> Result<(), ()> {
let log = self.context.log().clone();
let committee_signatures = validator_duties
.iter()
.filter_map(|duty| {
self.validator_store
.produce_sync_committee_signature(
slot,
beacon_block_root,
duty.validator_index,
&duty.pubkey,
)
.map_err(|e| {
crit!(
log,
"Failed to sign sync committee signature";
"validator_index" => duty.validator_index,
"slot" => slot,
"error" => ?e,
);
})
.ok()
})
.collect::<Vec<_>>();
let signatures_slice = &committee_signatures;
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(signatures_slice)
.await
})
.await
.map_err(|e| {
error!(
log,
"Unable to publish sync committee messages";
"slot" => slot,
"error" => %e,
);
})?;
info!(
log,
"Successfully published sync committee messages";
"count" => committee_signatures.len(),
"head_block" => ?beacon_block_root,
"slot" => slot,
);
Ok(())
}
async fn publish_sync_committee_aggregates(
self,
slot: Slot,
beacon_block_root: Hash256,
aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
aggregate_instant: Instant,
) {
for (subnet_id, subnet_aggregators) in aggregators {
let service = self.clone();
self.inner.context.executor.spawn(
service
.publish_sync_committee_aggregate_for_subnet(
slot,
beacon_block_root,
subnet_id,
subnet_aggregators,
aggregate_instant,
)
.map(|_| ()),
"sync_committee_aggregate_publish_subnet",
);
}
}
async fn publish_sync_committee_aggregate_for_subnet(
self,
slot: Slot,
beacon_block_root: Hash256,
subnet_id: SyncSubnetId,
subnet_aggregators: Vec<(u64, PublicKeyBytes, SyncSelectionProof)>,
aggregate_instant: Instant,
) -> Result<(), ()> {
sleep_until(aggregate_instant).await;
let log = self.context.log();
let contribution = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let sync_contribution_data = SyncContributionData {
slot,
beacon_block_root,
subcommittee_index: subnet_id.into(),
};
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.await
})
.await
.map_err(|e| {
crit!(
log,
"Failed to produce sync contribution";
"slot" => slot,
"beacon_block_root" => ?beacon_block_root,
"error" => %e,
)
})?
.ok_or_else(|| {
crit!(
log,
"No aggregate contribution found";
"slot" => slot,
"beacon_block_root" => ?beacon_block_root,
);
})?
.data;
// Make `SignedContributionAndProof`s
let signed_contributions = subnet_aggregators
.into_iter()
.filter_map(|(aggregator_index, aggregator_pk, selection_proof)| {
self.validator_store
.produce_signed_contribution_and_proof(
aggregator_index,
&aggregator_pk,
contribution.clone(),
selection_proof,
)
.map_err(|e| {
crit!(
log,
"Unable to sign sync committee contribution";
"slot" => slot,
"error" => ?e,
);
})
.ok()
})
.collect::<Vec<_>>();
// Publish to the beacon node.
let signed_contributions_slice = &signed_contributions;
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_contribution_and_proofs(signed_contributions_slice)
.await
})
.await
.map_err(|e| {
error!(
log,
"Unable to publish signed contributions and proofs";
"slot" => slot,
"error" => %e,
);
})?;
info!(
log,
"Successfully published sync contributions";
"subnet" => %subnet_id,
"beacon_block_root" => %beacon_block_root,
"num_signers" => contribution.aggregation_bits.num_set_bits(),
"slot" => slot,
);
Ok(())
}
fn spawn_subscription_tasks(&self) {
let service = self.clone();
let log = self.context.log().clone();
self.inner.context.executor.spawn(
async move {
service.publish_subscriptions().await.unwrap_or_else(|e| {
error!(
log,
"Error publishing subscriptions";
"error" => ?e,
)
});
},
"sync_committee_subscription_publish",
);
}
async fn publish_subscriptions(self) -> Result<(), String> {
let log = self.context.log().clone();
let spec = &self.duties_service.spec;
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let mut duty_slots = vec![];
let mut all_succeeded = true;
// At the start of every epoch during the current period, re-post the subscriptions
// to the beacon node. This covers the case where the BN has forgotten the subscriptions
// due to a restart, or where the VC has switched to a fallback BN.
let current_period = sync_period_of_slot::<E>(slot, spec)?;
if !self.first_subscription_done.load(Ordering::Relaxed)
|| slot.as_u64() % E::slots_per_epoch() == 0
{
duty_slots.push((slot, current_period));
}
// Near the end of the current period, push subscriptions for the next period to the
// beacon node. We aggressively push every slot in the lead-up, as this is the main
// mechanism for ensuring that the BN is subscribed well in advance.
let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * E::slots_per_epoch();
let lookahead_period = sync_period_of_slot::<E>(lookahead_slot, spec)?;
if lookahead_period > current_period {
duty_slots.push((lookahead_slot, lookahead_period));
}
if duty_slots.is_empty() {
return Ok(());
}
// Collect subscriptions.
let mut subscriptions = vec![];
for (duty_slot, sync_committee_period) in duty_slots {
debug!(
log,
"Fetching subscription duties";
"duty_slot" => duty_slot,
"current_slot" => slot,
);
match self
.duties_service
.sync_duties
.get_duties_for_slot::<E>(duty_slot, spec)
{
Some(duties) => subscriptions.extend(subscriptions_from_sync_duties(
duties.duties,
sync_committee_period,
spec,
)),
None => {
warn!(
log,
"Missing duties for subscription";
"slot" => duty_slot,
);
all_succeeded = false;
}
}
}
// Post subscriptions to BN.
debug!(
log,
"Posting sync subscriptions to BN";
"count" => subscriptions.len(),
);
let subscriptions_slice = &subscriptions;
for subscription in subscriptions_slice {
debug!(
log,
"Subscription";
"validator_index" => subscription.validator_index,
"validator_sync_committee_indices" => ?subscription.sync_committee_indices,
"until_epoch" => subscription.until_epoch,
);
}
if let Err(e) = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await
})
.await
{
error!(
log,
"Unable to post sync committee subscriptions";
"slot" => slot,
"error" => %e,
);
all_succeeded = false;
}
// Disable the first-subscription latch once all duties have succeeded.
if all_succeeded {
self.first_subscription_done.store(true, Ordering::Relaxed);
}
Ok(())
}
}
fn sync_period_of_slot<E: EthSpec>(slot: Slot, spec: &ChainSpec) -> Result<u64, String> {
slot.epoch(E::slots_per_epoch())
.sync_committee_period(spec)
.map_err(|e| format!("Error computing sync period: {:?}", e))
}
fn subscriptions_from_sync_duties(
duties: Vec<SyncDuty>,
sync_committee_period: u64,
spec: &ChainSpec,
) -> impl Iterator<Item = SyncCommitteeSubscription> {
let until_epoch = spec.epochs_per_sync_committee_period * (sync_committee_period + 1);
duties
.into_iter()
.map(move |duty| SyncCommitteeSubscription {
validator_index: duty.validator_index,
sync_committee_indices: duty.validator_sync_committee_indices,
until_epoch,
})
}
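For example, assuming mainnet's 256 epochs per sync committee period, a duty in period 10 yields `until_epoch = 256 * (10 + 1) = 2816`: the first epoch of period 11, so the BN keeps the subnet subscription alive for the whole of period 10.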


@@ -1,5 +1,5 @@
use crate::{
doppelganger_service::DoppelgangerService, fork_service::ForkService, http_metrics::metrics,
doppelganger_service::DoppelgangerService, http_metrics::metrics,
initialized_validators::InitializedValidators,
};
use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString};
@@ -8,12 +8,15 @@ use slashing_protection::{NotSafe, Safe, SlashingDatabase};
use slog::{crit, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use types::{
attestation::Error as AttestationError, graffiti::GraffitiString, Attestation, BeaconBlock,
ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes,
SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot,
SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock,
SignedContributionAndProof, SignedRoot, Slot, SyncCommitteeContribution, SyncCommitteeMessage,
SyncSelectionProof, SyncSubnetId,
};
use validator_dir::ValidatorDir;
@@ -69,8 +72,8 @@ pub struct ValidatorStore<T, E: EthSpec> {
spec: Arc<ChainSpec>,
log: Logger,
doppelganger_service: Option<Arc<DoppelgangerService>>,
fork_service: ForkService<T, E>,
slot_clock: T,
_phantom: PhantomData<E>,
}
impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
@@ -79,8 +82,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
slashing_protection: SlashingDatabase,
genesis_validators_root: Hash256,
spec: ChainSpec,
fork_service: ForkService<T, E>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
slot_clock: T,
log: Logger,
) -> Self {
Self {
@@ -89,10 +92,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))),
genesis_validators_root,
spec: Arc::new(spec),
log: log.clone(),
log,
doppelganger_service,
slot_clock: fork_service.slot_clock(),
fork_service,
slot_clock,
_phantom: PhantomData,
}
}
@@ -253,8 +256,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.validators.read().num_enabled()
}
fn fork(&self) -> Fork {
self.fork_service.fork()
fn fork(&self, epoch: Epoch) -> Fork {
self.spec.fork_at_epoch(epoch)
}
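With the `ForkService` gone, the fork is derived purely from the epoch and the configured fork schedule, with no network round-trip. A minimal sketch of what such a lookup does, assuming only the genesis and Altair forks (the real `ChainSpec::fork_at_epoch` handles the full schedule):

```rust
use types::{ChainSpec, Epoch, Fork};

// Hypothetical stand-in for `ChainSpec::fork_at_epoch`, assuming a schedule
// with only the genesis and Altair forks.
fn fork_at_epoch_sketch(spec: &ChainSpec, epoch: Epoch) -> Fork {
    match spec.altair_fork_epoch {
        // At or after the Altair fork epoch: Altair is the current fork.
        Some(altair_epoch) if epoch >= altair_epoch => Fork {
            previous_version: spec.genesis_fork_version,
            current_version: spec.altair_fork_version,
            epoch: altair_epoch,
        },
        // Before Altair (or Altair not scheduled): still on the genesis fork.
        _ => Fork {
            previous_version: spec.genesis_fork_version,
            current_version: spec.genesis_fork_version,
            epoch: Epoch::new(0),
        },
    }
}
```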
/// Runs `func`, providing it access to the `Keypair` corresponding to `validator_pubkey`.
@@ -301,7 +304,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
let domain = self.spec.get_domain(
epoch,
Domain::Randao,
&self.fork(),
&self.fork(epoch),
self.genesis_validators_root,
);
let message = epoch.signing_root(domain);
@@ -334,7 +337,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
}
// Check for slashing conditions.
let fork = self.fork();
let fork = self.fork(block.epoch());
let domain = self.spec.get_domain(
block.epoch(),
Domain::BeaconProposer,
@@ -403,7 +406,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
}
// Checking for slashing conditions.
let fork = self.fork();
let fork = self.fork(attestation.data.target.epoch);
let domain = self.spec.get_domain(
attestation.data.target.epoch,
@@ -486,8 +489,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
aggregate: Attestation<E>,
selection_proof: SelectionProof,
) -> Result<SignedAggregateAndProof<E>, Error> {
// Take the fork early to avoid lock interleaving.
let fork = self.fork();
let fork = self.fork(aggregate.data.target.epoch);
let proof = self.with_validator_keypair(validator_pubkey, move |keypair| {
SignedAggregateAndProof::from_aggregate(
@@ -513,9 +515,6 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
validator_pubkey: PublicKeyBytes,
slot: Slot,
) -> Result<SelectionProof, Error> {
// Take the fork early to avoid lock interleaving.
let fork = self.fork();
// Bypass the `with_validator_keypair` function.
//
// This is because we don't care about doppelganger protection when it comes to selection
@@ -531,7 +530,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
let proof = SelectionProof::new::<E>(
slot,
&keypair.sk,
&fork,
&self.fork(slot.epoch(E::slots_per_epoch())),
self.genesis_validators_root,
&self.spec,
);
@@ -541,6 +540,93 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
Ok(proof)
}
/// Produce a `SyncSelectionProof` for `slot` signed by the secret key of `validator_pubkey`.
pub fn produce_sync_selection_proof(
&self,
validator_pubkey: &PublicKeyBytes,
slot: Slot,
subnet_id: SyncSubnetId,
) -> Result<SyncSelectionProof, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable.
let validators = self.validators.read();
let voting_keypair = validators
.voting_keypair(validator_pubkey)
.ok_or(Error::UnknownPubkey(*validator_pubkey))?;
metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL,
&[metrics::SUCCESS],
);
Ok(SyncSelectionProof::new::<E>(
slot,
subnet_id.into(),
&voting_keypair.sk,
&self.fork(slot.epoch(E::slots_per_epoch())),
self.genesis_validators_root,
&self.spec,
))
}
pub fn produce_sync_committee_signature(
&self,
slot: Slot,
beacon_block_root: Hash256,
validator_index: u64,
validator_pubkey: &PublicKeyBytes,
) -> Result<SyncCommitteeMessage, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable.
let validators = self.validators.read();
let voting_keypair = validators
.voting_keypair(validator_pubkey)
.ok_or(Error::UnknownPubkey(*validator_pubkey))?;
metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL,
&[metrics::SUCCESS],
);
Ok(SyncCommitteeMessage::new::<E>(
slot,
beacon_block_root,
validator_index,
&voting_keypair.sk,
&self.fork(slot.epoch(E::slots_per_epoch())),
self.genesis_validators_root,
&self.spec,
))
}
pub fn produce_signed_contribution_and_proof(
&self,
aggregator_index: u64,
aggregator_pubkey: &PublicKeyBytes,
contribution: SyncCommitteeContribution<E>,
selection_proof: SyncSelectionProof,
) -> Result<SignedContributionAndProof<E>, Error> {
// Bypass `with_validator_keypair`: sync committee messages are not slashable.
let validators = self.validators.read();
let voting_keypair = validators
.voting_keypair(aggregator_pubkey)
.ok_or(Error::UnknownPubkey(*aggregator_pubkey))?;
let fork = self.fork(contribution.slot.epoch(E::slots_per_epoch()));
metrics::inc_counter_vec(
&metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL,
&[metrics::SUCCESS],
);
Ok(SignedContributionAndProof::from_aggregate(
aggregator_index,
contribution,
Some(selection_proof),
&voting_keypair.sk,
&fork,
self.genesis_validators_root,
&self.spec,
))
}
/// Prune the slashing protection database so that it remains performant.
///
/// This function will only do actual pruning periodically, so it should usually be