Add test to beacon node fallback feature (#6568)

This commit is contained in:
chonghe
2025-02-04 14:43:37 +08:00
committed by GitHub
parent 56f201a257
commit 3d06bc26d1
9 changed files with 405 additions and 13 deletions

67
Cargo.lock generated
View File

@@ -543,6 +543,16 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "155a5a185e42c6b77ac7b88a15143d930a9e9727a5b7b77eed417404ab15c247"
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-channel"
version = "1.9.0"
@@ -889,6 +899,7 @@ dependencies = [
"eth2",
"futures",
"itertools 0.10.5",
"logging",
"serde",
"slog",
"slot_clock",
@@ -896,6 +907,7 @@ dependencies = [
"tokio",
"types",
"validator_metrics",
"validator_test_rig",
]
[[package]]
@@ -1485,6 +1497,16 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "compare_fields"
version = "0.2.0"
@@ -5794,6 +5816,30 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0"
[[package]]
name = "mockito"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "652cd6d169a36eaf9d1e6bce1a221130439a966d7f27858af66a33a66e9c4ee2"
dependencies = [
"assert-json-diff",
"bytes",
"colored",
"futures-util",
"http 1.2.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"log",
"rand 0.8.5",
"regex",
"serde_json",
"serde_urlencoded",
"similar",
"tokio",
]
[[package]]
name = "moka"
version = "0.12.10"
@@ -8175,6 +8221,12 @@ dependencies = [
"validator_metrics",
]
[[package]]
name = "similar"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e"
[[package]]
name = "simple_asn1"
version = "0.6.3"
@@ -9090,6 +9142,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -9868,6 +9921,20 @@ dependencies = [
"validator_metrics",
]
[[package]]
name = "validator_test_rig"
version = "0.1.0"
dependencies = [
"eth2",
"logging",
"mockito",
"regex",
"sensitive_url",
"serde_json",
"slog",
"types",
]
[[package]]
name = "valuable"
version = "0.1.1"

View File

@@ -86,8 +86,10 @@ members = [
"testing/simulator",
"testing/state_transition_vectors",
"testing/test-test_logger",
"testing/validator_test_rig",
"testing/web3signer_tests",
"validator_client",
"validator_client/beacon_node_fallback",
"validator_client/doppelganger_service",
@@ -155,6 +157,7 @@ log = "0.4"
lru = "0.12"
maplit = "1"
milhouse = "0.3"
mockito = "1.5.0"
num_cpus = "1"
parking_lot = "0.12"
paste = "1"
@@ -261,6 +264,7 @@ malloc_utils = { path = "common/malloc_utils" }
merkle_proof = { path = "consensus/merkle_proof" }
monitoring_api = { path = "common/monitoring_api" }
network = { path = "beacon_node/network" }
node_test_rig = { path = "testing/node_test_rig" }
operation_pool = { path = "beacon_node/operation_pool" }
pretty_reqwest_error = { path = "common/pretty_reqwest_error" }
proto_array = { path = "consensus/proto_array" }
@@ -283,6 +287,7 @@ validator_http_api = { path = "validator_client/http_api" }
validator_http_metrics = { path = "validator_client/http_metrics" }
validator_metrics = { path = "validator_client/validator_metrics" }
validator_store = { path = "validator_client/validator_store" }
validator_test_rig = { path = "testing/validator_test_rig" }
warp_utils = { path = "common/warp_utils" }
xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "50d63cdf1878e5cf3538e9aae5eed34a22c64e4a" }
zstd = "0.13"

View File

@@ -0,0 +1,14 @@
[package]
name = "validator_test_rig"
version = "0.1.0"
edition = { workspace = true }
[dependencies]
eth2 = { workspace = true }
logging = { workspace = true }
mockito = { workspace = true }
regex = { workspace = true }
sensitive_url = { workspace = true }
serde_json = { workspace = true }
slog = { workspace = true }
types = { workspace = true }

View File

@@ -0,0 +1 @@
pub mod mock_beacon_node;

View File

@@ -0,0 +1,132 @@
use eth2::types::{GenericResponse, SyncingData};
use eth2::{BeaconNodeHttpClient, StatusCode, Timeouts};
use logging::test_logger;
use mockito::{Matcher, Mock, Server, ServerGuard};
use regex::Regex;
use sensitive_url::SensitiveUrl;
use slog::{info, Logger};
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use types::{ChainSpec, ConfigAndPreset, EthSpec, SignedBlindedBeaconBlock};
pub struct MockBeaconNode<E: EthSpec> {
server: ServerGuard,
pub beacon_api_client: BeaconNodeHttpClient,
log: Logger,
_phantom: PhantomData<E>,
pub received_blocks: Arc<Mutex<Vec<SignedBlindedBeaconBlock<E>>>>,
}
impl<E: EthSpec> MockBeaconNode<E> {
pub async fn new() -> Self {
// mock server logging
let server = Server::new_async().await;
let beacon_api_client = BeaconNodeHttpClient::new(
SensitiveUrl::from_str(&server.url()).unwrap(),
Timeouts::set_all(Duration::from_secs(1)),
);
let log = test_logger();
Self {
server,
beacon_api_client,
log,
_phantom: PhantomData,
received_blocks: Arc::new(Mutex::new(Vec::new())),
}
}
/// Resets all mocks
#[allow(dead_code)]
pub fn reset_mocks(&mut self) {
self.server.reset();
}
pub fn mock_config_spec(&mut self, spec: &ChainSpec) {
let path_pattern = Regex::new(r"^/eth/v1/config/spec$").unwrap();
let config_and_preset = ConfigAndPreset::from_chain_spec::<E>(spec, None);
let data = GenericResponse::from(config_and_preset);
self.server
.mock("GET", Matcher::Regex(path_pattern.to_string()))
.with_status(200)
.with_body(serde_json::to_string(&data).unwrap())
.create();
}
pub fn mock_get_node_syncing(&mut self, response: SyncingData) {
let path_pattern = Regex::new(r"^/eth/v1/node/syncing$").unwrap();
let data = GenericResponse::from(response);
self.server
.mock("GET", Matcher::Regex(path_pattern.to_string()))
.with_status(200)
.with_body(serde_json::to_string(&data).unwrap())
.create();
}
/// Mocks the `post_beacon_blinded_blocks_v2_ssz` response with an optional `delay`.
pub fn mock_post_beacon_blinded_blocks_v2_ssz(&mut self, delay: Duration) -> Mock {
let path_pattern = Regex::new(r"^/eth/v2/beacon/blinded_blocks$").unwrap();
let log = self.log.clone();
let url = self.server.url();
let received_blocks = Arc::clone(&self.received_blocks);
self.server
.mock("POST", Matcher::Regex(path_pattern.to_string()))
.match_header("content-type", "application/octet-stream")
.with_status(200)
.with_body_from_request(move |request| {
info!(
log,
"{}",
format!(
"Received published block request on server {} with delay {} s",
url,
delay.as_secs(),
)
);
let body = request.body().expect("Failed to get request body");
let block: SignedBlindedBeaconBlock<E> =
SignedBlindedBeaconBlock::any_from_ssz_bytes(body)
.expect("Failed to deserialize body as SignedBlindedBeaconBlock");
received_blocks.lock().unwrap().push(block);
std::thread::sleep(delay);
vec![]
})
.create()
}
pub fn mock_offline_node(&mut self) -> Mock {
let path_pattern = Regex::new(r"^/eth/v1/node/version$").unwrap();
self.server
.mock("GET", Matcher::Regex(path_pattern.to_string()))
.with_status(StatusCode::INTERNAL_SERVER_ERROR.as_u16() as usize)
.with_header("content-type", "application/json")
.with_body(r#"{"message":"Internal Server Error"}"#)
.create()
}
pub fn mock_online_node(&mut self) -> Mock {
let path_pattern = Regex::new(r"^/eth/v1/node/version$").unwrap();
self.server
.mock("GET", Matcher::Regex(path_pattern.to_string()))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{
"data": {
"version": "lighthouse-mock"
}
}"#,
)
.create()
}
}

View File

@@ -8,9 +8,6 @@ edition = { workspace = true }
name = "validator_client"
path = "src/lib.rs"
[dev-dependencies]
tokio = { workspace = true }
[dependencies]
account_utils = { workspace = true }
beacon_node_fallback = { workspace = true }

View File

@@ -21,3 +21,7 @@ strum = { workspace = true }
tokio = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
[dev-dependencies]
logging = { workspace = true }
validator_test_rig = { workspace = true }

View File

@@ -752,8 +752,12 @@ mod tests {
use crate::beacon_node_health::BeaconNodeHealthTier;
use eth2::SensitiveUrl;
use eth2::Timeouts;
use logging::test_logger;
use slot_clock::TestingSlotClock;
use strum::VariantNames;
use types::{MainnetEthSpec, Slot};
use types::{BeaconBlockDeneb, MainnetEthSpec, Slot};
use types::{EmptyBlock, Signature, SignedBeaconBlockDeneb, SignedBlindedBeaconBlock};
use validator_test_rig::mock_beacon_node::MockBeaconNode;
type E = MainnetEthSpec;
@@ -772,7 +776,7 @@ mod tests {
#[tokio::test]
async fn check_candidate_order() {
// These fields is irrelvant for sorting. They are set to arbitrary values.
// These fields are irrelevant for sorting. They are set to arbitrary values.
let head = Slot::new(99);
let optimistic_status = IsOptimistic::No;
let execution_status = ExecutionEngineHealth::Healthy;
@@ -880,4 +884,172 @@ mod tests {
assert_eq!(candidates, expected_candidates);
}
async fn new_mock_beacon_node(
index: usize,
spec: &ChainSpec,
) -> (MockBeaconNode<E>, CandidateBeaconNode<E>) {
let mut mock_beacon_node = MockBeaconNode::<E>::new().await;
mock_beacon_node.mock_config_spec(spec);
let beacon_node =
CandidateBeaconNode::<E>::new(mock_beacon_node.beacon_api_client.clone(), index);
(mock_beacon_node, beacon_node)
}
fn create_beacon_node_fallback(
candidates: Vec<CandidateBeaconNode<E>>,
topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
log: Logger,
) -> BeaconNodeFallback<TestingSlotClock, E> {
let mut beacon_node_fallback =
BeaconNodeFallback::new(candidates, Config::default(), topics, spec, log);
beacon_node_fallback.set_slot_clock(TestingSlotClock::new(
Slot::new(1),
Duration::from_secs(0),
Duration::from_secs(12),
));
beacon_node_fallback
}
#[tokio::test]
async fn update_all_candidates_should_update_sync_status() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
// Put this out of order to be sorted later
vec![
beacon_node_2.clone(),
beacon_node_3.clone(),
beacon_node_1.clone(),
],
vec![],
spec.clone(),
test_logger(),
);
// BeaconNodeHealthTier 1
mock_beacon_node_1.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 3
mock_beacon_node_2.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: true,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 5
mock_beacon_node_3.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: true,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
beacon_node_fallback.update_all_candidates().await;
let candidates = beacon_node_fallback.candidates.read().await;
assert_eq!(
vec![beacon_node_1, beacon_node_2, beacon_node_3],
*candidates
);
}
#[tokio::test]
async fn broadcast_should_send_to_all_bns() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
vec![beacon_node_1, beacon_node_2],
vec![ApiTopic::Blocks],
spec.clone(),
test_logger(),
);
mock_beacon_node_1.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0));
mock_beacon_node_2.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0));
let signed_block = SignedBlindedBeaconBlock::<E>::Deneb(SignedBeaconBlockDeneb {
message: BeaconBlockDeneb::empty(&spec),
signature: Signature::empty(),
});
// trigger broadcast to `post_beacon_blinded_blocks_v2`
let result = beacon_node_fallback
.broadcast(|client| {
let signed_block_cloned = signed_block.clone();
async move {
client
.post_beacon_blinded_blocks_v2_ssz(&signed_block_cloned, None)
.await
}
})
.await;
assert!(result.is_ok());
let received_blocks_from_bn_1 = mock_beacon_node_1.received_blocks.lock().unwrap();
let received_blocks_from_bn_2 = mock_beacon_node_2.received_blocks.lock().unwrap();
assert_eq!(received_blocks_from_bn_1.len(), 1);
assert_eq!(received_blocks_from_bn_2.len(), 1);
}
#[tokio::test]
async fn first_success_should_try_nodes_in_order() {
let spec = Arc::new(MainnetEthSpec::default_spec());
let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await;
let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await;
let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await;
let beacon_node_fallback = create_beacon_node_fallback(
vec![beacon_node_1, beacon_node_2, beacon_node_3],
vec![],
spec.clone(),
test_logger(),
);
let mock1 = mock_beacon_node_1.mock_offline_node();
let mock2 = mock_beacon_node_2.mock_offline_node();
let mock3 = mock_beacon_node_3.mock_online_node();
let result_success = beacon_node_fallback
.first_success(|client| async move { client.get_node_version().await })
.await;
// mock3 expects to be called once since it is online in the first pass
mock3.expect(1).assert();
assert!(result_success.is_ok());
// make all beacon node offline and the result should error
let _mock3 = mock_beacon_node_3.mock_offline_node();
let result_failure = beacon_node_fallback
.first_success(|client| async move { client.get_node_version().await })
.await;
assert!(result_failure.is_err());
// Both mock1 and mock2 should be called 3 times:
// - the first time is for the result_success case,
// - the second time is when it calls all 3 mock beacon nodes and all fails in the first pass,
// - which gives the third call because the function gives a second pass if no candidates succeeded in the first pass
mock1.expect(3).assert();
mock2.expect(3).assert();
}
}

View File

@@ -203,15 +203,15 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config.initialized_validators.clone(),
log.clone(),
)
.await
.map_err(|e| {
match e {
UnableToOpenVotingKeystore(err) => {
format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \
.await
.map_err(|e| {
match e {
UnableToOpenVotingKeystore(err) => {
format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \
make sure to update the location of voting_keystore_path in your validator_definitions.yml", err)
},
err => {
format!("Unable to initialize validators: {:?}", err)}
},
err => {
format!("Unable to initialize validators: {:?}", err)}
}
})?;