Squashed reset to unstable

This commit is contained in:
Daniel Knopik
2025-03-13 12:50:29 +01:00
committed by Daniel Knopik
parent b71b5f2231
commit f61f0b654c
416 changed files with 13195 additions and 38478 deletions

View File

@@ -10,13 +10,13 @@ path = "src/lib.rs"
[dependencies]
clap = { workspace = true }
environment = { workspace = true }
eth2 = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true }
slot_clock = { workspace = true }
strum = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

View File

@@ -8,6 +8,7 @@ use beacon_node_health::{
IsOptimistic, SyncDistanceTier,
};
use clap::ValueEnum;
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -16,11 +17,11 @@ use std::cmp::Ordering;
use std::fmt;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec::Vec;
use strum::EnumVariantNames;
use task_executor::TaskExecutor;
use tokio::{sync::RwLock, time::sleep};
use tracing::{debug, error, warn};
use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
@@ -60,16 +61,17 @@ pub struct LatencyMeasurement {
///
/// See `SLOT_LOOKAHEAD` for information about when this should run.
pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
executor: TaskExecutor,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
context: RuntimeContext<E>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
) -> Result<(), &'static str> {
let executor = context.executor;
if beacon_nodes.slot_clock.is_none() {
return Err("Cannot start fallback updater without slot clock");
}
let future = async move {
loop {
beacon_nodes.update_all_candidates::<E>().await;
beacon_nodes.update_all_candidates().await;
let sleep_time = beacon_nodes
.slot_clock
@@ -184,27 +186,29 @@ impl Serialize for CandidateInfo {
/// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used
/// for a query.
#[derive(Clone, Debug)]
pub struct CandidateBeaconNode {
pub struct CandidateBeaconNode<E> {
pub index: usize,
pub beacon_node: BeaconNodeHttpClient,
pub health: Arc<RwLock<Result<BeaconNodeHealth, CandidateError>>>,
_phantom: PhantomData<E>,
}
impl PartialEq for CandidateBeaconNode {
impl<E: EthSpec> PartialEq for CandidateBeaconNode<E> {
fn eq(&self, other: &Self) -> bool {
self.index == other.index && self.beacon_node == other.beacon_node
}
}
impl Eq for CandidateBeaconNode {}
impl<E: EthSpec> Eq for CandidateBeaconNode<E> {}
impl CandidateBeaconNode {
impl<E: EthSpec> CandidateBeaconNode<E> {
/// Instantiate a new node.
pub fn new(beacon_node: BeaconNodeHttpClient, index: usize) -> Self {
Self {
index,
beacon_node,
health: Arc::new(RwLock::new(Err(CandidateError::Uninitialized))),
_phantom: PhantomData,
}
}
@@ -213,13 +217,13 @@ impl CandidateBeaconNode {
*self.health.read().await
}
pub async fn refresh_health<E: EthSpec, T: SlotClock>(
pub async fn refresh_health<T: SlotClock>(
&self,
distance_tiers: &BeaconNodeSyncDistanceTiers,
slot_clock: Option<&T>,
spec: &ChainSpec,
) -> Result<(), CandidateError> {
if let Err(e) = self.is_compatible::<E>(spec).await {
if let Err(e) = self.is_compatible(spec).await {
*self.health.write().await = Err(e);
return Err(e);
}
@@ -283,7 +287,7 @@ impl CandidateBeaconNode {
}
/// Checks if the node has the correct specification.
async fn is_compatible<E: EthSpec>(&self, spec: &ChainSpec) -> Result<(), CandidateError> {
async fn is_compatible(&self, spec: &ChainSpec) -> Result<(), CandidateError> {
let config = self
.beacon_node
.get_config_spec::<ConfigSpec>()
@@ -353,10 +357,10 @@ impl CandidateBeaconNode {
);
} else if beacon_node_spec.fulu_fork_epoch != spec.fulu_fork_epoch {
warn!(
endpoint = %self.beacon_node,
endpoint_fulu_fork_epoc = ?beacon_node_spec.fulu_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Fulu fork epoch"
endpoint = %self.beacon_node,
endpoint_fulu_fork_epoch = ?beacon_node_spec.fulu_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Fulu fork epoch"
);
}
@@ -368,17 +372,17 @@ impl CandidateBeaconNode {
/// behaviour, where the failure of one candidate results in the next candidate receiving an
/// identical query.
#[derive(Clone, Debug)]
pub struct BeaconNodeFallback<T> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode>>>,
pub struct BeaconNodeFallback<T, E> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode<E>>>>,
distance_tiers: BeaconNodeSyncDistanceTiers,
slot_clock: Option<T>,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
}
impl<T: SlotClock> BeaconNodeFallback<T> {
impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(
candidates: Vec<CandidateBeaconNode>,
candidates: Vec<CandidateBeaconNode<E>>,
config: Config,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
@@ -460,7 +464,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
/// It is possible for a node to return an unsynced status while continuing to serve
/// low quality responses. To route around this it's best to poll all connected beacon nodes.
/// A previous implementation of this function polled only the unavailable BNs.
pub async fn update_all_candidates<E: EthSpec>(&self) {
pub async fn update_all_candidates(&self) {
// Clone the vec, so we release the read lock immediately.
// `candidate.health` is behind an Arc<RwLock>, so this would still allow us to mutate the values.
let candidates = self.candidates.read().await.clone();
@@ -468,7 +472,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
let mut nodes = Vec::with_capacity(candidates.len());
for candidate in candidates.iter() {
futures.push(candidate.refresh_health::<E, T>(
futures.push(candidate.refresh_health(
&self.distance_tiers,
self.slot_clock.as_ref(),
&self.spec,
@@ -671,7 +675,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
}
/// Helper functions to allow sorting candidate nodes by health.
async fn sort_nodes_by_health(nodes: &mut Vec<CandidateBeaconNode>) {
async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>>) {
// Fetch all health values.
let health_results: Vec<Result<BeaconNodeHealth, CandidateError>> =
future::join_all(nodes.iter().map(|node| node.health())).await;
@@ -689,7 +693,7 @@ async fn sort_nodes_by_health(nodes: &mut Vec<CandidateBeaconNode>) {
});
// Reorder candidates based on the sorted indices.
let sorted_nodes: Vec<CandidateBeaconNode> = indices_with_health
let sorted_nodes: Vec<CandidateBeaconNode<E>> = indices_with_health
.into_iter()
.map(|(index, _)| nodes[index].clone())
.collect();
@@ -748,7 +752,7 @@ mod tests {
let optimistic_status = IsOptimistic::No;
let execution_status = ExecutionEngineHealth::Healthy;
fn new_candidate(index: usize) -> CandidateBeaconNode {
fn new_candidate(index: usize) -> CandidateBeaconNode<E> {
let beacon_node = BeaconNodeHttpClient::new(
SensitiveUrl::parse(&format!("http://example_{index}.com")).unwrap(),
Timeouts::set_all(Duration::from_secs(index as u64)),
@@ -855,21 +859,21 @@ mod tests {
async fn new_mock_beacon_node(
index: usize,
spec: &ChainSpec,
) -> (MockBeaconNode<E>, CandidateBeaconNode) {
let mut mock_beacon_node = MockBeaconNode::new().await;
) -> (MockBeaconNode<E>, CandidateBeaconNode<E>) {
let mut mock_beacon_node = MockBeaconNode::<E>::new().await;
mock_beacon_node.mock_config_spec(spec);
let beacon_node =
CandidateBeaconNode::new(mock_beacon_node.beacon_api_client.clone(), index);
CandidateBeaconNode::<E>::new(mock_beacon_node.beacon_api_client.clone(), index);
(mock_beacon_node, beacon_node)
}
fn create_beacon_node_fallback(
candidates: Vec<CandidateBeaconNode>,
candidates: Vec<CandidateBeaconNode<E>>,
topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
) -> BeaconNodeFallback<TestingSlotClock> {
) -> BeaconNodeFallback<TestingSlotClock, E> {
let mut beacon_node_fallback =
BeaconNodeFallback::new(candidates, Config::default(), topics, spec);
@@ -925,7 +929,7 @@ mod tests {
sync_distance: Slot::new(0),
});
beacon_node_fallback.update_all_candidates::<E>().await;
beacon_node_fallback.update_all_candidates().await;
let candidates = beacon_node_fallback.candidates.read().await;
assert_eq!(