Implement standard eth2.0 API (#1569)

- Resolves #1550
- Resolves #824
- Resolves #825
- Resolves #1131
- Resolves #1411
- Resolves #1256
- Resolve #1177

- Includes the `ShufflingId` struct initially defined in #1492. That PR is now closed and the changes are included here, with significant bug fixes.
- Implement the https://github.com/ethereum/eth2.0-APIs in a new `http_api` crate using `warp`. This replaces the `rest_api` crate.
- Add a new `common/eth2` crate which provides a wrapper around `reqwest`, providing the HTTP client that is used by the validator client and for testing. This replaces the `common/remote_beacon_node` crate.
- Create a `http_metrics` crate which is a dedicated server for Prometheus metrics (they are no longer served on the same port as the REST API). We now have flags for `--metrics`, `--metrics-address`, etc.
- Allow the `subnet_id` to be an optional parameter for `VerifiedUnaggregatedAttestation::verify`. This means it does not need to be provided unnecessarily by the validator client.
- Move `fn map_attestation_committee` in `mod beacon_chain::attestation_verification` to a new `fn with_committee_cache` on the `BeaconChain` so the same cache can be used for obtaining validator duties.
- Add some other helpers to `BeaconChain` to assist with common API duties (e.g., `block_root_at_slot`, `head_beacon_block_root`).
- Change the `NaiveAggregationPool` so it can index attestations by `hash_tree_root(attestation.data)`. This is a requirement of the API.
- Add functions to `BeaconChainHarness` to allow it to create slashings and exits.
- Allow for `eth1::Eth1NetworkId` to go to/from a `String`.
- Add functions to the `OperationPool` to allow getting all objects in the pool.
- Add function to `BeaconState` to check if a committee cache is initialized.
- Fix bug where `seconds_per_eth1_block` was not transferring over from `YamlConfig` to `ChainSpec`.
- Add the `deposit_contract_address` to `YamlConfig` and `ChainSpec`. We needed to be able to return it in an API response.
- Change some uses of serde `serialize_with` and `deserialize_with` to a single use of `with` (code quality).
- Impl `Display` and `FromStr` for several BLS fields.
- Check for clock discrepancy when VC polls BN for sync state (with +/- 1 slot tolerance). This is not intended to be comprehensive, it was just easy to do.

- See #1434 for a per-endpoint overview.
- Seeking clarity here: https://github.com/ethereum/eth2.0-APIs/issues/75

- [x] Add docs for prom port to close #1256
- [x] Follow up on this #1177
- [x] ~~Follow up with #1424~~ Will fix in future PR.
- [x] Follow up with #1411
- [x] ~~Follow up with  #1260~~ Will fix in future PR.
- [x] Add quotes to all integers.
- [x] Remove `rest_types`
- [x] Address missing beacon block error. (#1629)
- [x] ~~Add tests for lighthouse/peers endpoints~~ Wontfix
- [x] ~~Follow up with validator status proposal~~ Tracked in #1434
- [x] Unify graffiti structs
- [x] ~~Start server when waiting for genesis?~~ Will fix in future PR.
- [x] TODO in http_api tests
- [x] Move lighthouse endpoints off /eth/v1
- [x] Update docs to link to standard

- ~~Blocked on #1586~~

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Paul Hauner
2020-09-29 03:46:54 +00:00
parent 8e20176337
commit cdec3cec18
156 changed files with 8862 additions and 8916 deletions

View File

@@ -3,22 +3,26 @@ use crate::{
validator_store::ValidatorStore,
};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::StreamExt;
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, debug, error, info, trace};
use slog::{crit, error, info, trace};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{delay_until, interval_at, Duration, Instant};
use types::{Attestation, ChainSpec, CommitteeIndex, EthSpec, Slot, SubnetId};
use tree_hash::TreeHash;
use types::{
AggregateSignature, Attestation, AttestationData, BitList, ChainSpec, CommitteeIndex, EthSpec,
Slot,
};
/// Builds an `AttestationService`.
pub struct AttestationServiceBuilder<T, E: EthSpec> {
duties_service: Option<DutiesService<T, E>>,
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<T>,
beacon_node: Option<RemoteBeaconNode<E>>,
beacon_node: Option<BeaconNodeHttpClient>,
context: Option<RuntimeContext<E>>,
}
@@ -48,7 +52,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode<E>) -> Self {
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
self
}
@@ -86,7 +90,7 @@ pub struct Inner<T, E: EthSpec> {
duties_service: DutiesService<T, E>,
validator_store: ValidatorStore<T, E>,
slot_clock: T,
beacon_node: RemoteBeaconNode<E>,
beacon_node: BeaconNodeHttpClient,
context: RuntimeContext<E>,
}
@@ -262,7 +266,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation) = attestation_opt {
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
@@ -272,7 +276,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(attestation, &validator_duties)
self.produce_and_publish_aggregates(attestation_data, &validator_duties)
.await
.map_err(move |e| {
crit!(
@@ -305,7 +309,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<Attestation<E>>, String> {
) -> Result<Option<AttestationData>, String> {
let log = self.context.log();
if validator_duties.is_empty() {
@@ -318,124 +322,88 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.ok_or_else(|| "Unable to determine current slot from clock".to_string())?
.epoch(E::slots_per_epoch());
let attestation = self
let attestation_data = self
.beacon_node
.http
.validator()
.produce_attestation(slot, committee_index)
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation: {:?}", e))?;
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))?
.data;
// For each validator in `validator_duties`, clone the `attestation` and add
// their signature.
//
// If any validator is unable to sign, they are simply skipped.
let signed_attestations = validator_duties
.iter()
.filter_map(|duty| {
// Ensure that all required fields are present in the validator duty.
let (
duty_slot,
duty_committee_index,
for duty in validator_duties {
// Ensure that all required fields are present in the validator duty.
let (
duty_slot,
duty_committee_index,
validator_committee_position,
_,
_,
committee_length,
) = if let Some(tuple) = duty.attestation_duties() {
tuple
} else {
crit!(
log,
"Missing validator duties when signing";
"duties" => format!("{:?}", duty)
);
continue;
};
// Ensure that the attestation matches the duties.
if duty_slot != attestation_data.slot || duty_committee_index != attestation_data.index
{
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => format!("{:?}", duty.validator_pubkey()),
"duty_slot" => duty_slot,
"attestation_slot" => attestation_data.slot,
"duty_index" => duty_committee_index,
"attestation_index" => attestation_data.index,
);
continue;
}
let mut attestation = Attestation {
aggregation_bits: BitList::with_capacity(committee_length as usize).unwrap(),
data: attestation_data.clone(),
signature: AggregateSignature::infinity(),
};
self.validator_store
.sign_attestation(
duty.validator_pubkey(),
validator_committee_position,
_,
committee_count_at_slot,
) = if let Some(tuple) = duty.attestation_duties() {
tuple
} else {
crit!(
log,
"Missing validator duties when signing";
"duties" => format!("{:?}", duty)
);
return None;
};
// Ensure that the attestation matches the duties.
if duty_slot != attestation.data.slot
|| duty_committee_index != attestation.data.index
{
crit!(
log,
"Inconsistent validator duties during signing";
"validator" => format!("{:?}", duty.validator_pubkey()),
"duty_slot" => duty_slot,
"attestation_slot" => attestation.data.slot,
"duty_index" => duty_committee_index,
"attestation_index" => attestation.data.index,
);
return None;
}
let mut attestation = attestation.clone();
let subnet_id = SubnetId::compute_subnet_for_attestation_data::<E>(
&attestation.data,
committee_count_at_slot,
&self.context.eth2_config().spec,
&mut attestation,
current_epoch,
)
.map_err(|e| {
error!(
log,
"Failed to compute subnet id to publish attestation: {:?}", e
)
})
.ok()?;
self.validator_store
.sign_attestation(
duty.validator_pubkey(),
validator_committee_position,
&mut attestation,
current_epoch,
)
.map(|_| (attestation, subnet_id))
})
.collect::<Vec<_>>();
.ok_or_else(|| "Failed to sign attestation".to_string())?;
// If there are any signed attestations, publish them to the BN. Otherwise,
// just return early.
if let Some(attestation) = signed_attestations.first().cloned() {
let num_attestations = signed_attestations.len();
let beacon_block_root = attestation.0.data.beacon_block_root;
self.beacon_node
.http
.validator()
.publish_attestations(signed_attestations)
match self
.beacon_node
.post_beacon_pool_attestations(&attestation)
.await
.map_err(|e| format!("Failed to publish attestation: {:?}", e))
.map(move |publish_status| match publish_status {
PublishStatus::Valid => info!(
log,
"Successfully published attestations";
"count" => num_attestations,
"head_block" => format!("{:?}", beacon_block_root),
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Invalid(msg) => crit!(
log,
"Published attestation was invalid";
"message" => msg,
"committee_index" => committee_index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
PublishStatus::Unknown => {
crit!(log, "Unknown condition when publishing unagg. attestation")
}
})
.map(|()| Some(attestation.0))
} else {
debug!(
log,
"No attestations to publish";
"committee_index" => committee_index,
"slot" => slot.as_u64(),
);
Ok(None)
{
Ok(()) => info!(
log,
"Successfully published attestation";
"head_block" => format!("{:?}", attestation.data.beacon_block_root),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "unaggregated",
),
Err(e) => error!(
log,
"Unable to publish attestation";
"error" => e.to_string(),
"committee_index" => attestation.data.index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
}
}
Ok(Some(attestation_data))
}
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
@@ -453,103 +421,89 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
/// returned to the BN.
async fn produce_and_publish_aggregates(
&self,
attestation: Attestation<E>,
attestation_data: AttestationData,
validator_duties: &[DutyAndProof],
) -> Result<(), String> {
let log = self.context.log();
let aggregated_attestation = self
.beacon_node
.http
.validator()
.produce_aggregate_attestation(&attestation.data)
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?;
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))?
.data;
// For each validator, clone the `aggregated_attestation` and convert it into
// a `SignedAggregateAndProof`
let signed_aggregate_and_proofs = validator_duties
.iter()
.filter_map(|duty_and_proof| {
// Do not produce a signed aggregator for validators that are not
for duty_and_proof in validator_duties {
let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() {
proof
} else {
// Do not produce a signed aggregate for validators that are not
// subscribed aggregators.
let selection_proof = duty_and_proof.selection_proof.as_ref()?.clone();
let (duty_slot, duty_committee_index, _, validator_index, _) =
duty_and_proof.attestation_duties().or_else(|| {
crit!(log, "Missing duties when signing aggregate");
None
})?;
let pubkey = &duty_and_proof.duty.validator_pubkey;
let slot = attestation.data.slot;
let committee_index = attestation.data.index;
if duty_slot != slot || duty_committee_index != committee_index {
crit!(log, "Inconsistent validator duties during signing");
return None;
}
if let Some(signed_aggregate_and_proof) =
self.validator_store.produce_signed_aggregate_and_proof(
pubkey,
validator_index,
aggregated_attestation.clone(),
selection_proof,
)
{
Some(signed_aggregate_and_proof)
continue;
};
let (duty_slot, duty_committee_index, _, validator_index, _, _) =
if let Some(tuple) = duty_and_proof.attestation_duties() {
tuple
} else {
crit!(log, "Failed to sign attestation");
None
}
})
.collect::<Vec<_>>();
crit!(log, "Missing duties when signing aggregate");
continue;
};
// If there any signed aggregates and proofs were produced, publish them to the
// BN.
if let Some(first) = signed_aggregate_and_proofs.first().cloned() {
let attestation = first.message.aggregate;
let pubkey = &duty_and_proof.duty.validator_pubkey;
let slot = attestation_data.slot;
let committee_index = attestation_data.index;
let publish_status = self
if duty_slot != slot || duty_committee_index != committee_index {
crit!(log, "Inconsistent validator duties during signing");
continue;
}
let signed_aggregate_and_proof = if let Some(aggregate) =
self.validator_store.produce_signed_aggregate_and_proof(
pubkey,
validator_index,
aggregated_attestation.clone(),
selection_proof.clone(),
) {
aggregate
} else {
crit!(log, "Failed to sign attestation");
continue;
};
let attestation = &signed_aggregate_and_proof.message.aggregate;
match self
.beacon_node
.http
.validator()
.publish_aggregate_and_proof(signed_aggregate_and_proofs)
.post_validator_aggregate_and_proof(&signed_aggregate_and_proof)
.await
.map_err(|e| format!("Failed to publish aggregate and proofs: {:?}", e))?;
match publish_status {
PublishStatus::Valid => info!(
{
Ok(()) => info!(
log,
"Successfully published attestations";
"Successfully published attestation";
"aggregator" => signed_aggregate_and_proof.message.aggregator_index,
"signatures" => attestation.aggregation_bits.num_set_bits(),
"head_block" => format!("{:?}", attestation.data.beacon_block_root),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Invalid(msg) => crit!(
Err(e) => crit!(
log,
"Published attestation was invalid";
"message" => msg,
"Failed to publish attestation";
"error" => e.to_string(),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
PublishStatus::Unknown => {
crit!(log, "Unknown condition when publishing agg. attestation")
}
};
Ok(())
} else {
debug!(
log,
"No signed aggregates to publish";
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
);
Ok(())
}
}
Ok(())
}
}

View File

@@ -1,19 +1,19 @@
use crate::validator_store::ValidatorStore;
use environment::RuntimeContext;
use eth2::{types::Graffiti, BeaconNodeHttpClient};
use futures::channel::mpsc::Receiver;
use futures::{StreamExt, TryFutureExt};
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock;
use std::ops::Deref;
use std::sync::Arc;
use types::{EthSpec, Graffiti, PublicKey, Slot};
use types::{EthSpec, PublicKey, Slot};
/// Builds a `BlockService`.
pub struct BlockServiceBuilder<T, E: EthSpec> {
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<Arc<T>>,
beacon_node: Option<RemoteBeaconNode<E>>,
beacon_node: Option<BeaconNodeHttpClient>,
context: Option<RuntimeContext<E>>,
graffiti: Option<Graffiti>,
}
@@ -39,7 +39,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode<E>) -> Self {
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
self
}
@@ -79,7 +79,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
pub struct Inner<T, E: EthSpec> {
validator_store: ValidatorStore<T, E>,
slot_clock: Arc<T>,
beacon_node: RemoteBeaconNode<E>,
beacon_node: BeaconNodeHttpClient,
context: RuntimeContext<E>,
graffiti: Option<Graffiti>,
}
@@ -221,41 +221,28 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let block = self
.beacon_node
.http
.validator()
.produce_block(slot, randao_reveal, self.graffiti)
.get_validator_blocks(slot, randao_reveal.into(), self.graffiti.as_ref())
.await
.map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?;
.map_err(|e| format!("Error from beacon node when producing block: {:?}", e))?
.data;
let signed_block = self
.validator_store
.sign_block(&validator_pubkey, block, current_slot)
.ok_or_else(|| "Unable to sign block".to_string())?;
let publish_status = self
.beacon_node
.http
.validator()
.publish_block(signed_block.clone())
self.beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?;
match publish_status {
PublishStatus::Valid => info!(
log,
"Successfully published block";
"deposits" => signed_block.message.body.deposits.len(),
"attestations" => signed_block.message.body.attestations.len(),
"slot" => signed_block.slot().as_u64(),
),
PublishStatus::Invalid(msg) => crit!(
log,
"Published block was invalid";
"message" => msg,
"slot" => signed_block.slot().as_u64(),
),
PublishStatus::Unknown => crit!(log, "Unknown condition when publishing block"),
}
info!(
log,
"Successfully published block";
"deposits" => signed_block.message.body.deposits.len(),
"attestations" => signed_block.message.body.attestations.len(),
"slot" => signed_block.slot().as_u64(),
);
Ok(())
}

View File

@@ -4,9 +4,10 @@ use directory::{
get_testnet_name, DEFAULT_HARDCODED_TESTNET, DEFAULT_ROOT_DIR, DEFAULT_SECRET_DIR,
DEFAULT_VALIDATOR_DIR,
};
use eth2::types::Graffiti;
use serde_derive::{Deserialize, Serialize};
use std::path::PathBuf;
use types::{Graffiti, GRAFFITI_BYTES_LEN};
use types::GRAFFITI_BYTES_LEN;
pub const DEFAULT_HTTP_SERVER: &str = "http://localhost:5052/";
/// Path to the slashing protection database within the datadir.
@@ -119,15 +120,14 @@ impl Config {
GRAFFITI_BYTES_LEN
));
} else {
// Default graffiti to all 0 bytes.
let mut graffiti = Graffiti::default();
let mut graffiti = [0; 32];
// Copy the provided bytes over.
//
// Panic-free because `graffiti_bytes.len()` <= `GRAFFITI_BYTES_LEN`.
graffiti[..graffiti_bytes.len()].copy_from_slice(&graffiti_bytes);
config.graffiti = Some(graffiti);
config.graffiti = Some(graffiti.into());
}
}

View File

@@ -1,16 +1,15 @@
use crate::{
block_service::BlockServiceNotification, is_synced::is_synced, validator_store::ValidatorStore,
block_service::BlockServiceNotification, is_synced::is_synced, validator_duty::ValidatorDuty,
validator_store::ValidatorStore,
};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, StreamExt};
use parking_lot::RwLock;
use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription};
use slog::{debug, error, trace, warn};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::{interval_at, Duration, Instant};
@@ -44,14 +43,14 @@ impl DutyAndProof {
pub fn compute_selection_proof<T: SlotClock + 'static, E: EthSpec>(
&mut self,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<(), String> {
let (modulo, slot) = if let (Some(modulo), Some(slot)) =
(self.duty.aggregator_modulo, self.duty.attestation_slot)
let (committee_length, slot) = if let (Some(count), Some(slot)) =
(self.duty.committee_length, self.duty.attestation_slot)
{
(modulo, slot)
(count as usize, slot)
} else {
// If there is no modulo or for the aggregator we assume they are not activated and
// therefore not an aggregator.
// If there are no attester duties we assume the validator is inactive.
self.selection_proof = None;
return Ok(());
};
@@ -61,7 +60,7 @@ impl DutyAndProof {
.ok_or_else(|| "Failed to produce selection proof".to_string())?;
self.selection_proof = selection_proof
.is_aggregator_from_modulo(modulo)
.is_aggregator(committee_length, spec)
.map_err(|e| format!("Invalid modulo: {:?}", e))
.map(|is_aggregator| {
if is_aggregator {
@@ -87,19 +86,20 @@ impl DutyAndProof {
/// It's important to note that this doesn't actually check `self.selection_proof`, instead it
/// checks to see if the inputs to computing the selection proof are equal.
fn selection_proof_eq(&self, other: &Self) -> bool {
self.duty.aggregator_modulo == other.duty.aggregator_modulo
self.duty.committee_count_at_slot == other.duty.committee_count_at_slot
&& self.duty.attestation_slot == other.duty.attestation_slot
}
/// Returns the information required for an attesting validator, if they are scheduled to
/// attest.
pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64, u64)> {
pub fn attestation_duties(&self) -> Option<(Slot, CommitteeIndex, usize, u64, u64, u64)> {
Some((
self.duty.attestation_slot?,
self.duty.attestation_committee_index?,
self.duty.attestation_committee_position?,
self.duty.validator_index?,
self.duty.committee_count_at_slot?,
self.duty.committee_length?,
))
}
@@ -108,26 +108,12 @@ impl DutyAndProof {
}
}
impl TryInto<DutyAndProof> for ValidatorDutyBytes {
type Error = String;
fn try_into(self) -> Result<DutyAndProof, Self::Error> {
let duty = ValidatorDuty {
validator_pubkey: (&self.validator_pubkey)
.try_into()
.map_err(|e| format!("Invalid pubkey bytes from server: {:?}", e))?,
validator_index: self.validator_index,
attestation_slot: self.attestation_slot,
attestation_committee_index: self.attestation_committee_index,
attestation_committee_position: self.attestation_committee_position,
committee_count_at_slot: self.committee_count_at_slot,
block_proposal_slots: self.block_proposal_slots,
aggregator_modulo: self.aggregator_modulo,
};
Ok(DutyAndProof {
duty,
impl Into<DutyAndProof> for ValidatorDuty {
fn into(self) -> DutyAndProof {
DutyAndProof {
duty: self,
selection_proof: None,
})
}
}
}
@@ -260,6 +246,7 @@ impl DutiesStore {
mut duties: DutyAndProof,
slots_per_epoch: u64,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<InsertOutcome, String> {
let mut store = self.store.write();
@@ -282,7 +269,7 @@ impl DutiesStore {
}
} else {
// Compute the selection proof.
duties.compute_selection_proof(validator_store)?;
duties.compute_selection_proof(validator_store, spec)?;
// Determine if a re-subscription is required.
let should_resubscribe = !duties.subscription_eq(known_duties);
@@ -294,7 +281,7 @@ impl DutiesStore {
}
} else {
// Compute the selection proof.
duties.compute_selection_proof(validator_store)?;
duties.compute_selection_proof(validator_store, spec)?;
validator_map.insert(epoch, duties);
@@ -302,7 +289,7 @@ impl DutiesStore {
}
} else {
// Compute the selection proof.
duties.compute_selection_proof(validator_store)?;
duties.compute_selection_proof(validator_store, spec)?;
let validator_pubkey = duties.duty.validator_pubkey.clone();
@@ -328,7 +315,7 @@ impl DutiesStore {
pub struct DutiesServiceBuilder<T, E: EthSpec> {
validator_store: Option<ValidatorStore<T, E>>,
slot_clock: Option<T>,
beacon_node: Option<RemoteBeaconNode<E>>,
beacon_node: Option<BeaconNodeHttpClient>,
context: Option<RuntimeContext<E>>,
allow_unsynced_beacon_node: bool,
}
@@ -354,7 +341,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode<E>) -> Self {
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
self
}
@@ -397,7 +384,7 @@ pub struct Inner<T, E: EthSpec> {
store: Arc<DutiesStore>,
validator_store: ValidatorStore<T, E>,
pub(crate) slot_clock: T,
pub(crate) beacon_node: RemoteBeaconNode<E>,
pub(crate) beacon_node: BeaconNodeHttpClient,
context: RuntimeContext<E>,
/// If true, the duties service will poll for duties from the beacon node even if it is not
/// synced.
@@ -462,7 +449,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
pub fn start_update_service(
self,
mut block_service_tx: Sender<BlockServiceNotification>,
spec: &ChainSpec,
spec: Arc<ChainSpec>,
) -> Result<(), String> {
let duration_to_next_slot = self
.slot_clock
@@ -481,17 +468,22 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
// Run an immediate update before starting the updater service.
let duties_service = self.clone();
let mut block_service_tx_clone = block_service_tx.clone();
let inner_spec = spec.clone();
self.inner
.context
.executor
.runtime_handle()
.spawn(async move { duties_service.do_update(&mut block_service_tx_clone).await });
.spawn(async move {
duties_service
.do_update(&mut block_service_tx_clone, &inner_spec)
.await
});
let executor = self.inner.context.executor.clone();
let interval_fut = async move {
while interval.next().await.is_some() {
self.clone().do_update(&mut block_service_tx).await;
self.clone().do_update(&mut block_service_tx, &spec).await;
}
};
@@ -501,7 +493,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
}
/// Attempt to download the duties of all managed validators for this epoch and the next.
async fn do_update(self, block_service_tx: &mut Sender<BlockServiceNotification>) {
async fn do_update(
self,
block_service_tx: &mut Sender<BlockServiceNotification>,
spec: &ChainSpec,
) {
let log = self.context.log();
if !is_synced(&self.beacon_node, &self.slot_clock, None).await
@@ -534,7 +530,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
// Update duties for the current epoch, but keep running if there's an error:
// block production or the next epoch update could still succeed.
if let Err(e) = self.clone().update_epoch(current_epoch).await {
if let Err(e) = self
.clone()
.update_epoch(current_epoch, current_epoch, spec)
.await
{
error!(
log,
"Failed to get current epoch duties";
@@ -558,7 +558,11 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
};
// Update duties for the next epoch.
if let Err(e) = self.clone().update_epoch(current_epoch + 1).await {
if let Err(e) = self
.clone()
.update_epoch(current_epoch, current_epoch + 1, spec)
.await
{
error!(
log,
"Failed to get next epoch duties";
@@ -567,18 +571,15 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
}
}
/// Attempt to download the duties of all managed validators for the given `epoch`.
async fn update_epoch(self, epoch: Epoch) -> Result<(), String> {
let pubkeys = self.validator_store.voting_pubkeys();
let all_duties = self
.beacon_node
.http
.validator()
.get_duties(epoch, pubkeys.as_slice())
.await
.map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e))?;
let log = self.context.log().clone();
/// Attempt to download the duties of all managed validators for the given `request_epoch`. The
/// `current_epoch` should be a local reading of the slot clock.
async fn update_epoch(
self,
current_epoch: Epoch,
request_epoch: Epoch,
spec: &ChainSpec,
) -> Result<(), String> {
let log = self.context.log();
let mut new_validator = 0;
let mut new_epoch = 0;
@@ -587,74 +588,76 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
let mut replaced = 0;
let mut invalid = 0;
// For each of the duties, attempt to insert them into our local store and build a
// list of new or changed selections proofs for any aggregating validators.
let validator_subscriptions = all_duties
.into_iter()
.filter_map(|remote_duties| {
// Convert the remote duties into our local representation.
let duties: DutyAndProof = remote_duties
.clone()
.try_into()
.map_err(|e| {
error!(
log,
"Unable to convert remote duties";
"error" => e
)
})
.ok()?;
let validator_pubkey = duties.duty.validator_pubkey.clone();
// Attempt to update our local store.
let outcome = self
.store
.insert(epoch, duties, E::slots_per_epoch(), &self.validator_store)
.map_err(|e| {
error!(
log,
"Unable to store duties";
"error" => e
)
})
.ok()?;
match &outcome {
InsertOutcome::NewValidator => {
debug!(
log,
"First duty assignment for validator";
"proposal_slots" => format!("{:?}", &remote_duties.block_proposal_slots),
"attestation_slot" => format!("{:?}", &remote_duties.attestation_slot),
"validator" => format!("{:?}", &remote_duties.validator_pubkey)
);
new_validator += 1;
}
InsertOutcome::NewProposalSlots => new_proposal_slots += 1,
InsertOutcome::NewEpoch => new_epoch += 1,
InsertOutcome::Identical => identical += 1,
InsertOutcome::Replaced { .. } => replaced += 1,
InsertOutcome::Invalid => invalid += 1,
};
// The selection proof is computed on `store.insert`, so it's necessary to check
// with the store that the validator is an aggregator.
let is_aggregator = self.store.is_aggregator(&validator_pubkey, epoch)?;
if outcome.is_subscription_candidate() {
Some(ValidatorSubscription {
validator_index: remote_duties.validator_index?,
attestation_committee_index: remote_duties.attestation_committee_index?,
slot: remote_duties.attestation_slot?,
committee_count_at_slot: remote_duties.committee_count_at_slot?,
is_aggregator,
})
} else {
None
let mut validator_subscriptions = vec![];
for pubkey in self.validator_store.voting_pubkeys() {
let remote_duties = match ValidatorDuty::download(
&self.beacon_node,
current_epoch,
request_epoch,
pubkey,
)
.await
{
Ok(duties) => duties,
Err(e) => {
error!(
log,
"Failed to download validator duties";
"error" => e
);
continue;
}
})
.collect::<Vec<_>>();
};
// Convert the remote duties into our local representation.
let duties: DutyAndProof = remote_duties.clone().into();
let validator_pubkey = duties.duty.validator_pubkey.clone();
// Attempt to update our local store.
match self.store.insert(
request_epoch,
duties,
E::slots_per_epoch(),
&self.validator_store,
spec,
) {
Ok(outcome) => {
match &outcome {
InsertOutcome::NewValidator => {
debug!(
log,
"First duty assignment for validator";
"proposal_slots" => format!("{:?}", &remote_duties.block_proposal_slots),
"attestation_slot" => format!("{:?}", &remote_duties.attestation_slot),
"validator" => format!("{:?}", &remote_duties.validator_pubkey)
);
new_validator += 1;
}
InsertOutcome::NewProposalSlots => new_proposal_slots += 1,
InsertOutcome::NewEpoch => new_epoch += 1,
InsertOutcome::Identical => identical += 1,
InsertOutcome::Replaced { .. } => replaced += 1,
InsertOutcome::Invalid => invalid += 1,
}
if let Some(is_aggregator) =
self.store.is_aggregator(&validator_pubkey, request_epoch)
{
if outcome.is_subscription_candidate() {
if let Some(subscription) = remote_duties.subscription(is_aggregator) {
validator_subscriptions.push(subscription)
}
}
}
}
Err(e) => error!(
log,
"Unable to store duties";
"error" => e
),
}
}
if invalid > 0 {
error!(
@@ -673,7 +676,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
"new_proposal_slots" => new_proposal_slots,
"new_validator" => new_validator,
"replaced" => replaced,
"epoch" => format!("{}", epoch)
"epoch" => format!("{}", request_epoch)
);
if replaced > 0 {
@@ -690,34 +693,19 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
if count == 0 {
debug!(log, "No new subscriptions required");
Ok(())
} else {
self.beacon_node
.http
.validator()
.subscribe(validator_subscriptions)
.post_validator_beacon_committee_subscriptions(&validator_subscriptions)
.await
.map_err(|e| format!("Failed to subscribe validators: {:?}", e))
.map(move |status| {
match status {
PublishStatus::Valid => debug!(
log,
"Successfully subscribed validators";
"count" => count
),
PublishStatus::Unknown => error!(
log,
"Unknown response from subscription";
),
PublishStatus::Invalid(e) => error!(
log,
"Failed to subscribe validator";
"error" => e
),
};
})
.map_err(|e| format!("Failed to subscribe validators: {:?}", e))?;
debug!(
log,
"Successfully subscribed validators";
"count" => count
);
}
Ok(())
}
}

View File

@@ -1,7 +1,7 @@
use environment::RuntimeContext;
use eth2::{types::StateId, BeaconNodeHttpClient};
use futures::StreamExt;
use parking_lot::RwLock;
use remote_beacon_node::RemoteBeaconNode;
use slog::{debug, trace};
use slot_clock::SlotClock;
use std::ops::Deref;
@@ -16,7 +16,7 @@ const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80);
pub struct ForkServiceBuilder<T, E: EthSpec> {
fork: Option<Fork>,
slot_clock: Option<T>,
beacon_node: Option<RemoteBeaconNode<E>>,
beacon_node: Option<BeaconNodeHttpClient>,
context: Option<RuntimeContext<E>>,
}
@@ -35,7 +35,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
self
}
pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode<E>) -> Self {
pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self {
self.beacon_node = Some(beacon_node);
self
}
@@ -66,7 +66,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
fork: RwLock<Option<Fork>>,
beacon_node: RemoteBeaconNode<E>,
beacon_node: BeaconNodeHttpClient,
context: RuntimeContext<E>,
slot_clock: T,
}
@@ -141,9 +141,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
let fork = self
.inner
.beacon_node
.http
.beacon()
.get_fork()
.get_beacon_states_fork(StateId::Head)
.await
.map_err(|e| {
trace!(
@@ -151,7 +149,15 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
"Fork update failed";
"error" => format!("Error retrieving fork: {:?}", e)
)
})?;
})?
.ok_or_else(|| {
trace!(
log,
"Fork update failed";
"error" => "The beacon head fork is unknown"
)
})?
.data;
if self.fork.read().as_ref() != Some(&fork) {
*(self.fork.write()) = Some(fork);

View File

@@ -50,8 +50,6 @@ pub enum Error {
UnableToSaveDefinitions(validator_definitions::Error),
/// It is not legal to try and initialize a disabled validator definition.
UnableToInitializeDisabledValidator,
/// It is not legal to try and initialize a disabled validator definition.
PasswordUnknown(PathBuf),
/// There was an error reading from stdin.
UnableToReadPasswordFromUser(String),
/// There was an error running a tokio async task.
@@ -333,6 +331,7 @@ impl InitializedValidators {
/// validator will be removed from `self.validators`.
///
/// Saves the `ValidatorDefinitions` to file, even if no definitions were changed.
#[allow(dead_code)] // Will be used once VC API is enabled.
pub async fn set_validator_status(
&mut self,
voting_public_key: &PublicKey,

View File

@@ -1,8 +1,6 @@
use remote_beacon_node::RemoteBeaconNode;
use rest_types::SyncingResponse;
use slog::{debug, error, Logger};
use eth2::BeaconNodeHttpClient;
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use types::EthSpec;
/// A distance in slots.
const SYNC_TOLERANCE: u64 = 4;
@@ -17,19 +15,19 @@ const SYNC_TOLERANCE: u64 = 4;
///
/// The second condition means the even if the beacon node thinks that it's syncing, we'll still
/// try to use it if it's close enough to the head.
pub async fn is_synced<T: SlotClock, E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
pub async fn is_synced<T: SlotClock>(
beacon_node: &BeaconNodeHttpClient,
slot_clock: &T,
log_opt: Option<&Logger>,
) -> bool {
let resp = match beacon_node.http.node().syncing_status().await {
let resp = match beacon_node.get_node_syncing().await {
Ok(resp) => resp,
Err(e) => {
if let Some(log) = log_opt {
error!(
log,
"Unable connect to beacon node";
"error" => format!("{:?}", e)
"error" => e.to_string()
)
}
@@ -37,44 +35,38 @@ pub async fn is_synced<T: SlotClock, E: EthSpec>(
}
};
match &resp {
SyncingResponse {
is_syncing: false, ..
} => true,
SyncingResponse {
is_syncing: true,
sync_status,
} => {
if let Some(log) = log_opt {
debug!(
let is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE);
if let Some(log) = log_opt {
if !is_synced {
debug!(
log,
"Beacon node sync status";
"status" => format!("{:?}", resp),
);
warn!(
log,
"Beacon node is syncing";
"msg" => "not receiving new duties",
"sync_distance" => resp.data.sync_distance.as_u64(),
"head_slot" => resp.data.head_slot.as_u64(),
);
}
if let Some(local_slot) = slot_clock.now() {
let remote_slot = resp.data.head_slot + resp.data.sync_distance;
if remote_slot + 1 < local_slot || local_slot + 1 < remote_slot {
error!(
log,
"Beacon node sync status";
"status" => format!("{:?}", resp),
"Time discrepancy with beacon node";
"msg" => "check the system time on this host and the beacon node",
"beacon_node_slot" => remote_slot,
"local_slot" => local_slot,
);
}
let now = if let Some(slot) = slot_clock.now() {
slot
} else {
// There's no good reason why we shouldn't be able to read the slot clock, so we'll
// indicate we're not synced if that's the case.
return false;
};
if sync_status.current_slot + SYNC_TOLERANCE >= now {
true
} else {
if let Some(log) = log_opt {
error!(
log,
"Beacon node is syncing";
"msg" => "not receiving new duties",
"target_slot" => sync_status.highest_slot.as_u64(),
"current_slot" => sync_status.current_slot.as_u64(),
);
}
false
}
}
}
is_synced
}

View File

@@ -7,6 +7,7 @@ mod fork_service;
mod initialized_validators;
mod is_synced;
mod notifier;
mod validator_duty;
mod validator_store;
pub use cli::cli_app;
@@ -18,18 +19,18 @@ use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches;
use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext;
use eth2_config::Eth2Config;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Url};
use fork_service::{ForkService, ForkServiceBuilder};
use futures::channel::mpsc;
use initialized_validators::InitializedValidators;
use notifier::spawn_notifier;
use remote_beacon_node::RemoteBeaconNode;
use slog::{error, info, Logger};
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{delay_for, Duration};
use types::{EthSpec, Hash256};
use types::{EthSpec, Hash256, YamlConfig};
use validator_store::ValidatorStore;
/// The interval between attempts to contact the beacon node during startup.
@@ -61,7 +62,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
/// Instantiates the validator client, _without_ starting the timers to trigger block
/// and attestation production.
pub async fn new(mut context: RuntimeContext<T>, config: Config) -> Result<Self, String> {
pub async fn new(context: RuntimeContext<T>, config: Config) -> Result<Self, String> {
let log = context.log().clone();
info!(
@@ -104,33 +105,36 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
"enabled" => validators.num_enabled(),
);
let beacon_node_url: Url = config
.http_server
.parse()
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?;
let beacon_node_http_client = ClientBuilder::new()
.timeout(HTTP_TIMEOUT)
.build()
.map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
let beacon_node =
RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT)
.map_err(|e| format!("Unable to init beacon node http client: {}", e))?;
BeaconNodeHttpClient::from_components(beacon_node_url, beacon_node_http_client);
// Perform some potentially long-running initialization tasks.
let (eth2_config, genesis_time, genesis_validators_root) = tokio::select! {
let (yaml_config, genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_node, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
let beacon_node_spec = yaml_config.apply_to_chain_spec::<T>(&T::default_spec())
.ok_or_else(||
"The minimal/mainnet spec type of the beacon node does not match the validator client. \
See the --testnet command.".to_string()
)?;
// Do not permit a connection to a beacon node using different spec constants.
if context.eth2_config.spec_constants != eth2_config.spec_constants {
return Err(format!(
"Beacon node is using an incompatible spec. Got {}, expected {}",
eth2_config.spec_constants, context.eth2_config.spec_constants
));
if context.eth2_config.spec != beacon_node_spec {
return Err(
"The beacon node is using a different Eth2 specification to this validator client. \
See the --testnet command."
.to_string(),
);
}
// Note: here we just assume the spec variables of the remote node. This is very useful
// for testnets, but perhaps a security issue when it comes to mainnet.
//
// A damaging attack would be for a beacon node to convince the validator client of a
// different `SLOTS_PER_EPOCH` variable. This could result in slashable messages being
// produced. We are safe from this because `SLOTS_PER_EPOCH` is a type-level constant
// for Lighthouse.
context.eth2_config = eth2_config;
let slot_clock = SystemTimeSlotClock::new(
context.eth2_config.spec.genesis_slot,
Duration::from_secs(genesis_time),
@@ -203,7 +207,10 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
self.duties_service
.clone()
.start_update_service(block_service_tx, &self.context.eth2_config.spec)
.start_update_service(
block_service_tx,
Arc::new(self.context.eth2_config.spec.clone()),
)
.map_err(|e| format!("Unable to start duties service: {}", e))?;
self.fork_service
@@ -228,80 +235,85 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}
async fn init_from_beacon_node<E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
beacon_node: &BeaconNodeHttpClient,
context: &RuntimeContext<E>,
) -> Result<(Eth2Config, u64, Hash256), String> {
) -> Result<(YamlConfig, u64, Hash256), String> {
// Wait for the beacon node to come online.
wait_for_node(beacon_node, context.log()).await?;
let eth2_config = beacon_node
.http
.spec()
.get_eth2_config()
let yaml_config = beacon_node
.get_config_spec()
.await
.map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?;
let genesis_time = beacon_node
.http
.beacon()
.get_genesis_time()
.await
.map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?;
.map_err(|e| format!("Unable to read spec from beacon node: {:?}", e))?
.data;
let genesis = loop {
match beacon_node.get_beacon_genesis().await {
Ok(genesis) => break genesis.data,
Err(e) => {
// A 404 error on the genesis endpoint indicates that genesis has not yet occurred.
if e.status() == Some(StatusCode::NOT_FOUND) {
info!(
context.log(),
"Waiting for genesis";
);
} else {
error!(
context.log(),
"Error polling beacon node";
"error" => format!("{:?}", e)
);
}
}
}
delay_for(RETRY_DELAY).await;
};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("Unable to read system time: {:?}", e))?;
let genesis = Duration::from_secs(genesis_time);
let genesis_time = Duration::from_secs(genesis.genesis_time);
// If the time now is less than (prior to) genesis, then delay until the
// genesis instant.
//
// If the validator client starts before genesis, it will get errors from
// the slot clock.
if now < genesis {
if now < genesis_time {
info!(
context.log(),
"Starting node prior to genesis";
"seconds_to_wait" => (genesis - now).as_secs()
"seconds_to_wait" => (genesis_time - now).as_secs()
);
delay_for(genesis - now).await;
delay_for(genesis_time - now).await;
} else {
info!(
context.log(),
"Genesis has already occurred";
"seconds_ago" => (now - genesis).as_secs()
"seconds_ago" => (now - genesis_time).as_secs()
);
}
let genesis_validators_root = beacon_node
.http
.beacon()
.get_genesis_validators_root()
.await
.map_err(|e| {
format!(
"Unable to read genesis validators root from beacon node: {:?}",
e
)
})?;
Ok((eth2_config, genesis_time, genesis_validators_root))
Ok((
yaml_config,
genesis.genesis_time,
genesis.genesis_validators_root,
))
}
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn wait_for_node<E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
log: &Logger,
) -> Result<(), String> {
async fn wait_for_node(beacon_node: &BeaconNodeHttpClient, log: &Logger) -> Result<(), String> {
// Try to get the version string from the node, looping until success is returned.
loop {
let log = log.clone();
let result = beacon_node
.clone()
.http
.node()
.get_version()
.get_node_version()
.await
.map_err(|e| format!("{:?}", e));
.map_err(|e| format!("{:?}", e))
.map(|body| body.data.version);
match result {
Ok(version) => {

View File

@@ -0,0 +1,131 @@
use eth2::{
types::{BeaconCommitteeSubscription, StateId, ValidatorId},
BeaconNodeHttpClient,
};
use serde::{Deserialize, Serialize};
use types::{CommitteeIndex, Epoch, PublicKey, PublicKeyBytes, Slot};
/// This struct is being used as a shim since we deprecated the `rest_api` in favour of `http_api`.
///
/// Tracking issue: https://github.com/sigp/lighthouse/issues/1643
// NOTE: if you add or remove fields, please adjust `eq_ignoring_proposal_slots`
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct ValidatorDuty {
/// The validator's BLS public key, uniquely identifying them.
pub validator_pubkey: PublicKey,
/// The validator's index in `state.validators`
pub validator_index: Option<u64>,
/// The slot at which the validator must attest.
pub attestation_slot: Option<Slot>,
/// The index of the committee within `slot` of which the validator is a member.
pub attestation_committee_index: Option<CommitteeIndex>,
/// The position of the validator in the committee.
pub attestation_committee_position: Option<usize>,
/// The committee count at `attestation_slot`.
pub committee_count_at_slot: Option<u64>,
/// The number of validators in the committee.
pub committee_length: Option<u64>,
/// The slots in which a validator must propose a block (can be empty).
///
/// Should be set to `None` when duties are not yet known (before the current epoch).
pub block_proposal_slots: Option<Vec<Slot>>,
}
impl ValidatorDuty {
/// Instantiate `Self` as if there are no known dutes for `validator_pubkey`.
fn no_duties(validator_pubkey: PublicKey) -> Self {
ValidatorDuty {
validator_pubkey,
validator_index: None,
attestation_slot: None,
attestation_committee_index: None,
attestation_committee_position: None,
committee_count_at_slot: None,
committee_length: None,
block_proposal_slots: None,
}
}
/// Instantiate `Self` by performing requests on the `beacon_node`.
///
/// Will only request proposer duties if `current_epoch == request_epoch`.
pub async fn download(
beacon_node: &BeaconNodeHttpClient,
current_epoch: Epoch,
request_epoch: Epoch,
pubkey: PublicKey,
) -> Result<ValidatorDuty, String> {
let pubkey_bytes = PublicKeyBytes::from(&pubkey);
let validator_index = if let Some(index) = beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey_bytes.clone()),
)
.await
.map_err(|e| format!("Failed to get validator index: {}", e))?
.map(|body| body.data.index)
{
index
} else {
return Ok(Self::no_duties(pubkey));
};
if let Some(attester) = beacon_node
.get_validator_duties_attester(request_epoch, Some(&[validator_index]))
.await
.map_err(|e| format!("Failed to get attester duties: {}", e))?
.data
.first()
{
let block_proposal_slots = if current_epoch == request_epoch {
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
.map_err(|e| format!("Failed to get proposer indices: {}", e))?
.data
.into_iter()
.filter(|data| data.pubkey == pubkey_bytes)
.map(|data| data.slot)
.collect()
} else {
vec![]
};
Ok(ValidatorDuty {
validator_pubkey: pubkey,
validator_index: Some(attester.validator_index),
attestation_slot: Some(attester.slot),
attestation_committee_index: Some(attester.committee_index),
attestation_committee_position: Some(attester.validator_committee_index as usize),
committee_count_at_slot: Some(attester.committees_at_slot),
committee_length: Some(attester.committee_length),
block_proposal_slots: Some(block_proposal_slots),
})
} else {
Ok(Self::no_duties(pubkey))
}
}
/// Return `true` if these validator duties are equal, ignoring their `block_proposal_slots`.
pub fn eq_ignoring_proposal_slots(&self, other: &Self) -> bool {
self.validator_pubkey == other.validator_pubkey
&& self.validator_index == other.validator_index
&& self.attestation_slot == other.attestation_slot
&& self.attestation_committee_index == other.attestation_committee_index
&& self.attestation_committee_position == other.attestation_committee_position
&& self.committee_count_at_slot == other.committee_count_at_slot
&& self.committee_length == other.committee_length
}
/// Generate a subscription for `self`, if `self` has appropriate attestation duties.
pub fn subscription(&self, is_aggregator: bool) -> Option<BeaconCommitteeSubscription> {
Some(BeaconCommitteeSubscription {
validator_index: self.validator_index?,
committee_index: self.attestation_committee_index?,
committees_at_slot: self.committee_count_at_slot?,
slot: self.attestation_slot?,
is_aggregator,
})
}
}