Merge branch 'unstable' into into-anchor

This commit is contained in:
Daniel Knopik
2025-02-10 08:41:20 +01:00
207 changed files with 8646 additions and 4714 deletions

View File

@@ -9,6 +9,7 @@ name = "beacon_node_fallback"
path = "src/lib.rs"
[dependencies]
clap = { workspace = true }
eth2 = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
@@ -20,3 +21,7 @@ tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
[dev-dependencies]
logging = { workspace = true }
validator_test_rig = { workspace = true }

View File

@@ -1,10 +1,8 @@
use super::CandidateError;
use eth2::BeaconNodeHttpClient;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use tracing::warn;
use types::Slot;
@@ -53,29 +51,6 @@ impl Default for BeaconNodeSyncDistanceTiers {
}
}
impl FromStr for BeaconNodeSyncDistanceTiers {
type Err = String;
fn from_str(s: &str) -> Result<Self, String> {
let values: (u64, u64, u64) = s
.split(',')
.map(|s| {
s.parse()
.map_err(|e| format!("Invalid sync distance modifier: {e:?}"))
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.collect_tuple()
.ok_or("Invalid number of sync distance modifiers".to_string())?;
Ok(BeaconNodeSyncDistanceTiers {
synced: Slot::new(values.0),
small: Slot::new(values.0 + values.1),
medium: Slot::new(values.0 + values.1 + values.2),
})
}
}
impl BeaconNodeSyncDistanceTiers {
/// Takes a given sync distance and determines its tier based on the `sync_tolerance` defined by
/// the CLI.
@@ -90,6 +65,17 @@ impl BeaconNodeSyncDistanceTiers {
SyncDistanceTier::Large
}
}
pub fn from_vec(tiers: &[u64]) -> Result<Self, String> {
if tiers.len() != 3 {
return Err("Invalid number of sync distance modifiers".to_string());
}
Ok(BeaconNodeSyncDistanceTiers {
synced: Slot::new(tiers[0]),
small: Slot::new(tiers[0] + tiers[1]),
medium: Slot::new(tiers[0] + tiers[1] + tiers[2]),
})
}
}
/// Execution Node health metrics.
@@ -318,7 +304,6 @@ mod tests {
SyncDistanceTier,
};
use crate::Config;
use std::str::FromStr;
use types::Slot;
#[test]
@@ -421,7 +406,7 @@ mod tests {
// medium 9..=12
// large: 13..
let distance_tiers = BeaconNodeSyncDistanceTiers::from_str("4,4,4").unwrap();
let distance_tiers = BeaconNodeSyncDistanceTiers::from_vec(&[4, 4, 4]).unwrap();
let synced_low = new_distance_tier(0, &distance_tiers);
let synced_high = new_distance_tier(4, &distance_tiers);

View File

@@ -7,6 +7,7 @@ use beacon_node_health::{
check_node_health, BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth,
IsOptimistic, SyncDistanceTier,
};
use clap::ValueEnum;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -17,7 +18,8 @@ use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use std::vec::Vec;
use strum::EnumVariantNames;
use task_executor::TaskExecutor;
use tokio::{sync::RwLock, time::sleep};
use tracing::{debug, error, warn};
@@ -695,9 +697,10 @@ async fn sort_nodes_by_health(nodes: &mut Vec<CandidateBeaconNode>) {
}
/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumVariantNames, ValueEnum)]
#[strum(serialize_all = "kebab-case")]
pub enum ApiTopic {
None,
Attestations,
Blocks,
Subscriptions,
@@ -717,23 +720,30 @@ mod tests {
use crate::beacon_node_health::BeaconNodeHealthTier;
use eth2::SensitiveUrl;
use eth2::Timeouts;
use std::str::FromStr;
use slot_clock::TestingSlotClock;
use strum::VariantNames;
use types::Slot;
use types::{BeaconBlockDeneb, MainnetEthSpec, Slot};
use types::{EmptyBlock, Signature, SignedBeaconBlockDeneb, SignedBlindedBeaconBlock};
use validator_test_rig::mock_beacon_node::MockBeaconNode;
type E = MainnetEthSpec;
#[test]
fn api_topic_all() {
let all = ApiTopic::all();
assert_eq!(all.len(), ApiTopic::VARIANTS.len());
assert!(ApiTopic::VARIANTS
// ignore NONE variant
let mut variants = ApiTopic::VARIANTS.to_vec();
variants.retain(|s| *s != "none");
assert_eq!(all.len(), variants.len());
assert!(variants
.iter()
.map(|topic| ApiTopic::from_str(topic).unwrap())
.map(|topic| ApiTopic::from_str(topic, true).unwrap())
.eq(all.into_iter()));
}
#[tokio::test]
async fn check_candidate_order() {
// These fields is irrelvant for sorting. They are set to arbitrary values.
// These fields are irrelevant for sorting. They are set to arbitrary values.
let head = Slot::new(99);
let optimistic_status = IsOptimistic::No;
let execution_status = ExecutionEngineHealth::Healthy;
@@ -841,4 +851,168 @@ mod tests {
assert_eq!(candidates, expected_candidates);
}
async fn new_mock_beacon_node(
index: usize,
spec: &ChainSpec,
) -> (MockBeaconNode<E>, CandidateBeaconNode) {
let mut mock_beacon_node = MockBeaconNode::new().await;
mock_beacon_node.mock_config_spec(spec);
let beacon_node =
CandidateBeaconNode::new(mock_beacon_node.beacon_api_client.clone(), index);
(mock_beacon_node, beacon_node)
}
fn create_beacon_node_fallback(
candidates: Vec<CandidateBeaconNode>,
topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
) -> BeaconNodeFallback<TestingSlotClock> {
let mut beacon_node_fallback =
BeaconNodeFallback::new(candidates, Config::default(), topics, spec);
beacon_node_fallback.set_slot_clock(TestingSlotClock::new(
Slot::new(1),
Duration::from_secs(0),
Duration::from_secs(12),
));
beacon_node_fallback
}
#[tokio::test]
async fn update_all_candidates_should_update_sync_status() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
// Put this out of order to be sorted later
vec![
beacon_node_2.clone(),
beacon_node_3.clone(),
beacon_node_1.clone(),
],
vec![],
spec.clone(),
);
// BeaconNodeHealthTier 1
mock_beacon_node_1.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 3
mock_beacon_node_2.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: true,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 5
mock_beacon_node_3.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: true,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
beacon_node_fallback.update_all_candidates::<E>().await;
let candidates = beacon_node_fallback.candidates.read().await;
assert_eq!(
vec![beacon_node_1, beacon_node_2, beacon_node_3],
*candidates
);
}
#[tokio::test]
async fn broadcast_should_send_to_all_bns() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
vec![beacon_node_1, beacon_node_2],
vec![ApiTopic::Blocks],
spec.clone(),
);
mock_beacon_node_1.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0));
mock_beacon_node_2.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0));
let signed_block = SignedBlindedBeaconBlock::<E>::Deneb(SignedBeaconBlockDeneb {
message: BeaconBlockDeneb::empty(&spec),
signature: Signature::empty(),
});
// trigger broadcast to `post_beacon_blinded_blocks_v2`
let result = beacon_node_fallback
.broadcast(|client| {
let signed_block_cloned = signed_block.clone();
async move {
client
.post_beacon_blinded_blocks_v2_ssz(&signed_block_cloned, None)
.await
}
})
.await;
assert!(result.is_ok());
let received_blocks_from_bn_1 = mock_beacon_node_1.received_blocks.lock().unwrap();
let received_blocks_from_bn_2 = mock_beacon_node_2.received_blocks.lock().unwrap();
assert_eq!(received_blocks_from_bn_1.len(), 1);
assert_eq!(received_blocks_from_bn_2.len(), 1);
}
#[tokio::test]
async fn first_success_should_try_nodes_in_order() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
vec![beacon_node_1, beacon_node_2, beacon_node_3],
vec![],
spec.clone(),
);
let mock1 = mock_beacon_node_1.mock_offline_node();
let mock2 = mock_beacon_node_2.mock_offline_node();
let mock3 = mock_beacon_node_3.mock_online_node();
let result_success = beacon_node_fallback
.first_success(|client| async move { client.get_node_version().await })
.await;
// mock3 expects to be called once since it is online in the first pass
mock3.expect(1).assert();
assert!(result_success.is_ok());
// make all beacon node offline and the result should error
let _mock3 = mock_beacon_node_3.mock_offline_node();
let result_failure = beacon_node_fallback
.first_success(|client| async move { client.get_node_version().await })
.await;
assert!(result_failure.is_err());
// Both mock1 and mock2 should be called 3 times:
// - the first time is for the result_success case,
// - the second time is when it calls all 3 mock beacon nodes and all fails in the first pass,
// - which gives the third call because the function gives a second pass if no candidates succeeded in the first pass
mock1.expect(3).assert();
mock2.expect(3).assert();
}
}