Rework Validator Client fallback mechanism (#4393)

* Rework Validator Client fallback mechanism

* Add CI workflow for fallback simulator

* Tie-break with sync distance for non-synced nodes

* Fix simulator

* Cleanup unused code

* More improvements

* Add IsOptimistic enum for readability

* Use configurable sync distance tiers

* Fix tests

* Combine status and health and improve logging

* Fix nodes not being marked as available

* Fix simulator

* Fix tests again

* Increase fallback simulator tolerance

* Add http api endpoint

* Fix todos and tests

* Update simulator

* Merge branch 'unstable' into vc-fallback

* Add suggestions

* Add id to ui endpoint

* Remove unnecessary clones

* Formatting

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Fix flag tests

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Fix conflicts

* Merge branch 'unstable' into vc-fallback

* Remove unnecessary pubs

* Simplify `compute_distance_tier` and reduce notifier awaits

* Use the more descriptive `user_index` instead of `id`

* Combine sync distance tolerance flags into one

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* wip

* Use new simulator from unstable

* Fix cli text

* Remove leftover files

* Remove old commented code

* Merge branch 'unstable' into vc-fallback

* Update cli text

* Silence candidate errors when pre-genesis

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Retry on failure

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Remove disable_run_on_all

* Remove unused error variant

* Fix out of date comment

* Merge branch 'unstable' into vc-fallback

* Remove unnecessary as_u64

* Remove more out of date comments

* Use tokio RwLock and remove parking_lot

* Merge branch 'unstable' into vc-fallback

* Formatting

* Ensure nodes are still added to total when not available

* Allow VC to detect when BN comes online

* Fix ui endpoint

* Don't have block_service as an Option

* Merge branch 'unstable' into vc-fallback

* Clean up lifetimes and futures

* Revert "Don't have block_service as an Option"

This reverts commit b5445a09e9.

* Merge branch 'unstable' into vc-fallback

* Merge branch 'unstable' into vc-fallback

* Improve rwlock sanitation using clones

* Merge branch 'unstable' into vc-fallback

* Drop read lock immediately by cloning the vec.
This commit is contained in:
Mac L
2024-10-03 09:57:12 +04:00
committed by GitHub
parent 17849b58ec
commit f870b66f49
24 changed files with 1316 additions and 778 deletions

View File

@@ -10,7 +10,6 @@ path = "src/lib.rs"
[dev-dependencies]
tokio = { workspace = true }
itertools = { workspace = true }
[dependencies]
tree_hash = { workspace = true }
@@ -60,4 +59,5 @@ sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }
itertools = { workspace = true }
fdlimit = "0.3.0"

View File

@@ -1,9 +1,8 @@
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
validator_store::{Error as ValidatorStoreError, ValidatorStore},
OfflineOnFailure,
};
use environment::RuntimeContext;
use futures::future::join_all;
@@ -339,21 +338,17 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let attestation_data = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;
@@ -458,26 +453,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Attestations,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
},
)
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_beacon_pool_attestations_v2(attestations, fork_name)
.await
} else {
beacon_node
.post_beacon_pool_attestations_v1(attestations)
.await
}
})
.await
{
Ok(()) => info!(
@@ -540,46 +530,38 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let aggregated_attestation = &self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
attestation_data.tree_hash_root(),
committee_index,
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| {
format!("No aggregate available for {:?}", attestation_data)
})
.map(|result| result.data)
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| {
format!("No aggregate available for {:?}", attestation_data)
})
.map(|result| result.data)
}
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
if fork_name.electra_enabled() {
beacon_node
.get_validator_aggregate_attestation_v2(
attestation_data.slot,
attestation_data.tree_hash_root(),
committee_index,
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
} else {
beacon_node
.get_validator_aggregate_attestation_v1(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
}
})
.await
.map_err(|e| e.to_string())?;
@@ -637,30 +619,26 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
if fork_name.electra_enabled() {
beacon_node
.post_validator_aggregate_and_proof_v2(
signed_aggregate_and_proofs_slice,
fork_name,
)
.await
} else {
beacon_node
.post_validator_aggregate_and_proof_v1(
signed_aggregate_and_proofs_slice,
)
.await
}
})
.await
{
Ok(()) => {

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,420 @@
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use types::Slot;
/// Sync distances between 0 and DEFAULT_SYNC_TOLERANCE are considered `synced`.
/// Sync distance tiers are determined by the different modifiers.
///
/// The default range is the following:
/// Synced: 0..=8
/// Small: 9..=16
/// Medium: 17..=64
/// Large: 65..
const DEFAULT_SYNC_TOLERANCE: Slot = Slot::new(8);
/// Default width, in slots, of the `Small` range. It is added on top of
/// `DEFAULT_SYNC_TOLERANCE` to obtain the default upper bound of `Small`.
const DEFAULT_SMALL_SYNC_DISTANCE_MODIFIER: Slot = Slot::new(8);
/// Default width, in slots, of the `Medium` range. It is added on top of the default `Small`
/// upper bound to obtain the default upper bound of `Medium`.
const DEFAULT_MEDIUM_SYNC_DISTANCE_MODIFIER: Slot = Slot::new(48);
/// Numeric health ranking of a beacon node. The values assigned in this file range from 1 to 16.
type HealthTier = u8;
/// A sync distance, expressed as a number of slots.
type SyncDistance = Slot;
/// Helpful enum which is used when pattern matching to determine health tier.
///
/// A sync distance is mapped onto one of these variants by
/// `BeaconNodeSyncDistanceTiers::compute_distance_tier`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum SyncDistanceTier {
/// The sync distance is at most the `synced` threshold.
Synced,
/// The sync distance is above the `synced` threshold and at most the `small` threshold.
Small,
/// The sync distance is above the `small` threshold and at most the `medium` threshold.
Medium,
/// The sync distance is above the `medium` threshold.
Large,
}
/// Contains the different sync distance tiers which are determined at runtime by the
/// `beacon-nodes-sync-tolerances` flag.
///
/// Each field is an inclusive upper bound: a distance is compared against `synced`, then
/// `small`, then `medium`, and anything above `medium` is classified as `Large`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct BeaconNodeSyncDistanceTiers {
/// Largest sync distance (inclusive) that is still classified as `Synced`.
pub synced: SyncDistance,
/// Largest sync distance (inclusive) that is still classified as `Small`.
pub small: SyncDistance,
/// Largest sync distance (inclusive) that is still classified as `Medium`.
pub medium: SyncDistance,
}
impl Default for BeaconNodeSyncDistanceTiers {
    /// Builds the default thresholds by stacking the default modifiers: each tier's upper bound
    /// is the previous tier's upper bound plus the width of the tier itself.
    fn default() -> Self {
        let synced = DEFAULT_SYNC_TOLERANCE;
        let small = synced + DEFAULT_SMALL_SYNC_DISTANCE_MODIFIER;
        let medium = small + DEFAULT_MEDIUM_SYNC_DISTANCE_MODIFIER;
        Self {
            synced,
            small,
            medium,
        }
    }
}
impl FromStr for BeaconNodeSyncDistanceTiers {
    type Err = String;

