mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 18:21:45 +00:00
Update VC and BN APIs for naive aggregation (#950)
* Refactor `Attestation` production * Add constant * Start refactor for aggregation * Return early when no attesting validators * Refactor into individual functions * Tidy, add comments * Add first draft of NaiveAggregationPool * Further progress on naive aggregation pool * Fix compile errors in VC * Change locking logic for naive pool * Introduce AttesationType * Add pruning, comments * Add MAX_ATTESTATIONS_PER_SLOT restriction * Add pruning based on slot * Update BN for new aggregation fns * Fix test compile errors * Fix failing rest_api test * Move SignedAggregateAndProof into own file * Update docs, fix warning * Tidy some formatting in validator API * Remove T::default_spec from signing * Fix failing rest test * Tidy * Add test, fix bug * Improve naive pool tests * Add max attestations test * Revert changes to the op_pool * Refactor timer
This commit is contained in:
@@ -4,17 +4,17 @@ use crate::{
|
||||
};
|
||||
use environment::RuntimeContext;
|
||||
use exit_future::Signal;
|
||||
use futures::{Future, Stream};
|
||||
use futures::{future, Future, Stream};
|
||||
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
|
||||
use rest_types::{ValidatorDuty, ValidatorSubscription};
|
||||
use slog::{crit, info, trace};
|
||||
use rest_types::ValidatorSubscription;
|
||||
use slog::{crit, debug, info, trace};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::timer::{Delay, Interval};
|
||||
use types::{AggregateAndProof, ChainSpec, CommitteeIndex, EthSpec, Slot};
|
||||
use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot};
|
||||
|
||||
/// Builds an `AttestationService`.
|
||||
pub struct AttestationServiceBuilder<T, E: EthSpec> {
|
||||
@@ -193,13 +193,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
|
||||
// If a validator needs to publish an aggregate attestation, they must do so at 2/3
|
||||
// through the slot. This delay triggers at this time
|
||||
let aggregator_delay_instant = {
|
||||
if duration_to_next_slot <= slot_duration / 3 {
|
||||
Instant::now()
|
||||
} else {
|
||||
Instant::now() + duration_to_next_slot - (slot_duration / 3)
|
||||
}
|
||||
};
|
||||
let aggregate_production_instant = Instant::now()
|
||||
+ duration_to_next_slot
|
||||
.checked_sub(slot_duration / 3)
|
||||
.unwrap_or_else(|| Duration::from_secs(0));
|
||||
|
||||
let epoch = slot.epoch(E::slots_per_epoch());
|
||||
// Check if any attestation subscriptions are required. If there a new attestation duties for
|
||||
@@ -217,61 +214,39 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
.executor
|
||||
.spawn(self.clone().send_subscriptions(duties_to_subscribe));
|
||||
|
||||
// Builds a map of committee index and spawn individual tasks to process raw attestations
|
||||
// and aggregated attestations
|
||||
let mut committee_indices: HashMap<CommitteeIndex, Vec<ValidatorDuty>> = HashMap::new();
|
||||
let mut aggregator_committee_indices: HashMap<CommitteeIndex, Vec<DutyAndState>> =
|
||||
HashMap::new();
|
||||
|
||||
service
|
||||
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndState>> = service
|
||||
.duties_service
|
||||
.attesters(slot)
|
||||
.into_iter()
|
||||
.for_each(|duty_and_state| {
|
||||
.fold(HashMap::new(), |mut map, duty_and_state| {
|
||||
if let Some(committee_index) = duty_and_state.duty.attestation_committee_index {
|
||||
let validator_duties = committee_indices
|
||||
.entry(committee_index)
|
||||
.or_insert_with(|| vec![]);
|
||||
validator_duties.push(duty_and_state.duty.clone());
|
||||
let validator_duties = map.entry(committee_index).or_insert_with(|| vec![]);
|
||||
|
||||
// If this duty entails the validator aggregating attestations, perform
|
||||
// aggregation tasks
|
||||
if duty_and_state.is_aggregator() {
|
||||
let validator_duties = aggregator_committee_indices
|
||||
.entry(committee_index)
|
||||
.or_insert_with(|| vec![]);
|
||||
validator_duties.push(duty_and_state);
|
||||
}
|
||||
validator_duties.push(duty_and_state);
|
||||
}
|
||||
|
||||
map
|
||||
});
|
||||
|
||||
// spawns tasks for all required raw attestations production
|
||||
committee_indices
|
||||
// For each committee index for this slot:
|
||||
//
|
||||
// - Create and publish an `Attestation` for all required validators.
|
||||
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
|
||||
duties_by_committee_index
|
||||
.into_iter()
|
||||
.for_each(|(committee_index, validator_duties)| {
|
||||
// Spawn a separate task for each attestation.
|
||||
service.context.executor.spawn(self.clone().do_attestation(
|
||||
slot,
|
||||
committee_index,
|
||||
validator_duties,
|
||||
));
|
||||
});
|
||||
|
||||
// spawns tasks for all aggregate attestation production
|
||||
aggregator_committee_indices
|
||||
.into_iter()
|
||||
.for_each(|(committee_index, validator_duties)| {
|
||||
// Spawn a separate task for each aggregate attestation.
|
||||
service
|
||||
.context
|
||||
.executor
|
||||
.spawn(self.clone().do_aggregate_attestation(
|
||||
.spawn(self.clone().publish_attestations_and_aggregates(
|
||||
slot,
|
||||
committee_index,
|
||||
validator_duties,
|
||||
Delay::new(aggregator_delay_instant.clone()),
|
||||
aggregate_production_instant,
|
||||
));
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -280,7 +255,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
/// This informs the beacon node that the validator has a duty on a particular
|
||||
/// slot allowing the beacon node to connect to the required subnet and determine
|
||||
/// if attestations need to be aggregated.
|
||||
fn send_subscriptions(&self, duties: Vec<ValidatorDuty>) -> impl Future<Item = (), Error = ()> {
|
||||
fn send_subscriptions(&self, duties: Vec<DutyAndState>) -> impl Future<Item = (), Error = ()> {
|
||||
let mut validator_subscriptions = Vec::new();
|
||||
let mut successful_duties = Vec::new();
|
||||
|
||||
@@ -293,12 +268,13 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
// builds a list of subscriptions
|
||||
for duty in duties {
|
||||
if let Some((slot, attestation_committee_index, _, validator_index)) =
|
||||
attestation_duties(&duty)
|
||||
duty.attestation_duties()
|
||||
{
|
||||
if let Some(slot_signature) =
|
||||
self.validator_store.sign_slot(&duty.validator_pubkey, slot)
|
||||
if let Some(slot_signature) = self
|
||||
.validator_store
|
||||
.sign_slot(duty.validator_pubkey(), slot)
|
||||
{
|
||||
let is_aggregator_proof = if duty.is_aggregator(&slot_signature) {
|
||||
let is_aggregator_proof = if duty.is_aggregator() {
|
||||
Some(slot_signature.clone())
|
||||
} else {
|
||||
None
|
||||
@@ -348,7 +324,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
for (duty, is_aggregator_proof) in successful_duties {
|
||||
service_1
|
||||
.duties_service
|
||||
.subscribe_duty(&duty, is_aggregator_proof);
|
||||
.subscribe_duty(&duty.duty, is_aggregator_proof);
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
@@ -361,218 +337,377 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
})
|
||||
}
|
||||
|
||||
/// For a given `committee_index`, download the attestation, have each validator in
|
||||
/// `validator_duties` sign it and send the collection back to the beacon node.
|
||||
fn do_attestation(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
validator_duties: Vec<ValidatorDuty>,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
let service_1 = self.clone();
|
||||
let service_2 = self.clone();
|
||||
let log_1 = self.context.log.clone();
|
||||
let log_2 = self.context.log.clone();
|
||||
|
||||
self.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.produce_attestation(slot, committee_index)
|
||||
.map_err(|e| format!("Failed to produce attestation: {:?}", e))
|
||||
.map(move |attestation| {
|
||||
validator_duties.iter().fold(
|
||||
(Vec::new(), attestation),
|
||||
|(mut attestation_list, attestation), duty| {
|
||||
let log = service_1.context.log.clone();
|
||||
|
||||
if let Some((
|
||||
duty_slot,
|
||||
duty_committee_index,
|
||||
validator_committee_position,
|
||||
_,
|
||||
)) = attestation_duties(duty)
|
||||
{
|
||||
let mut raw_attestation = attestation.clone();
|
||||
if duty_slot == slot && duty_committee_index == committee_index {
|
||||
if service_1
|
||||
.validator_store
|
||||
.sign_attestation(
|
||||
&duty.validator_pubkey,
|
||||
validator_committee_position,
|
||||
&mut raw_attestation,
|
||||
)
|
||||
.is_none()
|
||||
{
|
||||
crit!(log, "Failed to sign attestation");
|
||||
} else {
|
||||
attestation_list.push(raw_attestation);
|
||||
}
|
||||
} else {
|
||||
crit!(log, "Inconsistent validator duties during signing");
|
||||
}
|
||||
} else {
|
||||
crit!(log, "Missing validator duties when signing");
|
||||
}
|
||||
|
||||
(attestation_list, attestation)
|
||||
},
|
||||
)
|
||||
})
|
||||
.and_then(move |(attestation_list, attestation)| {
|
||||
service_2
|
||||
.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.publish_attestations(attestation_list.clone())
|
||||
.map(|publish_status| (attestation_list, attestation, publish_status))
|
||||
.map_err(|e| format!("Failed to publish attestations: {:?}", e))
|
||||
})
|
||||
.map(
|
||||
move |(attestation_list, attestation, publish_status)| match publish_status {
|
||||
PublishStatus::Valid => info!(
|
||||
log_1,
|
||||
"Successfully published attestation";
|
||||
"signatures" => attestation_list.len(),
|
||||
"head_block" => format!("{}", attestation.data.beacon_block_root),
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Invalid(msg) => crit!(
|
||||
log_1,
|
||||
"Published attestation was invalid";
|
||||
"message" => msg,
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Unknown => {
|
||||
crit!(log_1, "Unknown condition when publishing attestation")
|
||||
}
|
||||
},
|
||||
)
|
||||
.map_err(move |e| {
|
||||
crit!(
|
||||
log_2,
|
||||
"Error during attestation production";
|
||||
"error" => e
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// For a given `committee_index`, download the aggregate attestation, have it signed by all validators
|
||||
/// in `validator_duties` then upload it.
|
||||
fn do_aggregate_attestation(
|
||||
/// Performs the first step of the attesting process: downloading `Attestation` objects,
|
||||
/// signing them and returning them to the validator.
|
||||
///
|
||||
/// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#attesting
|
||||
///
|
||||
/// ## Detail
|
||||
///
|
||||
/// The given `validator_duties` should already be filtered to only contain those that match
|
||||
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
|
||||
fn publish_attestations_and_aggregates(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
validator_duties: Vec<DutyAndState>,
|
||||
aggregator_delay: Delay,
|
||||
) -> impl Future<Item = (), Error = ()> {
|
||||
aggregate_production_instant: Instant,
|
||||
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
|
||||
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
|
||||
// any validators for the given `slot` and `committee_index`.
|
||||
if validator_duties.is_empty() {
|
||||
return Box::new(future::ok(()));
|
||||
}
|
||||
|
||||
let service_1 = self.clone();
|
||||
let log_1 = self.context.log.clone();
|
||||
let validator_duties_1 = Arc::new(validator_duties);
|
||||
let validator_duties_2 = validator_duties_1.clone();
|
||||
|
||||
Box::new(
|
||||
// Step 1.
|
||||
//
|
||||
// Download, sign and publish an `Attestation` for each validator.
|
||||
self.produce_and_publish_attestations(slot, committee_index, validator_duties_1)
|
||||
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
|
||||
move |attestation_opt| {
|
||||
if let Some(attestation) = attestation_opt {
|
||||
Box::new(
|
||||
// Step 2. (Only if step 1 produced an attestation)
|
||||
//
|
||||
// First, wait until the `aggregation_production_instant` (2/3rds
|
||||
// of the way though the slot). As verified in the
|
||||
// `delay_triggers_when_in_the_past` test, this code will still run
|
||||
// even if the instant has already elapsed.
|
||||
//
|
||||
// Then download, sign and publish a `SignedAggregateAndProof` for each
|
||||
// validator that is elected to aggregate for this `slot` and
|
||||
// `committee_index`.
|
||||
Delay::new(aggregate_production_instant)
|
||||
.map_err(|e| {
|
||||
format!(
|
||||
"Unable to create aggregate production delay: {:?}",
|
||||
e
|
||||
)
|
||||
})
|
||||
.and_then(move |()| {
|
||||
service_1.produce_and_publish_aggregates(
|
||||
attestation,
|
||||
validator_duties_2,
|
||||
)
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
// If `produce_and_publish_attestations` did not download any
|
||||
// attestations then there is no need to produce any
|
||||
// `SignedAggregateAndProof`.
|
||||
Box::new(future::ok(()))
|
||||
}
|
||||
},
|
||||
)
|
||||
.map_err(move |e| {
|
||||
crit!(
|
||||
log_1,
|
||||
"Error during attestation routine";
|
||||
"error" => format!("{:?}", e),
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Performs the first step of the attesting process: downloading `Attestation` objects,
|
||||
/// signing them and returning them to the validator.
|
||||
///
|
||||
/// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#attesting
|
||||
///
|
||||
/// ## Detail
|
||||
///
|
||||
/// The given `validator_duties` should already be filtered to only contain those that match
|
||||
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
|
||||
///
|
||||
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
|
||||
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
|
||||
fn produce_and_publish_attestations(
|
||||
&self,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
validator_duties: Arc<Vec<DutyAndState>>,
|
||||
) -> Box<dyn Future<Item = Option<Attestation<E>>, Error = String> + Send> {
|
||||
if validator_duties.is_empty() {
|
||||
return Box::new(future::ok(None));
|
||||
}
|
||||
|
||||
let service = self.clone();
|
||||
|
||||
Box::new(
|
||||
self.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.produce_attestation(slot, committee_index)
|
||||
.map_err(|e| format!("Failed to produce attestation: {:?}", e))
|
||||
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(move |attestation| {
|
||||
let log = service.context.log.clone();
|
||||
|
||||
// For each validator in `validator_duties`, clone the `attestation` and add
|
||||
// their signature.
|
||||
//
|
||||
// If any validator is unable to sign, they are simply skipped.
|
||||
let signed_attestations = validator_duties
|
||||
.iter()
|
||||
.filter_map(|duty| {
|
||||
let log = service.context.log.clone();
|
||||
|
||||
// Ensure that all required fields are present in the validator duty.
|
||||
let (duty_slot, duty_committee_index, validator_committee_position, _) =
|
||||
if let Some(tuple) = duty.attestation_duties() {
|
||||
tuple
|
||||
} else {
|
||||
crit!(
|
||||
log,
|
||||
"Missing validator duties when signing";
|
||||
"duties" => format!("{:?}", duty)
|
||||
);
|
||||
return None;
|
||||
};
|
||||
|
||||
// Ensure that the attestation matches the duties.
|
||||
if duty_slot != attestation.data.slot
|
||||
|| duty_committee_index != attestation.data.index
|
||||
{
|
||||
crit!(
|
||||
log,
|
||||
"Inconsistent validator duties during signing";
|
||||
"validator" => format!("{:?}", duty.validator_pubkey()),
|
||||
"duty_slot" => duty_slot,
|
||||
"attestation_slot" => attestation.data.slot,
|
||||
"duty_index" => duty_committee_index,
|
||||
"attestation_index" => attestation.data.index,
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut attestation = attestation.clone();
|
||||
|
||||
if service
|
||||
.validator_store
|
||||
.sign_attestation(
|
||||
duty.validator_pubkey(),
|
||||
validator_committee_position,
|
||||
&mut attestation,
|
||||
)
|
||||
.is_none()
|
||||
{
|
||||
crit!(
|
||||
log,
|
||||
"Attestation signing refused";
|
||||
"validator" => format!("{:?}", duty.validator_pubkey()),
|
||||
"slot" => attestation.data.slot,
|
||||
"index" => attestation.data.index,
|
||||
);
|
||||
None
|
||||
} else {
|
||||
Some(attestation)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// If there are any signed attestations, publish them to the BN. Otherwise,
|
||||
// just return early.
|
||||
if let Some(attestation) = signed_attestations.first().cloned() {
|
||||
let num_attestations = signed_attestations.len();
|
||||
let beacon_block_root = attestation.data.beacon_block_root;
|
||||
|
||||
Box::new(
|
||||
service
|
||||
.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.publish_attestations(signed_attestations)
|
||||
.map_err(|e| format!("Failed to publish attestation: {:?}", e))
|
||||
.map(move |publish_status| match publish_status {
|
||||
PublishStatus::Valid => info!(
|
||||
log,
|
||||
"Successfully published attestations";
|
||||
"count" => num_attestations,
|
||||
"head_block" => format!("{:?}", beacon_block_root),
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Invalid(msg) => crit!(
|
||||
log,
|
||||
"Published attestation was invalid";
|
||||
"message" => msg,
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Unknown => {
|
||||
crit!(log, "Unknown condition when publishing attestation")
|
||||
}
|
||||
})
|
||||
.map(|()| Some(attestation)),
|
||||
)
|
||||
} else {
|
||||
debug!(
|
||||
log,
|
||||
"No attestations to publish";
|
||||
"committee_index" => committee_index,
|
||||
"slot" => slot.as_u64(),
|
||||
);
|
||||
Box::new(future::ok(None))
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
|
||||
/// converting it into a `SignedAggregateAndProof` and returning it to the BN.
|
||||
///
|
||||
/// https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#broadcast-aggregate
|
||||
///
|
||||
/// ## Detail
|
||||
///
|
||||
/// The given `validator_duties` should already be filtered to only contain those that match
|
||||
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
|
||||
///
|
||||
/// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed
|
||||
/// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
|
||||
/// returned to the BN.
|
||||
fn produce_and_publish_aggregates(
|
||||
&self,
|
||||
attestation: Attestation<E>,
|
||||
validator_duties: Arc<Vec<DutyAndState>>,
|
||||
) -> impl Future<Item = (), Error = String> {
|
||||
let service_1 = self.clone();
|
||||
let service_2 = self.clone();
|
||||
let log_1 = self.context.log.clone();
|
||||
let log_2 = self.context.log.clone();
|
||||
|
||||
self.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.produce_aggregate_attestation(slot, committee_index)
|
||||
.produce_aggregate_attestation(&attestation.data)
|
||||
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))
|
||||
.map(move |attestation| {
|
||||
validator_duties.iter().fold(
|
||||
(Vec::new(), attestation),
|
||||
|(mut aggregate_and_proof_list, attestation), duty_and_state| {
|
||||
let log = service_1.context.log.clone();
|
||||
|
||||
match (
|
||||
duty_and_state.selection_proof(),
|
||||
attestation_duties(&duty_and_state.duty),
|
||||
) {
|
||||
(
|
||||
Some(selection_proof),
|
||||
Some((duty_slot, duty_committee_index, _, aggregator_index)),
|
||||
) => {
|
||||
let raw_attestation = attestation.clone();
|
||||
if duty_slot == slot && duty_committee_index == committee_index {
|
||||
// build the `AggregateAndProof` struct for each validator
|
||||
let aggregate_and_proof = AggregateAndProof {
|
||||
aggregator_index,
|
||||
aggregate: raw_attestation,
|
||||
selection_proof,
|
||||
};
|
||||
|
||||
if let Some(signed_aggregate_and_proof) =
|
||||
service_1.validator_store.sign_aggregate_and_proof(
|
||||
&duty_and_state.duty.validator_pubkey,
|
||||
aggregate_and_proof,
|
||||
)
|
||||
{
|
||||
aggregate_and_proof_list.push(signed_aggregate_and_proof);
|
||||
} else {
|
||||
crit!(log, "Failed to sign attestation");
|
||||
}
|
||||
} else {
|
||||
crit!(log, "Inconsistent validator duties during signing");
|
||||
}
|
||||
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
|
||||
move |aggregated_attestation| {
|
||||
// For each validator, clone the `aggregated_attestation` and convert it into
|
||||
// a `SignedAggregateAndProof`
|
||||
let signed_aggregate_and_proofs = validator_duties
|
||||
.iter()
|
||||
.filter_map(|duty_and_state| {
|
||||
// Do not produce a signed aggregator for validators that are not
|
||||
// subscribed aggregators.
|
||||
//
|
||||
// Note: this function returns `false` if the validator is required to
|
||||
// be an aggregator but has not yet subscribed.
|
||||
if !duty_and_state.is_aggregator() {
|
||||
return None;
|
||||
}
|
||||
_ => crit!(
|
||||
log,
|
||||
"Missing validator duties or not aggregate duty when signing"
|
||||
),
|
||||
}
|
||||
|
||||
(aggregate_and_proof_list, attestation)
|
||||
},
|
||||
)
|
||||
})
|
||||
.and_then(move |(aggregate_and_proof_list, attestation)| {
|
||||
aggregator_delay
|
||||
.map(move |_| (aggregate_and_proof_list, attestation))
|
||||
.map_err(move |e| format!("Error during aggregator delay: {:?}", e))
|
||||
})
|
||||
.and_then(move |(aggregate_and_proof_list, attestation)| {
|
||||
service_2
|
||||
.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.publish_aggregate_and_proof(aggregate_and_proof_list)
|
||||
.map(|publish_status| (attestation, publish_status))
|
||||
.map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))
|
||||
})
|
||||
.map(move |(attestation, publish_status)| match publish_status {
|
||||
PublishStatus::Valid => info!(
|
||||
log_1,
|
||||
"Successfully published aggregate attestations";
|
||||
"signatures" => attestation.aggregation_bits.num_set_bits(),
|
||||
"head_block" => format!("{}", attestation.data.beacon_block_root),
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Invalid(msg) => crit!(
|
||||
log_1,
|
||||
"Published attestation was invalid";
|
||||
"message" => msg,
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Unknown => {
|
||||
crit!(log_1, "Unknown condition when publishing attestation")
|
||||
}
|
||||
})
|
||||
.map_err(move |e| {
|
||||
crit!(
|
||||
log_2,
|
||||
"Error during attestation production";
|
||||
"error" => e
|
||||
)
|
||||
})
|
||||
let (duty_slot, duty_committee_index, _, validator_index) =
|
||||
duty_and_state.attestation_duties().or_else(|| {
|
||||
crit!(log_1, "Missing duties when signing aggregate");
|
||||
None
|
||||
})?;
|
||||
|
||||
let pubkey = &duty_and_state.duty.validator_pubkey;
|
||||
let slot = attestation.data.slot;
|
||||
let committee_index = attestation.data.index;
|
||||
|
||||
if duty_slot != slot || duty_committee_index != committee_index {
|
||||
crit!(log_1, "Inconsistent validator duties during signing");
|
||||
return None;
|
||||
}
|
||||
|
||||
if let Some(signed_aggregate_and_proof) = service_1
|
||||
.validator_store
|
||||
.produce_signed_aggregate_and_proof(
|
||||
pubkey,
|
||||
validator_index,
|
||||
aggregated_attestation.clone(),
|
||||
)
|
||||
{
|
||||
Some(signed_aggregate_and_proof)
|
||||
} else {
|
||||
crit!(log_1, "Failed to sign attestation");
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// If there any signed aggregates and proofs were produced, publish them to the
|
||||
// BN.
|
||||
if let Some(first) = signed_aggregate_and_proofs.first().cloned() {
|
||||
let attestation = first.message.aggregate;
|
||||
|
||||
Box::new(service_1
|
||||
.beacon_node
|
||||
.http
|
||||
.validator()
|
||||
.publish_aggregate_and_proof(signed_aggregate_and_proofs)
|
||||
.map(|publish_status| (attestation, publish_status))
|
||||
.map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))
|
||||
.map(move |(attestation, publish_status)| match publish_status {
|
||||
PublishStatus::Valid => info!(
|
||||
log_1,
|
||||
"Successfully published aggregate attestations";
|
||||
"signatures" => attestation.aggregation_bits.num_set_bits(),
|
||||
"head_block" => format!("{}", attestation.data.beacon_block_root),
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Invalid(msg) => crit!(
|
||||
log_1,
|
||||
"Published attestation was invalid";
|
||||
"message" => msg,
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
),
|
||||
PublishStatus::Unknown => {
|
||||
crit!(log_1, "Unknown condition when publishing attestation")
|
||||
}
|
||||
}))
|
||||
} else {
|
||||
debug!(
|
||||
log_1,
|
||||
"No signed aggregates to publish";
|
||||
"committee_index" => attestation.data.index,
|
||||
"slot" => attestation.data.slot.as_u64(),
|
||||
);
|
||||
Box::new(future::ok(()))
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn attestation_duties(duty: &ValidatorDuty) -> Option<(Slot, CommitteeIndex, usize, u64)> {
|
||||
Some((
|
||||
duty.attestation_slot?,
|
||||
duty.attestation_committee_index?,
|
||||
duty.attestation_committee_position?,
|
||||
duty.validator_index?,
|
||||
))
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::runtime::Builder as RuntimeBuilder;
|
||||
|
||||
/// This test is to ensure that a `tokio_timer::Delay` with an instant in the past will still
|
||||
/// trigger.
|
||||
#[test]
|
||||
fn delay_triggers_when_in_the_past() {
|
||||
let in_the_past = Instant::now() - Duration::from_secs(2);
|
||||
let state_1 = Arc::new(RwLock::new(in_the_past));
|
||||
let state_2 = state_1.clone();
|
||||
|
||||
let future = Delay::new(in_the_past)
|
||||
.map_err(|_| panic!("Failed to create duration"))
|
||||
.map(move |()| *state_1.write() = Instant::now());
|
||||
|
||||
let mut runtime = RuntimeBuilder::new()
|
||||
.core_threads(1)
|
||||
.build()
|
||||
.expect("failed to start runtime");
|
||||
|
||||
runtime.block_on(future).expect("failed to complete future");
|
||||
|
||||
assert!(
|
||||
*state_2.read() > in_the_past,
|
||||
"state should have been updated"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::timer::Interval;
|
||||
use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot};
|
||||
use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, Slot};
|
||||
|
||||
/// Delay this period of time after the slot starts. This allows the node to process the new slot.
|
||||
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
|
||||
@@ -24,14 +24,6 @@ const PRUNE_DEPTH: u64 = 4;
|
||||
|
||||
type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, DutyAndState>>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DutyAndState {
|
||||
/// The validator duty.
|
||||
pub duty: ValidatorDuty,
|
||||
/// The current state of the validator duty.
|
||||
state: DutyState,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DutyState {
|
||||
/// This duty has not been subscribed to the beacon node.
|
||||
@@ -43,6 +35,14 @@ pub enum DutyState {
|
||||
SubscribedAggregator(Signature),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DutyAndState {
|
||||
/// The validator duty.
|
||||
pub duty: ValidatorDuty,
|
||||
/// The current state of the validator duty.
|
||||
state: DutyState,
|
||||
}
|
||||
|
||||
impl DutyAndState {
|
||||
/// Returns true if the duty is an aggregation duty (the validator must aggregate all
|
||||
/// attestations.
|
||||
@@ -70,6 +70,21 @@ impl DutyAndState {
|
||||
DutyState::SubscribedAggregator(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the information required for an attesting validator, if they are scheduled to
|
||||
/// attest.
|
||||
pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64)> {
|
||||
Some((
|
||||
self.duty.attestation_slot?,
|
||||
self.duty.attestation_committee_index?,
|
||||
self.duty.attestation_committee_position?,
|
||||
self.duty.validator_index?,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn validator_pubkey(&self) -> &PublicKey {
|
||||
&self.duty.validator_pubkey
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<DutyAndState> for ValidatorDutyBytes {
|
||||
@@ -166,7 +181,7 @@ impl DutiesStore {
|
||||
/// Gets a list of validator duties for an epoch that have not yet been subscribed
|
||||
/// to the beacon node.
|
||||
// Note: Potentially we should modify the data structure to store the unsubscribed epoch duties for validator clients with a large number of validators. This currently adds an O(N) search each slot.
|
||||
fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec<ValidatorDuty> {
|
||||
fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec<DutyAndState> {
|
||||
self.store
|
||||
.read()
|
||||
.iter()
|
||||
@@ -179,7 +194,7 @@ impl DutiesStore {
|
||||
}
|
||||
})
|
||||
})
|
||||
.map(|duties| duties.duty.clone())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -403,7 +418,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
}
|
||||
|
||||
/// Returns all `ValidatorDuty` that have not been registered with the beacon node.
|
||||
pub fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec<ValidatorDuty> {
|
||||
pub fn unsubscribed_epoch_duties(&self, epoch: &Epoch) -> Vec<DutyAndState> {
|
||||
self.store.unsubscribed_epoch_duties(epoch)
|
||||
}
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.into_future()
|
||||
.map_err(|e| format!("Unable to read system time: {:?}", e))
|
||||
.and_then(move |now| {
|
||||
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(move |now| {
|
||||
let log = log_3.clone();
|
||||
let genesis = Duration::from_secs(genesis_time);
|
||||
|
||||
@@ -114,9 +114,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
//
|
||||
// If the validator client starts before genesis, it will get errors from
|
||||
// the slot clock.
|
||||
let box_future: Box<dyn Future<Item = _, Error = _> + Send> = if now
|
||||
< genesis
|
||||
{
|
||||
if now < genesis {
|
||||
info!(
|
||||
log,
|
||||
"Starting node prior to genesis";
|
||||
@@ -138,9 +136,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
);
|
||||
|
||||
Box::new(future::ok((beacon_node, remote_eth2_config, genesis_time)))
|
||||
};
|
||||
|
||||
box_future
|
||||
}
|
||||
})
|
||||
})
|
||||
.and_then(move |(beacon_node, remote_eth2_config, genesis_time)| {
|
||||
|
||||
@@ -12,8 +12,8 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tempdir::TempDir;
|
||||
use types::{
|
||||
AggregateAndProof, Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork,
|
||||
PublicKey, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot,
|
||||
Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, PublicKey, Signature,
|
||||
SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -199,6 +199,28 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Signs an `AggregateAndProof` for a given validator.
|
||||
///
|
||||
/// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be
|
||||
/// modified by actors other than the signing validator.
|
||||
pub fn produce_signed_aggregate_and_proof(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
validator_index: u64,
|
||||
aggregate: Attestation<E>,
|
||||
) -> Option<SignedAggregateAndProof<E>> {
|
||||
let validators = self.validators.read();
|
||||
let voting_keypair = validators.get(validator_pubkey)?.voting_keypair.as_ref()?;
|
||||
|
||||
Some(SignedAggregateAndProof::from_aggregate(
|
||||
validator_index,
|
||||
aggregate,
|
||||
&voting_keypair.sk,
|
||||
&self.fork()?,
|
||||
&self.spec,
|
||||
))
|
||||
}
|
||||
|
||||
/// Signs a slot for a given validator.
|
||||
///
|
||||
/// This is used to subscribe a validator to a beacon node and is used to determine if the
|
||||
@@ -217,19 +239,4 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
|
||||
Some(Signature::new(message.as_bytes(), &voting_keypair.sk))
|
||||
}
|
||||
|
||||
/// Signs an `AggregateAndProof` for a given validator.
|
||||
///
|
||||
/// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be
|
||||
/// modified by actors other than the signing validator.
|
||||
pub fn sign_aggregate_and_proof(
|
||||
&self,
|
||||
validator_pubkey: &PublicKey,
|
||||
aggregate_and_proof: AggregateAndProof<E>,
|
||||
) -> Option<SignedAggregateAndProof<E>> {
|
||||
let validators = self.validators.read();
|
||||
let voting_keypair = validators.get(validator_pubkey)?.voting_keypair.as_ref()?;
|
||||
|
||||
Some(aggregate_and_proof.into_signed(&voting_keypair.sk, &self.fork()?))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user