Update /validator/subscribe (#969)

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Remove unused function
This commit is contained in:
Paul Hauner
2020-03-30 14:26:54 +11:00
committed by Age Manning
parent cf2cb26caa
commit aa6f838c3c
9 changed files with 102 additions and 162 deletions

View File

@@ -256,48 +256,34 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// slot allowing the beacon node to connect to the required subnet and determine
/// if attestations need to be aggregated.
fn send_subscriptions(&self, duties: Vec<DutyAndState>) -> impl Future<Item = (), Error = ()> {
let mut validator_subscriptions = Vec::new();
let mut successful_duties = Vec::new();
let service_1 = self.clone();
let duties_no = duties.len();
let num_duties = duties.len();
let log_1 = self.context.log.clone();
let log_2 = self.context.log.clone();
// builds a list of subscriptions
for duty in duties {
if let Some((slot, attestation_committee_index, _, validator_index)) =
duty.attestation_duties()
{
if let Some(slot_signature) = self
let (validator_subscriptions, successful_duties): (Vec<_>, Vec<_>) = duties
.into_iter()
.filter_map(|duty| {
let (slot, attestation_committee_index, _, validator_index) =
duty.attestation_duties()?;
let selection_proof = self
.validator_store
.sign_slot(duty.validator_pubkey(), slot)
{
let is_aggregator_proof = if duty.is_aggregator() {
Some(slot_signature.clone())
} else {
None
};
.produce_selection_proof(duty.validator_pubkey(), slot)?;
let modulo = duty.duty.aggregator_modulo?;
let subscription = ValidatorSubscription::new(
validator_index,
attestation_committee_index,
slot,
slot_signature,
);
validator_subscriptions.push(subscription);
let subscription = ValidatorSubscription {
validator_index,
attestation_committee_index,
slot,
is_aggregator: selection_proof.is_aggregator(modulo),
};
// add successful duties to the list, along with whether they are aggregation
// duties or not
successful_duties.push((duty, is_aggregator_proof));
}
} else {
crit!(log_2, "Validator duty doesn't have required fields");
}
}
Some((subscription, (duty, selection_proof)))
})
.unzip();
let failed_duties = duties_no - successful_duties.len();
let num_failed_duties = num_duties - successful_duties.len();
self.beacon_node
.http
@@ -308,8 +294,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
PublishStatus::Valid => info!(
log_1,
"Successfully subscribed validators";
"validators" => duties_no,
"failed_validators" => failed_duties,
"validators" => num_duties,
"failed_validators" => num_failed_duties,
),
PublishStatus::Invalid(msg) => crit!(
log_1,
@@ -321,10 +307,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
}
})
.and_then(move |_| {
for (duty, is_aggregator_proof) in successful_duties {
for (duty, selection_proof) in successful_duties {
service_1
.duties_service
.subscribe_duty(&duty.duty, is_aggregator_proof);
.subscribe_duty(&duty.duty, selection_proof);
}
Ok(())
})