    /// Parses a comma-separated list of exactly three unsigned integers, e.g. `"8,8,48"`.
    ///
    /// The first value is the upper bound of the `Synced` range. The second and third values are
    /// the widths of the `Small` and `Medium` ranges, so the resulting thresholds are cumulative
    /// sums of the inputs.
    ///
    /// # Errors
    ///
    /// Returns an error string when a value is not a valid `u64`, when the list does not contain
    /// exactly three values, or when the cumulative sums do not fit in a `u64`.
    fn from_str(s: &str) -> Result<Self, String> {
        let (synced, small_modifier, medium_modifier): (u64, u64, u64) = s
            .split(',')
            .map(|value| {
                value
                    .parse()
                    .map_err(|e| format!("Invalid sync distance modifier: {e:?}"))
            })
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .collect_tuple()
            .ok_or_else(|| "Invalid number of sync distance modifiers".to_string())?;

        // The values come straight from the command line, so the sums are checked: a plain `+`
        // would panic in debug builds and wrap in release builds, and a wrapped sum could produce
        // a `small` bound that is below the `synced` bound.
        let overflow = || "Sync distance modifiers overflow when summed".to_string();
        let small = synced.checked_add(small_modifier).ok_or_else(overflow)?;
        let medium = small.checked_add(medium_modifier).ok_or_else(overflow)?;

        Ok(BeaconNodeSyncDistanceTiers {
            synced: Slot::new(synced),
            small: Slot::new(small),
            medium: Slot::new(medium),
        })
    }
}
impl BeaconNodeSyncDistanceTiers {
    /// Classifies `distance` into a [`SyncDistanceTier`] using the thresholds held by `self`
    /// (the `sync_tolerance` values defined by the CLI).
    ///
    /// The thresholds are checked from the smallest to the largest and every bound is inclusive;
    /// a distance above `medium` is `Large`.
    pub fn compute_distance_tier(&self, distance: SyncDistance) -> SyncDistanceTier {
        match distance {
            d if d <= self.synced => SyncDistanceTier::Synced,
            d if d <= self.small => SyncDistanceTier::Small,
            d if d <= self.medium => SyncDistanceTier::Medium,
            _ => SyncDistanceTier::Large,
        }
    }
}
/// Execution Node health metrics.
///
/// Currently only considers `el_offline`.
///
/// All else being equal, a `Healthy` status yields a numerically lower health tier than
/// `Unhealthy` in `BeaconNodeHealth::compute_health_tier`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum ExecutionEngineHealth {
Healthy,
Unhealthy,
}
/// Whether a beacon node is optimistically synced.
///
/// A two-variant enum is used instead of a `bool` so that the health tier `match` reads
/// unambiguously.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum IsOptimistic {
Yes,
No,
}
/// The health ranking of a beacon node together with the sync distance it was derived from.
///
/// Note that the derived `PartialEq` compares all three fields, whereas the `Ord` implementation
/// ignores `sync_distance` when `distance_tier` is `Synced`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct BeaconNodeHealthTier {
/// The numeric tier. It is the primary key of the ordering.
pub tier: HealthTier,
/// The raw sync distance. Used to tie-break equal tiers unless the distance tier is `Synced`.
pub sync_distance: SyncDistance,
/// The tier that `sync_distance` was classified into.
pub distance_tier: SyncDistanceTier,
}
impl Display for BeaconNodeHealthTier {
    /// Renders the value as `Tier<tier>(<sync_distance>)`; `distance_tier` is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            tier,
            sync_distance,
            ..
        } = self;
        write!(f, "Tier{}({})", tier, sync_distance)
    }
}
impl Ord for BeaconNodeHealthTier {
    /// Orders primarily by `tier`. Equal tiers are tie-broken by `sync_distance`, except when
    /// `self` is in the `Synced` distance tier.
    ///
    /// NOTE(review): two `Synced` values with different `sync_distance` compare as `Equal` here
    /// while the derived `PartialEq` reports them as different, so `cmp` and `==` can disagree.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.tier.cmp(&other.tier) {
            // Only tie-break on sync distance outside of the `Synced` tier. Skipping the
            // tie-break for synced nodes ensures validator clients don't artificially prefer
            // one node.
            Ordering::Equal if self.distance_tier != SyncDistanceTier::Synced => {
                self.sync_distance.cmp(&other.sync_distance)
            }
            tier_ordering => tier_ordering,
        }
    }
}
impl PartialOrd for BeaconNodeHealthTier {
// Delegates to `Ord::cmp` so that the partial and the total order always agree.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl BeaconNodeHealthTier {
/// Creates a health tier from its three components.
///
/// The arguments are stored as given; no check is made that `distance_tier` matches
/// `sync_distance` or that `tier` matches either of them.
pub fn new(
tier: HealthTier,
sync_distance: SyncDistance,
distance_tier: SyncDistanceTier,
) -> Self {
Self {
tier,
sync_distance,
distance_tier,
}
}
}
/// Beacon Node Health metrics.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct BeaconNodeHealth {
/// The index of the Beacon Node. This should correspond with its position in the
/// `--beacon-nodes` list. Note that the `user_index` field is used to tie-break nodes with the
/// same health so that nodes with a lower index are preferred.
pub user_index: usize,
/// The slot number of the head.
pub head: Slot,
/// Whether the node is optimistically synced.
pub optimistic_status: IsOptimistic,
/// The status of the node's connected Execution Engine.
pub execution_status: ExecutionEngineHealth,
/// The overall health tier of the Beacon Node. Used to rank the nodes for the purposes of
/// fallbacks.
pub health_tier: BeaconNodeHealthTier,
}
impl Ord for BeaconNodeHealth {
    /// Orders by `health_tier` first and falls back to `user_index` when the health tiers
    /// compare as equal.
    fn cmp(&self, other: &Self) -> Ordering {
        self.health_tier
            .cmp(&other.health_tier)
            // Tie-break node health by `user_index`.
            .then_with(|| self.user_index.cmp(&other.user_index))
    }
}
impl PartialOrd for BeaconNodeHealth {
// Delegates to `Ord::cmp` so that the partial and the total order always agree.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl BeaconNodeHealth {
pub fn from_status(
user_index: usize,
sync_distance: Slot,
head: Slot,
optimistic_status: IsOptimistic,
execution_status: ExecutionEngineHealth,
distance_tiers: &BeaconNodeSyncDistanceTiers,
) -> Self {
let health_tier = BeaconNodeHealth::compute_health_tier(
sync_distance,
optimistic_status,
execution_status,
distance_tiers,
);
Self {
user_index,
head,
optimistic_status,
execution_status,
health_tier,
}
}
pub fn get_index(&self) -> usize {
self.user_index
}
pub fn get_health_tier(&self) -> BeaconNodeHealthTier {
self.health_tier
}
fn compute_health_tier(
sync_distance: SyncDistance,
optimistic_status: IsOptimistic,
execution_status: ExecutionEngineHealth,
sync_distance_tiers: &BeaconNodeSyncDistanceTiers,
) -> BeaconNodeHealthTier {
let sync_distance_tier = sync_distance_tiers.compute_distance_tier(sync_distance);
let health = (sync_distance_tier, optimistic_status, execution_status);
match health {
(SyncDistanceTier::Synced, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(1, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Small, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(2, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Synced, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(3, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Medium, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(4, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Synced, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(5, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Synced, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(6, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Small, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(7, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Small, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(8, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Small, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(9, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Large, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(10, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Medium, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(11, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Medium, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(12, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Medium, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(13, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Large, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(14, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Large, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
BeaconNodeHealthTier::new(15, sync_distance, sync_distance_tier)
}
(SyncDistanceTier::Large, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
BeaconNodeHealthTier::new(16, sync_distance, sync_distance_tier)
}
}
}
}
#[cfg(test)]
mod tests {
    use super::ExecutionEngineHealth::{Healthy, Unhealthy};
    use super::{
        BeaconNodeHealth, BeaconNodeHealthTier, BeaconNodeSyncDistanceTiers, IsOptimistic,
        SyncDistanceTier,
    };
    use crate::beacon_node_fallback::Config;
    use std::collections::BTreeSet;
    use std::str::FromStr;
    use types::Slot;

    /// Builds a health record for every combination of sync distance, optimistic status and
    /// execution engine health, then checks that each resulting tier number is consistent with
    /// the inputs it was derived from.
    #[test]
    fn all_possible_health_tiers() {
        let config = Config::default();
        let beacon_node_sync_distance_tiers = config.sync_tolerances;

        // One slot past the `medium` bound is the first `Large` distance, so this range reaches
        // every distance tier. The value being varied must be the sync distance: the head slot
        // has no influence on the computed tier.
        let max_sync_distance = beacon_node_sync_distance_tiers.medium.as_u64() + 1;

        let mut health_vec = vec![];
        for sync_distance in 0..=max_sync_distance {
            for optimistic_status in &[IsOptimistic::No, IsOptimistic::Yes] {
                for ee_health in &[Healthy, Unhealthy] {
                    let health = BeaconNodeHealth::from_status(
                        0,
                        Slot::new(sync_distance),
                        Slot::new(0),
                        *optimistic_status,
                        *ee_health,
                        &beacon_node_sync_distance_tiers,
                    );
                    health_vec.push(health);
                }
            }
        }

        // Guard against the test silently covering only a subset of the tiers. This assumes that
        // the default tolerances give every distance tier a non-empty range.
        let observed_tiers: BTreeSet<u8> = health_vec
            .iter()
            .map(|health| health.get_health_tier().tier)
            .collect();
        assert_eq!(observed_tiers, (1..=16).collect::<BTreeSet<u8>>());

        for health in health_vec {
            let health_tier = health.get_health_tier();
            let tier = health_tier.tier;
            let distance = health_tier.sync_distance;

            let distance_tier = beacon_node_sync_distance_tiers.compute_distance_tier(distance);

            // Check sync distance.
            if [1, 3, 5, 6].contains(&tier) {
                assert_eq!(distance_tier, SyncDistanceTier::Synced);
            } else if [2, 7, 8, 9].contains(&tier) {
                assert_eq!(distance_tier, SyncDistanceTier::Small);
            } else if [4, 11, 12, 13].contains(&tier) {
                assert_eq!(distance_tier, SyncDistanceTier::Medium);
            } else {
                assert_eq!(distance_tier, SyncDistanceTier::Large);
            }

            // Check optimistic status.
            if [1, 2, 3, 4, 7, 10, 11, 14].contains(&tier) {
                assert_eq!(health.optimistic_status, IsOptimistic::No);
            } else {
                assert_eq!(health.optimistic_status, IsOptimistic::Yes);
            }

            // Check execution health.
            if [3, 6, 7, 9, 11, 13, 14, 16].contains(&tier) {
                assert_eq!(health.execution_status, Unhealthy);
            } else {
                assert_eq!(health.execution_status, Healthy);
            }
        }
    }

    /// Computes the health tier of a non-optimistic node with a healthy execution engine at the
    /// given sync distance, so that the tier number depends on the distance alone.
    fn new_distance_tier(
        distance: u64,
        distance_tiers: &BeaconNodeSyncDistanceTiers,
    ) -> BeaconNodeHealthTier {
        BeaconNodeHealth::compute_health_tier(
            Slot::new(distance),
            IsOptimistic::No,
            Healthy,
            distance_tiers,
        )
    }

    #[test]
    fn sync_tolerance_default() {
        let distance_tiers = BeaconNodeSyncDistanceTiers::default();

        let synced_low = new_distance_tier(0, &distance_tiers);
        let synced_high = new_distance_tier(8, &distance_tiers);

        let small_low = new_distance_tier(9, &distance_tiers);
        let small_high = new_distance_tier(16, &distance_tiers);

        let medium_low = new_distance_tier(17, &distance_tiers);
        let medium_high = new_distance_tier(64, &distance_tiers);
        let large = new_distance_tier(65, &distance_tiers);

        assert_eq!(synced_low.tier, 1);
        assert_eq!(synced_high.tier, 1);
        assert_eq!(small_low.tier, 2);
        assert_eq!(small_high.tier, 2);
        assert_eq!(medium_low.tier, 4);
        assert_eq!(medium_high.tier, 4);
        assert_eq!(large.tier, 10);
    }

    #[test]
    fn sync_tolerance_from_str() {
        // String should set the tiers as:
        // synced: 0..=4
        // small: 5..=8
        // medium 9..=12
        // large: 13..

        let distance_tiers = BeaconNodeSyncDistanceTiers::from_str("4,4,4").unwrap();

        let synced_low = new_distance_tier(0, &distance_tiers);
        let synced_high = new_distance_tier(4, &distance_tiers);

        let small_low = new_distance_tier(5, &distance_tiers);
        let small_high = new_distance_tier(8, &distance_tiers);

        let medium_low = new_distance_tier(9, &distance_tiers);
        let medium_high = new_distance_tier(12, &distance_tiers);

        let large = new_distance_tier(13, &distance_tiers);

        assert_eq!(synced_low.tier, 1);
        assert_eq!(synced_high.tier, 1);
        assert_eq!(small_low.tier, 2);
        assert_eq!(small_high.tier, 2);
        assert_eq!(medium_low.tier, 4);
        assert_eq!(medium_high.tier, 4);
        assert_eq!(large.tier, 10);
    }

    #[test]
    fn sync_tolerance_from_str_rejects_malformed_input() {
        // Wrong number of values.
        assert!(BeaconNodeSyncDistanceTiers::from_str("4,4").is_err());
        assert!(BeaconNodeSyncDistanceTiers::from_str("4,4,4,4").is_err());
        // Values which are not unsigned integers.
        assert!(BeaconNodeSyncDistanceTiers::from_str("").is_err());
        assert!(BeaconNodeSyncDistanceTiers::from_str("a,b,c").is_err());
        assert!(BeaconNodeSyncDistanceTiers::from_str("4,-4,4").is_err());
    }
}

View File

@@ -1,9 +1,8 @@
use crate::beacon_node_fallback::{Error as FallbackError, Errors};
use crate::{
beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced},
beacon_node_fallback::{ApiTopic, BeaconNodeFallback},
determine_graffiti,
graffiti_file::GraffitiFile,
OfflineOnFailure,
};
use crate::{
http_metrics::metrics,
@@ -141,26 +140,16 @@ pub struct ProposerFallback<T, E: EthSpec> {
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn request_proposers_first<'a, F, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
func: F,
) -> Result<(), Errors<Err>>
pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R + Clone,
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
// If there are proposer nodes, try calling `func` on them and return early if they are successful.
if let Some(proposer_nodes) = &self.proposer_nodes {
if proposer_nodes
.request(
require_synced,
offline_on_failure,
ApiTopic::Blocks,
func.clone(),
)
.request(ApiTopic::Blocks, func.clone())
.await
.is_ok()
{
@@ -169,28 +158,18 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
}
// If the proposer nodes failed, try on the non-proposer nodes.
self.beacon_nodes
.request(require_synced, offline_on_failure, ApiTopic::Blocks, func)
.await
self.beacon_nodes.request(ApiTopic::Blocks, func).await
}
// Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`.
pub async fn request_proposers_last<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
func: F,
) -> Result<O, Errors<Err>>
pub async fn request_proposers_last<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R + Clone,
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
Err: Debug,
{
// Try running `func` on the non-proposer beacon nodes.
let beacon_nodes_result = self
.beacon_nodes
.first_success(require_synced, offline_on_failure, func.clone())
.await;
let beacon_nodes_result = self.beacon_nodes.first_success(func.clone()).await;
match (beacon_nodes_result, &self.proposer_nodes) {
// The non-proposer node call succeeded, return the result.
@@ -198,11 +177,7 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
// The non-proposer node call failed, but we don't have any proposer nodes. Return an error.
(Err(e), None) => Err(e),
// The non-proposer node call failed, try the same call on the proposer nodes.
(Err(_), Some(proposer_nodes)) => {
proposer_nodes
.first_success(require_synced, offline_on_failure, func)
.await
}
(Err(_), Some(proposer_nodes)) => proposer_nodes.first_success(func).await,
}
}
}
@@ -211,8 +186,8 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
pub struct Inner<T, E: EthSpec> {
validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: Arc<T>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
pub(crate) beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub(crate) proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: RuntimeContext<E>,
graffiti: Option<Graffiti>,
graffiti_file: Option<GraffitiFile>,
@@ -418,14 +393,10 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// protect them from DoS attacks and they're most likely to successfully
// publish a block.
proposer_fallback
.request_proposers_first(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
self.publish_signed_block_contents(&signed_block, beacon_node)
.await
},
)
.request_proposers_first(|beacon_node| async {
self.publish_signed_block_contents(&signed_block, beacon_node)
.await
})
.await?;
info!(
@@ -503,32 +474,28 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Try the proposer nodes last, since it's likely that they don't have a
// great view of attestations on the network.
let unsigned_block = proposer_fallback
.request_proposers_last(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
Self::get_validator_block(
beacon_node,
slot,
randao_reveal_ref,
graffiti,
proposer_index,
builder_boost_factor,
log,
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})
},
)
.request_proposers_last(|beacon_node| async move {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
Self::get_validator_block(
&beacon_node,
slot,
randao_reveal_ref,
graffiti,
proposer_index,
builder_boost_factor,
log,
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})
})
.await?;
self_ref
@@ -547,7 +514,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
async fn publish_signed_block_contents(
&self,
signed_block: &SignedBlock<E>,
beacon_node: &BeaconNodeHttpClient,
beacon_node: BeaconNodeHttpClient,
) -> Result<(), BlockError> {
let log = self.context.log();
let slot = signed_block.slot();

View File

@@ -1,80 +1,27 @@
use crate::beacon_node_fallback::CandidateError;
use eth2::BeaconNodeHttpClient;
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use eth2::{types::Slot, BeaconNodeHttpClient};
use slog::{warn, Logger};
/// A distance in slots.
const SYNC_TOLERANCE: u64 = 4;
/// Returns
///
/// `Ok(())` if the beacon node is synced and ready for action,
/// `Err(CandidateError::Offline)` if the beacon node is unreachable,
/// `Err(CandidateError::NotSynced)` if the beacon node indicates that it is syncing **AND**
/// it is more than `SYNC_TOLERANCE` behind the highest
/// known slot.
///
/// The second condition means the even if the beacon node thinks that it's syncing, we'll still
/// try to use it if it's close enough to the head.
pub async fn check_synced<T: SlotClock>(
pub async fn check_node_health(
beacon_node: &BeaconNodeHttpClient,
slot_clock: &T,
log_opt: Option<&Logger>,
) -> Result<(), CandidateError> {
log: &Logger,
) -> Result<(Slot, bool, bool), CandidateError> {
let resp = match beacon_node.get_node_syncing().await {
Ok(resp) => resp,
Err(e) => {
if let Some(log) = log_opt {
warn!(
log,
"Unable connect to beacon node";
"error" => %e
)
}
warn!(
log,
"Unable connect to beacon node";
"error" => %e
);
return Err(CandidateError::Offline);
}
};
let bn_is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE);
let is_synced = bn_is_synced && !resp.data.el_offline;
if let Some(log) = log_opt {
if !is_synced {
debug!(
log,
"Beacon node sync status";
"status" => format!("{:?}", resp),
);
warn!(
log,
"Beacon node is not synced";
"sync_distance" => resp.data.sync_distance.as_u64(),
"head_slot" => resp.data.head_slot.as_u64(),
"endpoint" => %beacon_node,
"el_offline" => resp.data.el_offline,
);
}
if let Some(local_slot) = slot_clock.now() {
let remote_slot = resp.data.head_slot + resp.data.sync_distance;
if remote_slot + 1 < local_slot || local_slot + 1 < remote_slot {
error!(
log,
"Time discrepancy with beacon node";
"msg" => "check the system time on this host and the beacon node",
"beacon_node_slot" => remote_slot,
"local_slot" => local_slot,
"endpoint" => %beacon_node,
);
}
}
}
if is_synced {
Ok(())
} else {
Err(CandidateError::NotSynced)
}
Ok((
resp.data.head_slot,
resp.data.is_optimistic,
resp.data.el_offline,
))
}

View File

@@ -444,6 +444,33 @@ pub fn cli_app() -> Command {
.help_heading(FLAG_HEADER)
.display_order(0)
)
.arg(
Arg::new("beacon-nodes-sync-tolerances")
.long("beacon-nodes-sync-tolerances")
.value_name("SYNC_TOLERANCES")
.help("A comma-separated list of 3 values which sets the size of each sync distance range when \
determining the health of each connected beacon node. \
The first value determines the `Synced` range. \
If a connected beacon node is synced to within this number of slots it is considered 'Synced'. \
The second value determines the `Small` sync distance range. \
This range starts immediately after the `Synced` range. \
The third value determines the `Medium` sync distance range. \
This range starts immediately after the `Small` range. \
Any sync distance value beyond that is considered `Large`. \
For example, a value of `8,8,48` would have ranges like the following: \
`Synced`: 0..=8 \
`Small`: 9..=16 \
`Medium`: 17..=64 \
`Large`: 65.. \
These values are used to determine what ordering beacon node fallbacks are used in. \
Generally, `Synced` nodes are preferred over `Small` and so on. \
Nodes in the `Synced` range will tie-break based on their ordering in `--beacon-nodes`. \
This ensures the primary beacon node is prioritised. \
[default: 8,8,48]")
.action(ArgAction::Set)
.help_heading(FLAG_HEADER)
.display_order(0)
)
.arg(
Arg::new("disable-slashing-protection-web3signer")
.long("disable-slashing-protection-web3signer")

View File

@@ -1,6 +1,8 @@
use crate::beacon_node_fallback::ApiTopic;
use crate::graffiti_file::GraffitiFile;
use crate::{http_api, http_metrics};
use crate::{
beacon_node_fallback, beacon_node_health::BeaconNodeSyncDistanceTiers, http_api, http_metrics,
};
use clap::ArgMatches;
use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required};
use directory::{
@@ -14,6 +16,7 @@ use slog::{info, warn, Logger};
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use types::{Address, GRAFFITI_BYTES_LEN};
@@ -21,7 +24,7 @@ pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";
pub const DEFAULT_WEB3SIGNER_KEEP_ALIVE: Option<Duration> = Some(Duration::from_secs(20));
/// Stores the core configuration for this validator instance.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The data directory, which stores all validator databases
pub validator_dir: PathBuf,
@@ -52,6 +55,8 @@ pub struct Config {
pub http_api: http_api::Config,
/// Configuration for the HTTP metrics server.
pub http_metrics: http_metrics::Config,
/// Configuration for the Beacon Node fallback.
pub beacon_node_fallback: beacon_node_fallback::Config,
/// Configuration for sending metrics to a remote explorer endpoint.
pub monitoring_api: Option<monitoring_api::Config>,
/// If true, enable functionality that monitors the network for attestations or proposals from
@@ -117,6 +122,7 @@ impl Default for Config {
fee_recipient: None,
http_api: <_>::default(),
http_metrics: <_>::default(),
beacon_node_fallback: <_>::default(),
monitoring_api: None,
enable_doppelganger_protection: false,
enable_high_validator_count_metrics: false,
@@ -258,6 +264,16 @@ impl Config {
.collect::<Result<_, _>>()?;
}
/*
* Beacon node fallback
*/
if let Some(sync_tolerance) = cli_args.get_one::<String>("beacon-nodes-sync-tolerances") {
config.beacon_node_fallback.sync_tolerances =
BeaconNodeSyncDistanceTiers::from_str(sync_tolerance)?;
} else {
config.beacon_node_fallback.sync_tolerances = BeaconNodeSyncDistanceTiers::default();
}
/*
* Web3 signer
*/

View File

@@ -29,9 +29,8 @@
//!
//! Doppelganger protection is a best-effort, last-line-of-defence mitigation. Do not rely upon it.
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::BeaconNodeFallback;
use crate::validator_store::ValidatorStore;
use crate::OfflineOnFailure;
use environment::RuntimeContext;
use eth2::types::LivenessResponseData;
use parking_lot::RwLock;
@@ -175,12 +174,11 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
} else {
// Request the previous epoch liveness state from the beacon node.
beacon_nodes
.first_success(
RequireSynced::Yes,
OfflineOnFailure::Yes,
|beacon_node| async {
.first_success(|beacon_node| {
let validator_indices_ref = &validator_indices;
async move {
beacon_node
.post_validator_liveness_epoch(previous_epoch, &validator_indices)
.post_validator_liveness_epoch(previous_epoch, validator_indices_ref)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
@@ -194,8 +192,8 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
})
.collect()
})
},
)
}
})
.await
.unwrap_or_else(|e| {
crit!(
@@ -212,12 +210,11 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
// Request the current epoch liveness state from the beacon node.
let current_epoch_responses = beacon_nodes
.first_success(
RequireSynced::Yes,
OfflineOnFailure::Yes,
|beacon_node| async {
.first_success(|beacon_node| {
let validator_indices_ref = &validator_indices;
async move {
beacon_node
.post_validator_liveness_epoch(current_epoch, &validator_indices)
.post_validator_liveness_epoch(current_epoch, validator_indices_ref)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
@@ -231,8 +228,8 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
})
.collect()
})
},
)
}
})
.await
.unwrap_or_else(|e| {
crit!(

View File

@@ -8,7 +8,7 @@
pub mod sync;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use crate::{
block_service::BlockServiceNotification,
@@ -517,22 +517,18 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// Query the remote BN to resolve a pubkey to a validator index.
let download_result = duties_service
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_ID_HTTP_GET],
);
beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey),
)
.await
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_ID_HTTP_GET],
);
beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey),
)
.await
})
.await;
let fee_recipient = duties_service
@@ -744,20 +740,15 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_ref = &subscriptions;
let subscription_result = duties_service
.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::SUBSCRIPTIONS_HTTP_POST],
);
beacon_node
.post_validator_beacon_committee_subscriptions(subscriptions_ref)
.await
},
)
.request(ApiTopic::Subscriptions, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::SUBSCRIPTIONS_HTTP_POST],
);
beacon_node
.post_validator_beacon_committee_subscriptions(subscriptions_ref)
.await
})
.await;
if subscription_result.as_ref().is_ok() {
debug!(
@@ -769,7 +760,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
subscription_slots.record_successful_subscription_at(current_slot);
}
} else if let Err(e) = subscription_result {
if e.num_errors() < duties_service.beacon_nodes.num_total() {
if e.num_errors() < duties_service.beacon_nodes.num_total().await {
warn!(
log,
"Some subscriptions failed";
@@ -1037,19 +1028,15 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
duties_service
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, validator_indices)
.await
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, validator_indices)
.await
})
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}
@@ -1273,19 +1260,15 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
if !local_pubkeys.is_empty() {
let download_result = duties_service
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
})
.await;
match download_result {

View File

@@ -1,4 +1,3 @@
use crate::beacon_node_fallback::{OfflineOnFailure, RequireSynced};
use crate::{
doppelganger_service::DoppelgangerStatus,
duties_service::{DutiesService, Error},
@@ -442,19 +441,15 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
let duties_response = duties_service
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_DUTIES_SYNC_HTTP_POST],
);
beacon_node
.post_validator_duties_sync(period_start_epoch, local_indices)
.await
},
)
.first_success(|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_DUTIES_SYNC_HTTP_POST],
);
beacon_node
.post_validator_duties_sync(period_start_epoch, local_indices)
.await
})
.await;
let duties = match duties_response {

View File

@@ -8,10 +8,11 @@ mod tests;
pub mod test_utils;
use crate::beacon_node_fallback::CandidateInfo;
use crate::http_api::graffiti::{delete_graffiti, get_graffiti, set_graffiti};
use crate::http_api::create_signed_voluntary_exit::create_signed_voluntary_exit;
use crate::{determine_graffiti, GraffitiFile, ValidatorStore};
use crate::{determine_graffiti, BlockService, GraffitiFile, ValidatorStore};
use account_utils::{
mnemonic_from_phrase,
validator_definitions::{SigningDefinition, ValidatorDefinition, Web3SignerDefinition},
@@ -72,6 +73,7 @@ impl From<String> for Error {
pub struct Context<T: SlotClock, E: EthSpec> {
pub task_executor: TaskExecutor,
pub api_secret: ApiSecret,
pub block_service: Option<BlockService<T, E>>,
pub validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub validator_dir: Option<PathBuf>,
pub secrets_dir: Option<PathBuf>,
@@ -169,6 +171,17 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
}
};
// Filter that hands each request its own clone of the optional block service held by `ctx`.
let inner_block_service = ctx.block_service.clone();
let block_service_filter = warp::any()
.map(move || inner_block_service.clone())
.and_then(|block_service: Option<_>| async move {
// A context created without a block service (`None`) rejects the request with a
// custom not-found rejection instead of reaching the handler.
block_service.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"block service is not initialized.".to_string(),
)
})
});
let inner_validator_store = ctx.validator_store.clone();
let validator_store_filter = warp::any()
.map(move || inner_validator_store.clone())
@@ -398,6 +411,40 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
},
);
// GET lighthouse/ui/fallback_health
//
// Responds with a map keyed by node group: "beacon_nodes" is always present and
// "proposer_nodes" is added only when the block service has a proposer node list.
// Each value holds one `CandidateInfo` (index, endpoint, health) per candidate.
let get_lighthouse_ui_fallback_health = warp::path("lighthouse")
.and(warp::path("ui"))
.and(warp::path("fallback_health"))
.and(warp::path::end())
.and(block_service_filter.clone())
// NOTE: despite its name, `block_filter` is the `BlockService` extracted by the filter.
.then(|block_filter: BlockService<T, E>| async move {
let mut result: HashMap<String, Vec<CandidateInfo>> = HashMap::new();
let mut beacon_nodes = Vec::new();
// Walk the candidate list under its read lock, copying out each node's current
// health from the node's own lock.
for node in &*block_filter.beacon_nodes.candidates.read().await {
beacon_nodes.push(CandidateInfo {
index: node.index,
endpoint: node.beacon_node.to_string(),
health: *node.health.read().await,
});
}
result.insert("beacon_nodes".to_string(), beacon_nodes);
// Proposer nodes are optional; report them the same way when they exist.
if let Some(proposer_nodes_list) = &block_filter.proposer_nodes {
let mut proposer_nodes = Vec::new();
for node in &*proposer_nodes_list.candidates.read().await {
proposer_nodes.push(CandidateInfo {
index: node.index,
endpoint: node.beacon_node.to_string(),
health: *node.health.read().await,
});
}
result.insert("proposer_nodes".to_string(), proposer_nodes);
}
// Wrap the collected map in a `GenericResponse` and hand it to `blocking_json_task`.
blocking_json_task(move || Ok(api_types::GenericResponse::from(result))).await
});
// POST lighthouse/validators/
let post_validators = warp::path("lighthouse")
.and(warp::path("validators"))
@@ -1253,6 +1300,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.or(get_lighthouse_validators_pubkey)
.or(get_lighthouse_ui_health)
.or(get_lighthouse_ui_graffiti)
.or(get_lighthouse_ui_fallback_health)
.or(get_fee_recipient)
.or(get_gas_limit)
.or(get_graffiti)

