Port validator_client to stable futures (#1114)

* Add PH & MS slot clock changes

* Account for genesis time

* Add progress on duties refactor

* Add simple is_aggregator bool to val subscription

* Start work on attestation_verification.rs

* Add progress on ObservedAttestations

* Progress with ObservedAttestations

* Fix tests

* Add observed attestations to the beacon chain

* Add attestation observation to processing code

* Add progress on attestation verification

* Add first draft of ObservedAttesters

* Add more tests

* Add observed attesters to beacon chain

* Add observers to attestation processing

* Add more attestation verification

* Create ObservedAggregators map

* Remove commented-out code

* Add observed aggregators into chain

* Add progress

* Finish adding features to attestation verification

* Ensure beacon chain compiles

* Link attn verification into chain

* Integrate new attn verification in chain

* Remove old attestation processing code

* Start trying to fix beacon_chain tests

* Split adding into pools into two functions

* Add aggregation to harness

* Get test harness working again

* Adjust the number of aggregators for test harness

* Fix edge-case in harness

* Integrate new attn processing in network

* Fix compile bug in validator_client

* Update validator API endpoints

* Fix aggreagation in test harness

* Fix enum thing

* Fix attestation observation bug:

* Patch failing API tests

* Start adding comments to attestation verification

* Remove unused attestation field

* Unify "is block known" logic

* Update comments

* Supress fork choice errors for network processing

* Add todos

* Tidy

* Add gossip attn tests

* Disallow test harness to produce old attns

* Comment out in-progress tests

* Partially address pruning tests

* Fix failing store test

* Add aggregate tests

* Add comments about which spec conditions we check

* Dont re-aggregate

* Split apart test harness attn production

* Fix compile error in network

* Make progress on commented-out test

* Fix skipping attestation test

* Add fork choice verification tests

* Tidy attn tests, remove dead code

* Remove some accidentally added code

* Fix clippy lint

* Rename test file

* Add block tests, add cheap block proposer check

* Rename block testing file

* Add observed_block_producers

* Tidy

* Switch around block signature verification

* Finish block testing

* Remove gossip from signature tests

* First pass of self review

* Fix deviation in spec

* Update test spec tags

* Start moving over to hashset

* Finish moving observed attesters to hashmap

* Move aggregation pool over to hashmap

* Make fc attn borrow again

* Fix rest_api compile error

* Fix missing comments

* Fix monster test

* Uncomment increasing slots test

* Address remaining comments

* Remove unsafe, use cfg test

* Remove cfg test flag

* Fix dodgy comment

* Revert "Update hashmap hashset to stable futures"

This reverts commit d432378a3c.

* Revert "Adds panic test to hashset delay"

This reverts commit 281502396f.

* Ported attestation_service

* Ported duties_service

* Ported fork_service

* More ports

* Port block_service

* Minor fixes

* VC compiles

* Update TODOS

* Borrow self where possible

* Ignore aggregates that are already known.

* Unify aggregator modulo logic

* Fix typo in logs

* Refactor validator subscription logic

* Avoid reproducing selection proof

* Skip HTTP call if no subscriptions

* Rename DutyAndState -> DutyAndProof

* Tidy logs

* Print root as dbg

* Fix compile errors in tests

* Fix compile error in test

* Re-Fix attestation and duties service

* Minor fixes

Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
Pawan Dhananjay
2020-05-07 10:29:26 +05:30
committed by GitHub
parent 3e44d7a258
commit c99b134e0f
9 changed files with 826 additions and 970 deletions

View File

@@ -4,15 +4,14 @@ use crate::{
};
use environment::RuntimeContext;
use exit_future::Signal;
use futures::{future, Future, Stream};
use futures::{FutureExt, StreamExt};
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, debug, info, trace};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::timer::{Delay, Interval};
use tokio::time::{delay_until, interval_at, Duration, Instant};
use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot};
/// Builds an `AttestationService`.
@@ -130,7 +129,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.ok_or_else(|| "Unable to determine duration to next slot".to_string())?;
let interval = {
Interval::new(
// Note: `interval_at` panics if `slot_duration` is 0
interval_at(
Instant::now() + duration_to_next_slot + slot_duration / 3,
slot_duration,
)
@@ -140,38 +140,28 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let service = self.clone();
let log_1 = log.clone();
let log_2 = log.clone();
let log_3 = log.clone();
context.executor.spawn(
exit_fut
.until(
interval
.map_err(move |e| {
crit! {
log_1,
"Timer thread failed";
"error" => format!("{}", e)
}
})
.for_each(move |_| {
if let Err(e) = service.spawn_attestation_tasks(slot_duration) {
crit!(
log_2,
"Failed to spawn attestation tasks";
"error" => e
)
} else {
trace!(
log_2,
"Spawned attestation tasks";
)
}
Ok(())
}),
let interval_fut = interval.for_each(move |_| {
if let Err(e) = service.spawn_attestation_tasks(slot_duration) {
crit!(
log_1,
"Failed to spawn attestation tasks";
"error" => e
)
.map(move |_| info!(log_3, "Shutdown complete")),
} else {
trace!(
log_1,
"Spawned attestation tasks";
)
}
futures::future::ready(())
});
let future = futures::future::select(
interval_fut,
exit_fut.map(move |_| info!(log_2, "Shutdown complete")),
);
tokio::task::spawn(future);
Ok(exit_signal)
}
@@ -181,11 +171,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let service = self.clone();
let slot = service
let slot = self
.slot_clock
.now()
.ok_or_else(|| "Failed to read slot clock".to_string())?;
let duration_to_next_slot = service
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or_else(|| "Unable to determine duration to next slot".to_string())?;
@@ -197,7 +187,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = service
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
@@ -219,15 +209,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Spawn a separate task for each attestation.
service
.context
.executor
.spawn(self.clone().publish_attestations_and_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
));
tokio::task::spawn(service.clone().publish_attestations_and_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
));
});
Ok(())
@@ -242,75 +229,68 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
fn publish_attestations_and_aggregates(
&self,
async fn publish_attestations_and_aggregates(
self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
) -> Result<(), ()> {
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() {
return Box::new(future::ok(()));
return Ok(());
}
let service_1 = self.clone();
let log_1 = self.context.log.clone();
let log_2 = self.context.log.clone();
let validator_duties_1 = Arc::new(validator_duties);
let validator_duties_2 = validator_duties_1.clone();
Box::new(
// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
self.produce_and_publish_attestations(slot, committee_index, validator_duties_1)
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
move |attestation_opt| {
if let Some(attestation) = attestation_opt {
Box::new(
// Step 2. (Only if step 1 produced an attestation)
//
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
//
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
Delay::new(aggregate_production_instant)
.map_err(|e| {
format!(
"Unable to create aggregate production delay: {:?}",
e
)
})
.and_then(move |()| {
service_1.produce_and_publish_aggregates(
attestation,
validator_duties_2,
)
}),
)
} else {
// If `produce_and_publish_attestations` did not download any
// attestations then there is no need to produce any
// `SignedAggregateAndProof`.
Box::new(future::ok(()))
}
},
// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, validator_duties_1)
.await
.map_err(move |e| {
crit!(
log_1,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
.map_err(move |e| {
crit!(
log_1,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
}),
)
})?;
if let Some(attestation) = attestation_opt {
// Step 2. (Only if step 1 produced an attestation)
//
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
//
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
delay_until(aggregate_production_instant).await;
self.produce_and_publish_aggregates(attestation, validator_duties_2)
.await
} else {
// If `produce_and_publish_attestations` did not download any
// attestations then there is no need to produce any
// `SignedAggregateAndProof`.
Ok(())
}
.map_err(move |e| {
crit!(
log_2,
"Error during attestation routine";
"error" => format!("{:?}", e),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
)
})
}
/// Performs the first step of the attesting process: downloading `Attestation` objects,
@@ -325,139 +305,132 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
///
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
fn produce_and_publish_attestations(
async fn produce_and_publish_attestations(
&self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Arc<Vec<DutyAndProof>>,
) -> Box<dyn Future<Item = Option<Attestation<E>>, Error = String> + Send> {
) -> Result<Option<Attestation<E>>, String> {
if validator_duties.is_empty() {
return Box::new(future::ok(None));
return Ok(None);
}
let service = self.clone();
let attestation = self
.beacon_node
.http
.validator()
.produce_attestation(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation: {:?}", e))?;
let log = self.context.log.clone();
// For each validator in `validator_duties`, clone the `attestation` and add
// their signature.
//
// If any validator is unable to sign, they are simply skipped.
let signed_attestations = validator_duties
.iter()
.filter_map(|duty| {
let log = self.context.log.clone();
// Ensure that all required fields are present in the validator duty.
let (duty_slot, duty_committee_index, validator_committee_position, _) =
if let Some(tuple) = duty.attestation_duties() {
tuple
} else {
crit!(
log,
"Missing validator duties when signing";
"duties" => format!("{:?}", duty)
);
return None;
};
// Ensure that the attestation matches the duties.
if duty_slot != attestation.data.slot
|| duty_committee_index != attestation.data.index
{
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => format!("{:?}", duty.validator_pubkey()),
"duty_slot" => duty_slot,
"attestation_slot" => attestation.data.slot,
"duty_index" => duty_committee_index,
"attestation_index" => attestation.data.index,
);
return None;
}
let mut attestation = attestation.clone();
if self
.validator_store
.sign_attestation(
duty.validator_pubkey(),
validator_committee_position,
&mut attestation,
)
.is_none()
{
crit!(
log,
"Attestation signing refused";
"validator" => format!("{:?}", duty.validator_pubkey()),
"slot" => attestation.data.slot,
"index" => attestation.data.index,
);
None
} else {
Some(attestation)
}
})
.collect::<Vec<_>>();
// If there are any signed attestations, publish them to the BN. Otherwise,
// just return early.
if let Some(attestation) = signed_attestations.first().cloned() {
let num_attestations = signed_attestations.len();
let beacon_block_root = attestation.data.beacon_block_root;
Box::new(
self.beacon_node
.http
.validator()
.produce_attestation(slot, committee_index)
.map_err(|e| format!("Failed to produce attestation: {:?}", e))
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(move |attestation| {
let log = service.context.log.clone();
// For each validator in `validator_duties`, clone the `attestation` and add
// their signature.
//
// If any validator is unable to sign, they are simply skipped.
let signed_attestations = validator_duties
.iter()
.filter_map(|duty| {
let log = service.context.log.clone();
// Ensure that all required fields are present in the validator duty.
let (duty_slot, duty_committee_index, validator_committee_position, _) =
if let Some(tuple) = duty.attestation_duties() {
tuple
} else {
crit!(
log,
"Missing validator duties when signing";
"duties" => format!("{:?}", duty)
);
return None;
};
// Ensure that the attestation matches the duties.
if duty_slot != attestation.data.slot
|| duty_committee_index != attestation.data.index
{
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => format!("{:?}", duty.validator_pubkey()),
"duty_slot" => duty_slot,
"attestation_slot" => attestation.data.slot,
"duty_index" => duty_committee_index,
"attestation_index" => attestation.data.index,
);
return None;
}
let mut attestation = attestation.clone();
if service
.validator_store
.sign_attestation(
duty.validator_pubkey(),
validator_committee_position,
&mut attestation,
)
.is_none()
{
crit!(
log,
"Attestation signing refused";
"validator" => format!("{:?}", duty.validator_pubkey()),
"slot" => attestation.data.slot,
"index" => attestation.data.index,
);
None
} else {
Some(attestation)
}
})
.collect::<Vec<_>>();
// If there are any signed attestations, publish them to the BN. Otherwise,
// just return early.
if let Some(attestation) = signed_attestations.first().cloned() {
let num_attestations = signed_attestations.len();
let beacon_block_root = attestation.data.beacon_block_root;
Box::new(
service
.beacon_node
.http
.validator()
.publish_attestations(signed_attestations)
.map_err(|e| format!("Failed to publish attestation: {:?}", e))
.map(move |publish_status| match publish_status {
PublishStatus::Valid => info!(
log,
"Successfully published attestations";
"count" => num_attestations,
"head_block" => format!("{:?}", beacon_block_root),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Invalid(msg) => crit!(
log,
"Published attestation was invalid";
"message" => msg,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Unknown => crit!(
log,
"Unknown condition when publishing unagg. attestation"
),
})
.map(|()| Some(attestation)),
)
} else {
debug!(
log,
"No attestations to publish";
"committee_index" => committee_index,
"slot" => slot.as_u64(),
);
Box::new(future::ok(None))
.publish_attestations(signed_attestations)
.await
.map_err(|e| format!("Failed to publish attestation: {:?}", e))
.map(move |publish_status| match publish_status {
PublishStatus::Valid => info!(
log,
"Successfully published attestations";
"count" => num_attestations,
"head_block" => format!("{:?}", beacon_block_root),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Invalid(msg) => crit!(
log,
"Published attestation was invalid";
"message" => msg,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Unknown => {
crit!(log, "Unknown condition when publishing unagg. attestation")
}
}),
)
})
.map(|()| Some(attestation))
} else {
debug!(
log,
"No attestations to publish";
"committee_index" => committee_index,
"slot" => slot.as_u64(),
);
return Ok(None);
}
}
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
@@ -473,107 +446,105 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed
/// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
/// returned to the BN.
fn produce_and_publish_aggregates(
async fn produce_and_publish_aggregates(
&self,
attestation: Attestation<E>,
validator_duties: Arc<Vec<DutyAndProof>>,
) -> impl Future<Item = (), Error = String> {
let service_1 = self.clone();
) -> Result<(), String> {
let log_1 = self.context.log.clone();
self.beacon_node
let aggregated_attestation = self
.beacon_node
.http
.validator()
.produce_aggregate_attestation(&attestation.data)
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
move |aggregated_attestation| {
// For each validator, clone the `aggregated_attestation` and convert it into
// a `SignedAggregateAndProof`
let signed_aggregate_and_proofs = validator_duties
.iter()
.filter_map(|duty_and_proof| {
// Do not produce a signed aggregator for validators that are not
// subscribed aggregators.
let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone();
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?;
let (duty_slot, duty_committee_index, _, validator_index) =
duty_and_proof.attestation_duties().or_else(|| {
crit!(log_1, "Missing duties when signing aggregate");
None
})?;
// For each validator, clone the `aggregated_attestation` and convert it into
// a `SignedAggregateAndProof`
let signed_aggregate_and_proofs = validator_duties
.iter()
.filter_map(|duty_and_proof| {
// Do not produce a signed aggregator for validators that are not
// subscribed aggregators.
let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone();
let pubkey = &duty_and_proof.duty.validator_pubkey;
let slot = attestation.data.slot;
let committee_index = attestation.data.index;
let (duty_slot, duty_committee_index, _, validator_index) =
duty_and_proof.attestation_duties().or_else(|| {
crit!(log_1, "Missing duties when signing aggregate");
None
})?;
if duty_slot != slot || duty_committee_index != committee_index {
crit!(log_1, "Inconsistent validator duties during signing");
return None;
}
let pubkey = &duty_and_proof.duty.validator_pubkey;
let slot = attestation.data.slot;
let committee_index = attestation.data.index;
if let Some(signed_aggregate_and_proof) = service_1
.validator_store
.produce_signed_aggregate_and_proof(
pubkey,
validator_index,
aggregated_attestation.clone(),
selection_proof,
)
{
Some(signed_aggregate_and_proof)
} else {
crit!(log_1, "Failed to sign attestation");
None
}
})
.collect::<Vec<_>>();
if duty_slot != slot || duty_committee_index != committee_index {
crit!(log_1, "Inconsistent validator duties during signing");
return None;
}
// If there any signed aggregates and proofs were produced, publish them to the
// BN.
if let Some(first) = signed_aggregate_and_proofs.first().cloned() {
let attestation = first.message.aggregate;
if let Some(signed_aggregate_and_proof) =
self.validator_store.produce_signed_aggregate_and_proof(
pubkey,
validator_index,
aggregated_attestation.clone(),
selection_proof,
)
{
Some(signed_aggregate_and_proof)
} else {
crit!(log_1, "Failed to sign attestation");
None
}
})
.collect::<Vec<_>>();
Box::new(service_1
.beacon_node
.http
.validator()
.publish_aggregate_and_proof(signed_aggregate_and_proofs)
.map(|publish_status| (attestation, publish_status))
.map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))
.map(move |(attestation, publish_status)| match publish_status {
PublishStatus::Valid => info!(
log_1,
"Successfully published attestations";
"signatures" => attestation.aggregation_bits.num_set_bits(),
"head_block" => format!("{:?}", attestation.data.beacon_block_root),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Invalid(msg) => crit!(
log_1,
"Published attestation was invalid";
"message" => msg,
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Unknown => {
crit!(log_1, "Unknown condition when publishing agg. attestation")
}
}))
} else {
debug!(
log_1,
"No signed aggregates to publish";
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
);
Box::new(future::ok(()))
}
},
)
// If there any signed aggregates and proofs were produced, publish them to the
// BN.
if let Some(first) = signed_aggregate_and_proofs.first().cloned() {
let attestation = first.message.aggregate;
let publish_status = self
.beacon_node
.http
.validator()
.publish_aggregate_and_proof(signed_aggregate_and_proofs)
.await
.map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))?;
match publish_status {
PublishStatus::Valid => info!(
log_1,
"Successfully published attestations";
"signatures" => attestation.aggregation_bits.num_set_bits(),
"head_block" => format!("{:?}", attestation.data.beacon_block_root),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Invalid(msg) => crit!(
log_1,
"Published attestation was invalid";
"message" => msg,
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Unknown => {
crit!(log_1, "Unknown condition when publishing agg. attestation")
}
};
Ok(())
} else {
debug!(
log_1,
"No signed aggregates to publish";
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
);
Ok(())
}
}
}
@@ -591,16 +562,14 @@ mod tests {
let state_1 = Arc::new(RwLock::new(in_the_past));
let state_2 = state_1.clone();
let future = Delay::new(in_the_past)
.map_err(|_| panic!("Failed to create duration"))
.map(move |()| *state_1.write() = Instant::now());
let future = delay_until(in_the_past).map(move |()| *state_1.write() = Instant::now());
let mut runtime = RuntimeBuilder::new()
.core_threads(1)
.build()
.expect("failed to start runtime");
runtime.block_on(future).expect("failed to complete future");
runtime.block_on(future);
assert!(
*state_2.read() > in_the_past,