mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-10 09:37:38 +00:00
Refactor payload_attestation_service and add payload attestation test to validator client (#9357)
Co-Authored-By: Tan Chee Keong <tanck@sigmaprime.io>
This commit is contained in:
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -9876,24 +9876,32 @@ dependencies = [
|
||||
name = "validator_services"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"account_utils",
|
||||
"beacon_node_fallback",
|
||||
"bls",
|
||||
"either",
|
||||
"eth2",
|
||||
"eth2_keystore",
|
||||
"futures",
|
||||
"graffiti_file",
|
||||
"initialized_validators",
|
||||
"lighthouse_validator_store",
|
||||
"logging",
|
||||
"mockito",
|
||||
"parking_lot",
|
||||
"reqwest",
|
||||
"safe_arith",
|
||||
"slashing_protection",
|
||||
"slot_clock",
|
||||
"task_executor",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tree_hash",
|
||||
"types",
|
||||
"validator_metrics",
|
||||
"validator_store",
|
||||
"validator_test_rig",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -9912,6 +9920,7 @@ name = "validator_test_rig"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"eth2",
|
||||
"ethereum_ssz",
|
||||
"mockito",
|
||||
"regex",
|
||||
"reqwest",
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
eth2 = { workspace = true }
|
||||
ethereum_ssz = { workspace = true }
|
||||
mockito = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
|
||||
@@ -4,18 +4,23 @@ use mockito::{Matcher, Mock, Server, ServerGuard};
|
||||
use regex::Regex;
|
||||
use reqwest::StatusCode;
|
||||
use sensitive_url::SensitiveUrl;
|
||||
use ssz::Decode;
|
||||
use std::marker::PhantomData;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
use types::{ChainSpec, ConfigAndPreset, EthSpec, SignedBlindedBeaconBlock};
|
||||
use types::{
|
||||
ChainSpec, ConfigAndPreset, EthSpec, ForkName, PayloadAttestationData,
|
||||
PayloadAttestationMessage, SignedBlindedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
pub struct MockBeaconNode<E: EthSpec> {
|
||||
server: ServerGuard,
|
||||
pub beacon_api_client: BeaconNodeHttpClient,
|
||||
_phantom: PhantomData<E>,
|
||||
pub received_blocks: Arc<Mutex<Vec<SignedBlindedBeaconBlock<E>>>>,
|
||||
pub payload_attestation_message: Arc<Mutex<Vec<PayloadAttestationMessage>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> MockBeaconNode<E> {
|
||||
@@ -31,6 +36,7 @@ impl<E: EthSpec> MockBeaconNode<E> {
|
||||
beacon_api_client,
|
||||
_phantom: PhantomData,
|
||||
received_blocks: Arc::new(Mutex::new(Vec::new())),
|
||||
payload_attestation_message: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,4 +130,112 @@ impl<E: EthSpec> MockBeaconNode<E> {
|
||||
)
|
||||
.create()
|
||||
}
|
||||
|
||||
/// Mocks `GET /eth/v1/validator/payload_attestations_data/{slot}`
|
||||
pub fn mock_get_validator_payload_attestation_data(
|
||||
&mut self,
|
||||
data: &PayloadAttestationData,
|
||||
fork_name: ForkName,
|
||||
slot: Slot,
|
||||
) -> Mock {
|
||||
let path_pattern = Regex::new(&format!(
|
||||
r"^/eth/v1/validator/payload_attestation_data/{}$",
|
||||
slot.as_u64()
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let body = serde_json::json!({
|
||||
"version": fork_name.to_string(),
|
||||
"data": data,
|
||||
});
|
||||
|
||||
self.server
|
||||
.mock("GET", Matcher::Regex(path_pattern.to_string()))
|
||||
.with_status(200)
|
||||
.with_header("content-type", "application/json")
|
||||
.with_body(serde_json::to_string(&body).unwrap())
|
||||
.create()
|
||||
}
|
||||
|
||||
/// Mocks `GET /eth/v1/validator/payload_attestation_data/{slot}` returning error
|
||||
pub fn mock_get_validator_payload_attestation_data_error(&mut self, slot: Slot) -> Mock {
|
||||
let path_pattern = Regex::new(&format!(
|
||||
r"^/eth/v1/validator/payload_attestation_data/{}$",
|
||||
slot.as_u64()
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
self.server
|
||||
.mock("GET", Matcher::Regex(path_pattern.to_string()))
|
||||
.with_status(500)
|
||||
.with_header("content-type", "application/json")
|
||||
.with_body(r#"{"message":"Internal server error"}"#)
|
||||
.create()
|
||||
}
|
||||
|
||||
/// Mocks `POST /eth/v1/beacon/pool/payload_attestations`
|
||||
pub fn mock_post_beacon_pool_payload_attestations(&mut self) -> Mock {
|
||||
let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap();
|
||||
let payload_attestation_message = Arc::clone(&self.payload_attestation_message);
|
||||
|
||||
self.server
|
||||
.mock("POST", Matcher::Regex(path_pattern.to_string()))
|
||||
.match_header("content-type", "application/json")
|
||||
.with_status(200)
|
||||
.with_body_from_request(move |request| {
|
||||
let body = request.body().expect("Failed to get request body");
|
||||
let message: Vec<PayloadAttestationMessage> = serde_json::from_slice(body)
|
||||
.expect("Failed to deserialize payload attestations");
|
||||
payload_attestation_message.lock().unwrap().extend(message);
|
||||
vec![]
|
||||
})
|
||||
.create()
|
||||
}
|
||||
|
||||
/// Mocks `POST /eth/v1/beacon/pool/payload_attestations` (SSZ) with an optional `delay`.
|
||||
pub fn mock_post_beacon_pool_payload_attestations_ssz(&mut self, delay: Duration) -> Mock {
|
||||
let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap();
|
||||
let url = self.server.url();
|
||||
|
||||
let payload_attestation_message = Arc::clone(&self.payload_attestation_message);
|
||||
|
||||
self.server
|
||||
.mock("POST", Matcher::Regex(path_pattern.to_string()))
|
||||
.match_header("content-type", "application/octet-stream")
|
||||
.with_status(200)
|
||||
.with_body_from_request(move |request| {
|
||||
info!(
|
||||
"Received payload attestation SSZ on server {} with delay {} ms",
|
||||
url,
|
||||
delay.as_secs(),
|
||||
);
|
||||
let body = request.body().expect("Failed to get request body");
|
||||
|
||||
let chunk_size = <PayloadAttestationMessage>::ssz_fixed_len();
|
||||
let messages: Vec<PayloadAttestationMessage> = body
|
||||
.chunks(chunk_size)
|
||||
.map(|chunk| {
|
||||
PayloadAttestationMessage::from_ssz_bytes(chunk)
|
||||
.expect("Failed to deserialize PayloadAttestationMessage from SSZ")
|
||||
})
|
||||
.collect();
|
||||
|
||||
payload_attestation_message.lock().unwrap().extend(messages);
|
||||
std::thread::sleep(delay);
|
||||
vec![]
|
||||
})
|
||||
.create()
|
||||
}
|
||||
|
||||
/// Mocks `POST /eth/v1/beacon/pool/payload_attestations` (SSZ) returning error
|
||||
pub fn mock_post_beacon_pool_payload_attestations_ssz_error(&mut self) -> Mock {
|
||||
let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap();
|
||||
|
||||
self.server
|
||||
.mock("POST", Matcher::Regex(path_pattern.to_string()))
|
||||
.match_header("content-type", "application/octet-stream")
|
||||
.with_status(500)
|
||||
.with_body(r#"{"message":"Internal server error"}"#)
|
||||
.create()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,3 +23,20 @@ tree_hash = { workspace = true }
|
||||
types = { workspace = true }
|
||||
validator_metrics = { workspace = true }
|
||||
validator_store = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
account_utils = { workspace = true }
|
||||
eth2_keystore = { workspace = true }
|
||||
initialized_validators = { workspace = true }
|
||||
lighthouse_validator_store = { workspace = true }
|
||||
mockito = { workspace = true }
|
||||
slashing_protection = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"sync",
|
||||
"signal",
|
||||
"macros",
|
||||
"test-util",
|
||||
] }
|
||||
validator_test_rig = { workspace = true }
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info};
|
||||
use types::{ChainSpec, EthSpec};
|
||||
use types::{ChainSpec, EthSpec, Slot};
|
||||
use validator_store::ValidatorStore;
|
||||
|
||||
pub struct Inner<S, T> {
|
||||
@@ -39,7 +39,11 @@ impl<S, T> Deref for PayloadAttestationService<S, T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationService<S, T> {
|
||||
impl<S, T> PayloadAttestationService<S, T>
|
||||
where
|
||||
S: ValidatorStore + 'static,
|
||||
T: SlotClock + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
duties_service: Arc<DutiesService<S, T>>,
|
||||
validator_store: Arc<S>,
|
||||
@@ -61,11 +65,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
|
||||
}
|
||||
|
||||
pub fn start_update_service(self) -> Result<(), String> {
|
||||
let slot_duration = self.chain_spec.get_slot_duration();
|
||||
let payload_attestation_due = self.chain_spec.get_payload_attestation_due();
|
||||
|
||||
info!(
|
||||
payload_attestation_due_ms = payload_attestation_due.as_millis(),
|
||||
payload_attestation_due_ms = self.chain_spec.get_payload_attestation_due().as_millis(),
|
||||
"Payload attestation service started"
|
||||
);
|
||||
|
||||
@@ -73,46 +74,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
|
||||
|
||||
let interval_fut = async move {
|
||||
loop {
|
||||
let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else {
|
||||
error!("Failed to read slot clock");
|
||||
sleep(slot_duration).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(current_slot) = self.slot_clock.now() else {
|
||||
error!("Failed to read slot clock after trigger");
|
||||
continue;
|
||||
};
|
||||
|
||||
if !self
|
||||
.chain_spec
|
||||
.fork_name_at_slot::<S::E>(current_slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let duration_to_next_epoch = self
|
||||
.slot_clock
|
||||
.duration_to_next_epoch(S::E::slots_per_epoch())
|
||||
.unwrap_or_else(|| {
|
||||
self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32
|
||||
});
|
||||
sleep(duration_to_next_epoch).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
sleep(duration_to_next_slot + payload_attestation_due).await;
|
||||
|
||||
let Some(attestation_slot) = self.slot_clock.now() else {
|
||||
error!("Failed to read slot clock after sleep");
|
||||
continue;
|
||||
};
|
||||
|
||||
let service = self.clone();
|
||||
self.executor.spawn(
|
||||
async move {
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
},
|
||||
"payload_attestation_producer",
|
||||
);
|
||||
self.run_update().await;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -120,6 +82,60 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_update(&self) {
|
||||
let Some(attestation_slot) = self.wait_for_attestation_slot().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let service = self.clone();
|
||||
self.executor.spawn(
|
||||
async move {
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
},
|
||||
"payload_attestation_producer",
|
||||
);
|
||||
}
|
||||
|
||||
async fn wait_for_attestation_slot(&self) -> Option<Slot> {
|
||||
let slot_duration = self.chain_spec.get_slot_duration();
|
||||
let payload_attestation_due = self.chain_spec.get_payload_attestation_due();
|
||||
|
||||
let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else {
|
||||
error!("Failed to read slot clock");
|
||||
sleep(slot_duration).await;
|
||||
return None;
|
||||
};
|
||||
|
||||
let Some(current_slot) = self.slot_clock.now() else {
|
||||
error!("Failed to read slot clock after trigger");
|
||||
return None;
|
||||
};
|
||||
|
||||
if !self
|
||||
.chain_spec
|
||||
.fork_name_at_slot::<S::E>(current_slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let duration_to_next_epoch = self
|
||||
.slot_clock
|
||||
.duration_to_next_epoch(S::E::slots_per_epoch())
|
||||
.unwrap_or_else(|| {
|
||||
self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32
|
||||
});
|
||||
sleep(duration_to_next_epoch).await;
|
||||
return None;
|
||||
}
|
||||
|
||||
sleep(duration_to_next_slot + payload_attestation_due).await;
|
||||
|
||||
let Some(attestation_slot) = self.slot_clock.now() else {
|
||||
error!("Failed to read slot clock after sleep");
|
||||
return None;
|
||||
};
|
||||
|
||||
Some(attestation_slot)
|
||||
}
|
||||
|
||||
async fn produce_and_publish(&self, slot: types::Slot) {
|
||||
let duties = self.duties_service.get_ptc_duties_for_slot(slot);
|
||||
|
||||
@@ -249,3 +265,469 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::duties_service::DutiesServiceBuilder;
|
||||
use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition};
|
||||
use beacon_node_fallback::{
|
||||
BeaconNodeFallback, CandidateBeaconNode, Config as BeaconNodeConfig,
|
||||
};
|
||||
use bls::{Keypair, PublicKeyBytes};
|
||||
use eth2::types::PtcDuty;
|
||||
use eth2_keystore::KeystoreBuilder;
|
||||
use futures::FutureExt;
|
||||
use initialized_validators::InitializedValidators;
|
||||
use lighthouse_validator_store::LighthouseValidatorStore;
|
||||
use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase};
|
||||
use slot_clock::ManualSlotClock;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use tempfile::{TempDir, tempdir};
|
||||
use types::{Epoch, ForkName, Hash256, MainnetEthSpec, PayloadAttestationData, Slot};
|
||||
use validator_test_rig::mock_beacon_node::MockBeaconNode;
|
||||
|
||||
type E = MainnetEthSpec;
|
||||
type S = LighthouseValidatorStore<ManualSlotClock, E>;
|
||||
|
||||
async fn create_validator_store(
|
||||
slot_clock: ManualSlotClock,
|
||||
spec: Arc<ChainSpec>,
|
||||
executor: TaskExecutor,
|
||||
num_validators: usize,
|
||||
) -> (Arc<S>, Vec<PublicKeyBytes>, TempDir) {
|
||||
let validator_dir = tempdir().unwrap();
|
||||
let password = b"test";
|
||||
|
||||
let mut validator_definitions = Vec::with_capacity(num_validators);
|
||||
let mut pubkeys = Vec::with_capacity(num_validators);
|
||||
|
||||
for i in 0..num_validators {
|
||||
let keypair = Keypair::random();
|
||||
let keystore = KeystoreBuilder::new(&keypair, password, String::new())
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
let keystore_path = validator_dir
|
||||
.path()
|
||||
.join(format!("voting-keystore-{i}.json"));
|
||||
keystore
|
||||
.to_json_writer(std::fs::File::create(&keystore_path).unwrap())
|
||||
.unwrap();
|
||||
|
||||
let validator_definition = ValidatorDefinition::new_keystore_with_password(
|
||||
keystore_path,
|
||||
PasswordStorage::ValidatorDefinitions(
|
||||
String::from_utf8(password.to_vec()).unwrap().into(),
|
||||
),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
pubkeys.push(keypair.pk.into());
|
||||
validator_definitions.push(validator_definition);
|
||||
}
|
||||
|
||||
let initialized_validators = InitializedValidators::from_definitions(
|
||||
validator_definitions.into(),
|
||||
validator_dir.path().into(),
|
||||
Default::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME);
|
||||
let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap();
|
||||
|
||||
let validator_store = Arc::new(LighthouseValidatorStore::<_, E>::new(
|
||||
initialized_validators,
|
||||
slashing_protection,
|
||||
Hash256::ZERO,
|
||||
spec,
|
||||
None,
|
||||
slot_clock,
|
||||
&Default::default(),
|
||||
executor,
|
||||
));
|
||||
|
||||
for (i, pubkey) in pubkeys.iter().enumerate() {
|
||||
validator_store.set_validator_index(pubkey, i as u64);
|
||||
}
|
||||
|
||||
(validator_store, pubkeys, validator_dir)
|
||||
}
|
||||
|
||||
struct TestHarness {
|
||||
mock_beacon_node_1: MockBeaconNode<E>,
|
||||
mock_beacon_node_2: MockBeaconNode<E>,
|
||||
service: PayloadAttestationService<S, ManualSlotClock>,
|
||||
pubkeys: Vec<PublicKeyBytes>,
|
||||
_test_runtime: TestRuntime,
|
||||
_validator_dir: TempDir,
|
||||
}
|
||||
|
||||
impl TestHarness {
|
||||
async fn create_validators(num_validators: usize) -> Self {
|
||||
let mut default_spec = MainnetEthSpec::default_spec();
|
||||
default_spec.gloas_fork_epoch = Some(Epoch::new(0));
|
||||
let spec = Arc::new(default_spec);
|
||||
|
||||
let test_runtime = TestRuntime::default();
|
||||
let executor = test_runtime.task_executor.clone();
|
||||
let slot_duration = spec.get_slot_duration();
|
||||
let slot_clock =
|
||||
ManualSlotClock::new(Slot::new(0), Duration::from_secs(0), slot_duration);
|
||||
|
||||
let (validator_store, pubkeys, validator_dir) = create_validator_store(
|
||||
slot_clock.clone(),
|
||||
spec.clone(),
|
||||
executor.clone(),
|
||||
num_validators,
|
||||
)
|
||||
.await;
|
||||
|
||||
let mock_beacon_node_1 = MockBeaconNode::<E>::new().await;
|
||||
let mock_beacon_node_2 = MockBeaconNode::<E>::new().await;
|
||||
|
||||
let beacon_node_1 =
|
||||
CandidateBeaconNode::new(mock_beacon_node_1.beacon_api_client.clone(), 0);
|
||||
let beacon_node_2 =
|
||||
CandidateBeaconNode::new(mock_beacon_node_2.beacon_api_client.clone(), 1);
|
||||
|
||||
let beacon_node_fallback = Arc::new(BeaconNodeFallback::new(
|
||||
vec![beacon_node_1, beacon_node_2],
|
||||
BeaconNodeConfig::default(),
|
||||
vec![],
|
||||
spec.clone(),
|
||||
));
|
||||
|
||||
let duties_service = Arc::new(
|
||||
DutiesServiceBuilder::new()
|
||||
.validator_store(validator_store.clone())
|
||||
.slot_clock(slot_clock.clone())
|
||||
.beacon_nodes(beacon_node_fallback.clone())
|
||||
.executor(executor.clone())
|
||||
.spec(spec.clone())
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let service = PayloadAttestationService::new(
|
||||
duties_service,
|
||||
validator_store,
|
||||
slot_clock,
|
||||
beacon_node_fallback,
|
||||
executor,
|
||||
spec,
|
||||
);
|
||||
|
||||
Self {
|
||||
mock_beacon_node_1,
|
||||
mock_beacon_node_2,
|
||||
service,
|
||||
pubkeys,
|
||||
_test_runtime: test_runtime,
|
||||
_validator_dir: validator_dir,
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_ptc_duties(&self, slot: Slot) {
|
||||
let duties = self
|
||||
.pubkeys
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, pubkey)| PtcDuty {
|
||||
pubkey: *pubkey,
|
||||
validator_index: i as u64,
|
||||
slot,
|
||||
})
|
||||
.collect();
|
||||
self.service
|
||||
.duties_service
|
||||
.ptc_duties
|
||||
.write()
|
||||
.insert(Epoch::new(0), (Hash256::ZERO, duties));
|
||||
}
|
||||
}
|
||||
|
||||
// advance_time so that we don't have to wait for real-time to elapse in the test
|
||||
async fn advance_time(slot_clock: &ManualSlotClock, duration: Duration) {
|
||||
slot_clock.advance_time(duration);
|
||||
tokio::time::advance(duration).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_wait_for_attestation_slot() {
|
||||
tokio::time::pause();
|
||||
|
||||
let harness = TestHarness::create_validators(1).await;
|
||||
let service = &harness.service;
|
||||
let service_wait = service.wait_for_attestation_slot();
|
||||
tokio::pin!(service_wait);
|
||||
|
||||
// This first call of .now_or_never() starts the timer and registers the sleep timer with tokio
|
||||
// It calls sleep(duration_to_next_slot + payload_attestation_due).await which registers a timer with a deadline of 21s
|
||||
assert!(service_wait.as_mut().now_or_never().is_none());
|
||||
|
||||
let duration_to_next_slot = harness.service.slot_clock.duration_to_next_slot().unwrap();
|
||||
let payload_attestation_due = harness.service.chain_spec.get_payload_attestation_due();
|
||||
let duration_to_wait = duration_to_next_slot + payload_attestation_due;
|
||||
// Advance both slot_clock and tokio::time to 21s (the sleep deadline)
|
||||
// The timer hasn't fired yet because tokio requires time to be strictly past the deadline.
|
||||
// so the following assert! should return None
|
||||
// This verifies that the function wait_for_attestation_slot waits for the correct duration before returning a slot.
|
||||
advance_time(&harness.service.slot_clock, duration_to_wait).await;
|
||||
assert!(
|
||||
service_wait.as_mut().now_or_never().is_none(),
|
||||
"Function should return None before the sleep duration has elapsed"
|
||||
);
|
||||
|
||||
// Advance time for 1 more second, the sleep should have completed and the function should return Some(attestation_slot)
|
||||
// slot_clock is now at 22s, which is slot 1
|
||||
// Removing this advance_time should cause the following assert_eq! to fail
|
||||
advance_time(&harness.service.slot_clock, Duration::from_secs(1)).await;
|
||||
assert_eq!(
|
||||
service_wait.as_mut().now_or_never().unwrap(),
|
||||
Some(Slot::new(1))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_payload_attestation_ssz() {
|
||||
let mut harness = TestHarness::create_validators(1).await;
|
||||
|
||||
let attestation_slot = Slot::new(1);
|
||||
harness.insert_ptc_duties(attestation_slot);
|
||||
|
||||
let expected_payload_attestation = PayloadAttestationData {
|
||||
beacon_block_root: Hash256::ZERO,
|
||||
slot: attestation_slot,
|
||||
payload_present: true,
|
||||
blob_data_available: true,
|
||||
};
|
||||
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.mock_get_validator_payload_attestation_data(
|
||||
&expected_payload_attestation,
|
||||
ForkName::Gloas,
|
||||
attestation_slot,
|
||||
);
|
||||
|
||||
let mock_ssz = harness
|
||||
.mock_beacon_node_1
|
||||
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
|
||||
let mock_json = harness
|
||||
.mock_beacon_node_2
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
|
||||
let messages = harness
|
||||
.mock_beacon_node_1
|
||||
.payload_attestation_message
|
||||
.lock()
|
||||
.unwrap();
|
||||
|
||||
// We create one validator with one PTC duty, so the PayloadAttestationMessage length should be 1
|
||||
assert_eq!(
|
||||
messages.len(),
|
||||
1,
|
||||
"Expected one payload attestation message"
|
||||
);
|
||||
|
||||
// First try on beacon_node_1 (mock_ssz) is successful
|
||||
// therefore mock_json is not hit at all
|
||||
mock_ssz.expect(1).assert();
|
||||
mock_json.expect(0).assert();
|
||||
|
||||
let result = &messages[0];
|
||||
assert_eq!(result.validator_index, 0);
|
||||
assert_eq!(
|
||||
result.data.beacon_block_root,
|
||||
expected_payload_attestation.beacon_block_root
|
||||
);
|
||||
assert_eq!(result.data.slot, attestation_slot);
|
||||
assert!(result.data.payload_present);
|
||||
assert!(result.data.blob_data_available);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_payload_attestation_ssz_fails_fallback_to_json() {
|
||||
let mut harness = TestHarness::create_validators(1).await;
|
||||
|
||||
let attestation_slot = Slot::new(1);
|
||||
harness.insert_ptc_duties(attestation_slot);
|
||||
|
||||
let expected_payload_attestation = PayloadAttestationData {
|
||||
beacon_block_root: Hash256::ZERO,
|
||||
slot: attestation_slot,
|
||||
payload_present: true,
|
||||
blob_data_available: true,
|
||||
};
|
||||
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.mock_get_validator_payload_attestation_data(
|
||||
&expected_payload_attestation,
|
||||
ForkName::Gloas,
|
||||
Slot::new(1),
|
||||
);
|
||||
|
||||
// mock_ssz returns 500 to simulate BN does not support SSZ, so that it fallbacks to mock_json
|
||||
let mock_ssz = harness
|
||||
.mock_beacon_node_1
|
||||
.mock_post_beacon_pool_payload_attestations_ssz_error();
|
||||
let mock_json = harness
|
||||
.mock_beacon_node_2
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
|
||||
// first_success function tries both beacon nodes for SSZ post payload attestation:
|
||||
// first pass: both fail (mock_ssz returns 500, mock_json does not support SSZ)
|
||||
// second pass: repeats the first pass
|
||||
// Therefore mock_ssz is hit twice.
|
||||
// When SSZ fails, it fallbacks to JSON and should succeed on first call on mock_json.
|
||||
mock_ssz.expect(2).assert();
|
||||
mock_json.expect(1).assert();
|
||||
|
||||
let messages = harness
|
||||
.mock_beacon_node_2
|
||||
.payload_attestation_message
|
||||
.lock()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
messages.len(),
|
||||
1,
|
||||
"Expected one payload attestation via JSON fallback"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_duties_no_publish() {
|
||||
let mut harness = TestHarness::create_validators(1).await;
|
||||
|
||||
// we do not insert any duties in this test
|
||||
let mock = harness
|
||||
.mock_beacon_node_1
|
||||
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
|
||||
|
||||
let service = harness.service;
|
||||
|
||||
// when there is no duty, produce_and_publish should return early
|
||||
// therefore, the beacon node is not called, expected to hit 0
|
||||
service.produce_and_publish(Slot::new(1)).await;
|
||||
mock.expect(0).assert();
|
||||
|
||||
assert!(
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.payload_attestation_message
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_empty(),
|
||||
"No payload attestation should be published when there are no duties"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_payload_attestation_data_error() {
|
||||
let mut harness = TestHarness::create_validators(1).await;
|
||||
|
||||
let attestation_slot = Slot::new(1);
|
||||
// We have PTC duties
|
||||
harness.insert_ptc_duties(attestation_slot);
|
||||
|
||||
// However, we simulate that both BNs have error in get_validator_payload_attestation_data
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.mock_get_validator_payload_attestation_data_error(attestation_slot);
|
||||
harness
|
||||
.mock_beacon_node_2
|
||||
.mock_get_validator_payload_attestation_data_error(attestation_slot);
|
||||
|
||||
let mock_ssz = harness
|
||||
.mock_beacon_node_1
|
||||
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
|
||||
let mock_json = harness
|
||||
.mock_beacon_node_2
|
||||
.mock_post_beacon_pool_payload_attestations();
|
||||
|
||||
let service = harness.service;
|
||||
// The produce_and_publish() should return early before reaching the POST endpoint
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
|
||||
// Both beacon nodes should not be called at all
|
||||
mock_ssz.expect(0).assert();
|
||||
mock_json.expect(0).assert();
|
||||
|
||||
// No payload attestation message published
|
||||
assert!(
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.payload_attestation_message
|
||||
.lock()
|
||||
.unwrap()
|
||||
.is_empty(),
|
||||
"No payload attestation should be published when get data fails"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_multiple_payload_attestation_messages() {
|
||||
// Create 3 validators with 1 PTC duty for each validator
|
||||
let mut harness = TestHarness::create_validators(3).await;
|
||||
|
||||
let attestation_slot = Slot::new(1);
|
||||
harness.insert_ptc_duties(attestation_slot);
|
||||
|
||||
let expected_payload_attestation = PayloadAttestationData {
|
||||
beacon_block_root: Hash256::ZERO,
|
||||
slot: attestation_slot,
|
||||
payload_present: true,
|
||||
blob_data_available: true,
|
||||
};
|
||||
|
||||
harness
|
||||
.mock_beacon_node_1
|
||||
.mock_get_validator_payload_attestation_data(
|
||||
&expected_payload_attestation,
|
||||
ForkName::Gloas,
|
||||
attestation_slot,
|
||||
);
|
||||
|
||||
let mock_ssz = harness
|
||||
.mock_beacon_node_1
|
||||
.mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0));
|
||||
|
||||
let service = harness.service;
|
||||
service.produce_and_publish(attestation_slot).await;
|
||||
|
||||
let messages = harness
|
||||
.mock_beacon_node_1
|
||||
.payload_attestation_message
|
||||
.lock()
|
||||
.unwrap();
|
||||
|
||||
// With 3 PTC duties in total, we should have 3 PayloadAttestationMessage
|
||||
assert_eq!(
|
||||
messages.len(),
|
||||
3,
|
||||
"Expected three payload attestation messages"
|
||||
);
|
||||
// mock_ssz is only hit once
|
||||
// this is to verify that a single call to the POST endpoint can publish multiple messages in one go
|
||||
mock_ssz.expect(1).assert();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user