View File

@@ -127,6 +127,7 @@ impl ApiTester {
let context = Arc::new(Context {
task_executor: test_runtime.task_executor.clone(),
api_secret,
block_service: None,
validator_dir: Some(validator_dir.path().into()),
secrets_dir: Some(secrets_dir.path().into()),
validator_store: Some(validator_store.clone()),

View File

@@ -115,6 +115,7 @@ impl ApiTester {
let context = Arc::new(Context {
task_executor: test_runtime.task_executor.clone(),
api_secret,
block_service: None,
validator_dir: Some(validator_dir.path().into()),
secrets_dir: Some(secrets_dir.path().into()),
validator_store: Some(validator_store.clone()),

View File

@@ -1,5 +1,6 @@
mod attestation_service;
mod beacon_node_fallback;
mod beacon_node_health;
mod block_service;
mod check_synced;
mod cli;
@@ -20,6 +21,7 @@ pub mod initialized_validators;
pub mod validator_store;
pub use beacon_node_fallback::ApiTopic;
pub use beacon_node_health::BeaconNodeSyncDistanceTiers;
pub use cli::cli_app;
pub use config::Config;
use initialized_validators::InitializedValidators;
@@ -29,8 +31,7 @@ use sensitive_url::SensitiveUrl;
pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use crate::beacon_node_fallback::{
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, OfflineOnFailure,
RequireSynced,
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode,
};
use crate::doppelganger_service::DoppelgangerService;
use crate::graffiti_file::GraffitiFile;
@@ -364,15 +365,21 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.collect::<Result<Vec<BeaconNodeHttpClient>, String>>()?;
let num_nodes = beacon_nodes.len();
// User order of `beacon_nodes` is preserved, so `index` corresponds to the position of
// the node in `--beacon-nodes`.
let candidates = beacon_nodes
.into_iter()
.map(CandidateBeaconNode::new)
.enumerate()
.map(|(index, node)| CandidateBeaconNode::new(node, index))
.collect();
let proposer_nodes_num = proposer_nodes.len();
// User order of `proposer_nodes` is preserved, so `index` corresponds to the position of
// the node in `--proposer-nodes`.
let proposer_candidates = proposer_nodes
.into_iter()
.map(CandidateBeaconNode::new)
.enumerate()
.map(|(index, node)| CandidateBeaconNode::new(node, index))
.collect();
// Set the count for beacon node fallbacks excluding the primary beacon node.
@@ -394,6 +401,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
let mut beacon_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
@@ -401,6 +409,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
let mut proposer_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
proposer_candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
@@ -563,6 +572,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
let ctx = Arc::new(http_api::Context {
task_executor: self.context.executor.clone(),
api_secret,
block_service: Some(self.block_service.clone()),
validator_store: Some(self.validator_store.clone()),
validator_dir: Some(self.config.validator_dir.clone()),
secrets_dir: Some(self.config.secrets_dir.clone()),
@@ -655,10 +665,10 @@ async fn init_from_beacon_node<E: EthSpec>(
proposer_nodes.update_all_candidates().await;
let num_available = beacon_nodes.num_available().await;
let num_total = beacon_nodes.num_total();
let num_total = beacon_nodes.num_total().await;
let proposer_available = proposer_nodes.num_available().await;
let proposer_total = proposer_nodes.num_total();
let proposer_total = proposer_nodes.num_total().await;
if proposer_total > 0 && proposer_available == 0 {
warn!(
@@ -704,11 +714,7 @@ async fn init_from_beacon_node<E: EthSpec>(
let genesis = loop {
match beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|node| async move { node.get_beacon_genesis().await },
)
.first_success(|node| async move { node.get_beacon_genesis().await })
.await
{
Ok(genesis) => break genesis.data,
@@ -795,11 +801,7 @@ async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
) -> Result<(), String> {
loop {
match beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move { beacon_node.get_lighthouse_staking().await },
)
.first_success(|beacon_node| async move { beacon_node.get_lighthouse_staking().await })
.await
{
Ok(is_staking) => {

View File

@@ -1,7 +1,7 @@
use crate::http_metrics;
use crate::{DutiesService, ProductionValidatorClient};
use lighthouse_metrics::set_gauge;
use slog::{error, info, Logger};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use tokio::time::{sleep, Duration};
use types::EthSpec;
@@ -39,25 +39,32 @@ async fn notify<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
log: &Logger,
) {
let num_available = duties_service.beacon_nodes.num_available().await;
let (candidate_info, num_available, num_synced) =
duties_service.beacon_nodes.get_notifier_info().await;
let num_total = candidate_info.len();
let num_synced_fallback = num_synced.saturating_sub(1);
set_gauge(
&http_metrics::metrics::AVAILABLE_BEACON_NODES_COUNT,
num_available as i64,
);
let num_synced = duties_service.beacon_nodes.num_synced().await;
set_gauge(
&http_metrics::metrics::SYNCED_BEACON_NODES_COUNT,
num_synced as i64,
);
let num_total = duties_service.beacon_nodes.num_total();
set_gauge(
&http_metrics::metrics::TOTAL_BEACON_NODES_COUNT,
num_total as i64,
);
if num_synced > 0 {
let primary = candidate_info
.first()
.map(|candidate| candidate.endpoint.as_str())
.unwrap_or("None");
info!(
log,
"Connected to beacon node(s)";
"primary" => primary,
"total" => num_total,
"available" => num_available,
"synced" => num_synced,
@@ -71,13 +78,36 @@ async fn notify<T: SlotClock + 'static, E: EthSpec>(
"synced" => num_synced,
)
}
let num_synced_fallback = duties_service.beacon_nodes.num_synced_fallback().await;
if num_synced_fallback > 0 {
set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 1);
} else {
set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 0);
}
// Emit one debug line per candidate so every configured node's state is visible in the logs.
for info in candidate_info {
// An `Ok` health value is logged as "Connected" together with the details it carries.
if let Ok(health) = info.health {
debug!(
log,
"Beacon node info";
"status" => "Connected",
"index" => info.index,
"endpoint" => info.endpoint,
"head_slot" => %health.head,
"is_optimistic" => ?health.optimistic_status,
"execution_engine_status" => ?health.execution_status,
"health_tier" => %health.health_tier,
);
} else {
// No health value is available: log only the node's identity as "Disconnected".
debug!(
log,
"Beacon node info";
"status" => "Disconnected",
"index" => info.index,
"endpoint" => info.endpoint,
);
}
}
if let Some(slot) = duties_service.slot_clock.now() {
let epoch = slot.epoch(E::slots_per_epoch());

View File

@@ -1,6 +1,5 @@
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
use crate::OfflineOnFailure;
use bls::PublicKeyBytes;
use environment::RuntimeContext;
use parking_lot::RwLock;
@@ -342,16 +341,11 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let preparation_entries = preparation_data.as_slice();
match self
.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await
},
)
.request(ApiTopic::Subscriptions, |beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await
})
.await
{
Ok(()) => debug!(
@@ -477,13 +471,9 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
for batch in signed.chunks(self.validator_registration_batch_size) {
match self
.beacon_nodes
.broadcast(
RequireSynced::No,
OfflineOnFailure::No,
|beacon_node| async move {
beacon_node.post_validator_register_validator(batch).await
},
)
.broadcast(|beacon_node| async move {
beacon_node.post_validator_register_validator(batch).await
})
.await
{
Ok(()) => info!(

View File

@@ -1,8 +1,7 @@
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use crate::{
duties_service::DutiesService,
validator_store::{Error as ValidatorStoreError, ValidatorStore},
OfflineOnFailure,
};
use environment::RuntimeContext;
use eth2::types::BlockId;
@@ -180,8 +179,6 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let response = self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
match beacon_node.get_beacon_blocks_root(BlockId::Head).await {
Ok(Some(block)) if block.execution_optimistic == Some(false) => {
@@ -299,16 +296,11 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.collect::<Vec<_>>();
self.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::SyncCommittee,
|beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
},
)
.request(ApiTopic::SyncCommittee, |beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
})
.await
.map_err(|e| {
error!(
@@ -371,21 +363,17 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let contribution = &self
.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let sync_contribution_data = SyncContributionData {
slot,
beacon_block_root,
subcommittee_index: subnet_id.into(),
};
.first_success(|beacon_node| async move {
let sync_contribution_data = SyncContributionData {
slot,
beacon_block_root,
subcommittee_index: subnet_id.into(),
};
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.await
},
)
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.await
})
.await
.map_err(|e| {
crit!(
@@ -453,15 +441,11 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Publish to the beacon node.
self.beacon_nodes
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_validator_contribution_and_proofs(signed_contributions)
.await
},
)
.first_success(|beacon_node| async move {
beacon_node
.post_validator_contribution_and_proofs(signed_contributions)
.await
})
.await
.map_err(|e| {
error!(
@@ -595,16 +579,11 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
if let Err(e) = self
.beacon_nodes
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await
},
)
.request(ApiTopic::Subscriptions, |beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await
})
.await
{
error!(