diff --git a/Cargo.lock b/Cargo.lock index 1bfc32a7a0..c0f337a6a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9876,24 +9876,32 @@ dependencies = [ name = "validator_services" version = "0.1.0" dependencies = [ + "account_utils", "beacon_node_fallback", "bls", "either", "eth2", + "eth2_keystore", "futures", "graffiti_file", + "initialized_validators", + "lighthouse_validator_store", "logging", + "mockito", "parking_lot", "reqwest", "safe_arith", + "slashing_protection", "slot_clock", "task_executor", + "tempfile", "tokio", "tracing", "tree_hash", "types", "validator_metrics", "validator_store", + "validator_test_rig", ] [[package]] @@ -9912,6 +9920,7 @@ name = "validator_test_rig" version = "0.1.0" dependencies = [ "eth2", + "ethereum_ssz", "mockito", "regex", "reqwest", diff --git a/testing/validator_test_rig/Cargo.toml b/testing/validator_test_rig/Cargo.toml index 2057a9fdc8..dc8b2babd7 100644 --- a/testing/validator_test_rig/Cargo.toml +++ b/testing/validator_test_rig/Cargo.toml @@ -5,6 +5,7 @@ edition = { workspace = true } [dependencies] eth2 = { workspace = true } +ethereum_ssz = { workspace = true } mockito = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } diff --git a/testing/validator_test_rig/src/mock_beacon_node.rs b/testing/validator_test_rig/src/mock_beacon_node.rs index 1ecdd85f3b..4501379d25 100644 --- a/testing/validator_test_rig/src/mock_beacon_node.rs +++ b/testing/validator_test_rig/src/mock_beacon_node.rs @@ -4,18 +4,23 @@ use mockito::{Matcher, Mock, Server, ServerGuard}; use regex::Regex; use reqwest::StatusCode; use sensitive_url::SensitiveUrl; +use ssz::Decode; use std::marker::PhantomData; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::info; -use types::{ChainSpec, ConfigAndPreset, EthSpec, SignedBlindedBeaconBlock}; +use types::{ + ChainSpec, ConfigAndPreset, EthSpec, ForkName, PayloadAttestationData, + PayloadAttestationMessage, SignedBlindedBeaconBlock, Slot, +}; pub struct MockBeaconNode { server: ServerGuard, pub beacon_api_client: BeaconNodeHttpClient, _phantom: PhantomData, pub received_blocks: Arc>>>, + pub payload_attestation_message: Arc>>, } impl MockBeaconNode { @@ -31,6 +36,7 @@ impl MockBeaconNode { beacon_api_client, _phantom: PhantomData, received_blocks: Arc::new(Mutex::new(Vec::new())), + payload_attestation_message: Arc::new(Mutex::new(Vec::new())), } } @@ -124,4 +130,112 @@ impl MockBeaconNode { ) .create() } + + /// Mocks `GET /eth/v1/validator/payload_attestations_data/{slot}` + pub fn mock_get_validator_payload_attestation_data( + &mut self, + data: &PayloadAttestationData, + fork_name: ForkName, + slot: Slot, + ) -> Mock { + let path_pattern = Regex::new(&format!( + r"^/eth/v1/validator/payload_attestation_data/{}$", + slot.as_u64() + )) + .unwrap(); + + let body = serde_json::json!({ + "version": fork_name.to_string(), + "data": data, + }); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_string(&body).unwrap()) + .create() + } + + /// Mocks `GET /eth/v1/validator/payload_attestation_data/{slot}` returning error + pub fn mock_get_validator_payload_attestation_data_error(&mut self, slot: Slot) -> Mock { + let path_pattern = Regex::new(&format!( + r"^/eth/v1/validator/payload_attestation_data/{}$", + slot.as_u64() + )) + .unwrap(); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(500) + .with_header("content-type", "application/json") + .with_body(r#"{"message":"Internal server error"}"#) + .create() + } + + /// Mocks `POST /eth/v1/beacon/pool/payload_attestations` + pub fn mock_post_beacon_pool_payload_attestations(&mut self) -> Mock { + let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap(); + let payload_attestation_message = Arc::clone(&self.payload_attestation_message); + + self.server + .mock("POST", Matcher::Regex(path_pattern.to_string())) + .match_header("content-type", "application/json") + .with_status(200) + .with_body_from_request(move |request| { + let body = request.body().expect("Failed to get request body"); + let message: Vec = serde_json::from_slice(body) + .expect("Failed to deserialize payload attestations"); + payload_attestation_message.lock().unwrap().extend(message); + vec![] + }) + .create() + } + + /// Mocks `POST /eth/v1/beacon/pool/payload_attestations` (SSZ) with an optional `delay`. + pub fn mock_post_beacon_pool_payload_attestations_ssz(&mut self, delay: Duration) -> Mock { + let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap(); + let url = self.server.url(); + + let payload_attestation_message = Arc::clone(&self.payload_attestation_message); + + self.server + .mock("POST", Matcher::Regex(path_pattern.to_string())) + .match_header("content-type", "application/octet-stream") + .with_status(200) + .with_body_from_request(move |request| { + info!( + "Received payload attestation SSZ on server {} with delay {} ms", + url, + delay.as_secs(), + ); + let body = request.body().expect("Failed to get request body"); + + let chunk_size = ::ssz_fixed_len(); + let messages: Vec = body + .chunks(chunk_size) + .map(|chunk| { + PayloadAttestationMessage::from_ssz_bytes(chunk) + .expect("Failed to deserialize PayloadAttestationMessage from SSZ") + }) + .collect(); + + payload_attestation_message.lock().unwrap().extend(messages); + std::thread::sleep(delay); + vec![] + }) + .create() + } + + /// Mocks `POST /eth/v1/beacon/pool/payload_attestations` (SSZ) returning error + pub fn mock_post_beacon_pool_payload_attestations_ssz_error(&mut self) -> Mock { + let path_pattern = Regex::new(r"^/eth/v1/beacon/pool/payload_attestations$").unwrap(); + + self.server + .mock("POST", Matcher::Regex(path_pattern.to_string())) + .match_header("content-type", "application/octet-stream") + .with_status(500) + .with_body(r#"{"message":"Internal server error"}"#) + .create() + } } diff --git a/validator_client/validator_services/Cargo.toml b/validator_client/validator_services/Cargo.toml index 2582968265..f2fbf193e1 100644 --- a/validator_client/validator_services/Cargo.toml +++ b/validator_client/validator_services/Cargo.toml @@ -23,3 +23,20 @@ tree_hash = { workspace = true } types = { workspace = true } validator_metrics = { workspace = true } validator_store = { workspace = true } + +[dev-dependencies] +account_utils = { workspace = true } +eth2_keystore = { workspace = true } +initialized_validators = { workspace = true } +lighthouse_validator_store = { workspace = true } +mockito = { workspace = true } +slashing_protection = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "sync", + "signal", + "macros", + "test-util", +] } +validator_test_rig = { workspace = true } diff --git a/validator_client/validator_services/src/payload_attestation_service.rs b/validator_client/validator_services/src/payload_attestation_service.rs index f41893941f..f4cd26552a 100644 --- a/validator_client/validator_services/src/payload_attestation_service.rs +++ b/validator_client/validator_services/src/payload_attestation_service.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::sleep; use tracing::{debug, error, info}; -use types::{ChainSpec, EthSpec}; +use types::{ChainSpec, EthSpec, Slot}; use validator_store::ValidatorStore; pub struct Inner { @@ -39,7 +39,11 @@ impl Deref for PayloadAttestationService { } } -impl PayloadAttestationService { +impl PayloadAttestationService +where + S: ValidatorStore + 'static, + T: SlotClock + 'static, +{ pub fn new( duties_service: Arc>, validator_store: Arc, @@ -61,11 +65,8 @@ impl PayloadAttestationServ } pub fn start_update_service(self) -> Result<(), String> { - let slot_duration = self.chain_spec.get_slot_duration(); - let payload_attestation_due = self.chain_spec.get_payload_attestation_due(); - info!( - payload_attestation_due_ms = payload_attestation_due.as_millis(), + payload_attestation_due_ms = self.chain_spec.get_payload_attestation_due().as_millis(), "Payload attestation service started" ); @@ -73,46 +74,7 @@ impl PayloadAttestationServ let interval_fut = async move { loop { - let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { - error!("Failed to read slot clock"); - sleep(slot_duration).await; - continue; - }; - - let Some(current_slot) = self.slot_clock.now() else { - error!("Failed to read slot clock after trigger"); - continue; - }; - - if !self - .chain_spec - .fork_name_at_slot::(current_slot) - .gloas_enabled() - { - let duration_to_next_epoch = self - .slot_clock - .duration_to_next_epoch(S::E::slots_per_epoch()) - .unwrap_or_else(|| { - self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32 - }); - sleep(duration_to_next_epoch).await; - continue; - } - - sleep(duration_to_next_slot + payload_attestation_due).await; - - let Some(attestation_slot) = self.slot_clock.now() else { - error!("Failed to read slot clock after sleep"); - continue; - }; - - let service = self.clone(); - self.executor.spawn( - async move { - service.produce_and_publish(attestation_slot).await; - }, - "payload_attestation_producer", - ); + self.run_update().await; } }; @@ -120,6 +82,60 @@ impl PayloadAttestationServ Ok(()) } + async fn run_update(&self) { + let Some(attestation_slot) = self.wait_for_attestation_slot().await else { + return; + }; + + let service = self.clone(); + self.executor.spawn( + async move { + service.produce_and_publish(attestation_slot).await; + }, + "payload_attestation_producer", + ); + } + + async fn wait_for_attestation_slot(&self) -> Option { + let slot_duration = self.chain_spec.get_slot_duration(); + let payload_attestation_due = self.chain_spec.get_payload_attestation_due(); + + let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + return None; + }; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + return None; + }; + + if !self + .chain_spec + .fork_name_at_slot::(current_slot) + .gloas_enabled() + { + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(S::E::slots_per_epoch()) + .unwrap_or_else(|| { + self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32 + }); + sleep(duration_to_next_epoch).await; + return None; + } + + sleep(duration_to_next_slot + payload_attestation_due).await; + + let Some(attestation_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after sleep"); + return None; + }; + + Some(attestation_slot) + } + async fn produce_and_publish(&self, slot: types::Slot) { let duties = self.duties_service.get_ptc_duties_for_slot(slot); @@ -249,3 +265,469 @@ impl PayloadAttestationServ } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::duties_service::DutiesServiceBuilder; + use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition}; + use beacon_node_fallback::{ + BeaconNodeFallback, CandidateBeaconNode, Config as BeaconNodeConfig, + }; + use bls::{Keypair, PublicKeyBytes}; + use eth2::types::PtcDuty; + use eth2_keystore::KeystoreBuilder; + use futures::FutureExt; + use initialized_validators::InitializedValidators; + use lighthouse_validator_store::LighthouseValidatorStore; + use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase}; + use slot_clock::ManualSlotClock; + use std::sync::Arc; + use std::time::Duration; + use task_executor::test_utils::TestRuntime; + use tempfile::{TempDir, tempdir}; + use types::{Epoch, ForkName, Hash256, MainnetEthSpec, PayloadAttestationData, Slot}; + use validator_test_rig::mock_beacon_node::MockBeaconNode; + + type E = MainnetEthSpec; + type S = LighthouseValidatorStore; + + async fn create_validator_store( + slot_clock: ManualSlotClock, + spec: Arc, + executor: TaskExecutor, + num_validators: usize, + ) -> (Arc, Vec, TempDir) { + let validator_dir = tempdir().unwrap(); + let password = b"test"; + + let mut validator_definitions = Vec::with_capacity(num_validators); + let mut pubkeys = Vec::with_capacity(num_validators); + + for i in 0..num_validators { + let keypair = Keypair::random(); + let keystore = KeystoreBuilder::new(&keypair, password, String::new()) + .unwrap() + .build() + .unwrap(); + let keystore_path = validator_dir + .path() + .join(format!("voting-keystore-{i}.json")); + keystore + .to_json_writer(std::fs::File::create(&keystore_path).unwrap()) + .unwrap(); + + let validator_definition = ValidatorDefinition::new_keystore_with_password( + keystore_path, + PasswordStorage::ValidatorDefinitions( + String::from_utf8(password.to_vec()).unwrap().into(), + ), + None, + None, + None, + None, + None, + None, + ) + .unwrap(); + + pubkeys.push(keypair.pk.into()); + validator_definitions.push(validator_definition); + } + + let initialized_validators = InitializedValidators::from_definitions( + validator_definitions.into(), + validator_dir.path().into(), + Default::default(), + ) + .await + .unwrap(); + + let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME); + let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap(); + + let validator_store = Arc::new(LighthouseValidatorStore::<_, E>::new( + initialized_validators, + slashing_protection, + Hash256::ZERO, + spec, + None, + slot_clock, + &Default::default(), + executor, + )); + + for (i, pubkey) in pubkeys.iter().enumerate() { + validator_store.set_validator_index(pubkey, i as u64); + } + + (validator_store, pubkeys, validator_dir) + } + + struct TestHarness { + mock_beacon_node_1: MockBeaconNode, + mock_beacon_node_2: MockBeaconNode, + service: PayloadAttestationService, + pubkeys: Vec, + _test_runtime: TestRuntime, + _validator_dir: TempDir, + } + + impl TestHarness { + async fn create_validators(num_validators: usize) -> Self { + let mut default_spec = MainnetEthSpec::default_spec(); + default_spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(default_spec); + + let test_runtime = TestRuntime::default(); + let executor = test_runtime.task_executor.clone(); + let slot_duration = spec.get_slot_duration(); + let slot_clock = + ManualSlotClock::new(Slot::new(0), Duration::from_secs(0), slot_duration); + + let (validator_store, pubkeys, validator_dir) = create_validator_store( + slot_clock.clone(), + spec.clone(), + executor.clone(), + num_validators, + ) + .await; + + let mock_beacon_node_1 = MockBeaconNode::::new().await; + let mock_beacon_node_2 = MockBeaconNode::::new().await; + + let beacon_node_1 = + CandidateBeaconNode::new(mock_beacon_node_1.beacon_api_client.clone(), 0); + let beacon_node_2 = + CandidateBeaconNode::new(mock_beacon_node_2.beacon_api_client.clone(), 1); + + let beacon_node_fallback = Arc::new(BeaconNodeFallback::new( + vec![beacon_node_1, beacon_node_2], + BeaconNodeConfig::default(), + vec![], + spec.clone(), + )); + + let duties_service = Arc::new( + DutiesServiceBuilder::new() + .validator_store(validator_store.clone()) + .slot_clock(slot_clock.clone()) + .beacon_nodes(beacon_node_fallback.clone()) + .executor(executor.clone()) + .spec(spec.clone()) + .build() + .unwrap(), + ); + + let service = PayloadAttestationService::new( + duties_service, + validator_store, + slot_clock, + beacon_node_fallback, + executor, + spec, + ); + + Self { + mock_beacon_node_1, + mock_beacon_node_2, + service, + pubkeys, + _test_runtime: test_runtime, + _validator_dir: validator_dir, + } + } + + fn insert_ptc_duties(&self, slot: Slot) { + let duties = self + .pubkeys + .iter() + .enumerate() + .map(|(i, pubkey)| PtcDuty { + pubkey: *pubkey, + validator_index: i as u64, + slot, + }) + .collect(); + self.service + .duties_service + .ptc_duties + .write() + .insert(Epoch::new(0), (Hash256::ZERO, duties)); + } + } + + // advance_time so that we don't have to wait for real-time to elapse in the test + async fn advance_time(slot_clock: &ManualSlotClock, duration: Duration) { + slot_clock.advance_time(duration); + tokio::time::advance(duration).await; + } + + #[tokio::test] + async fn test_wait_for_attestation_slot() { + tokio::time::pause(); + + let harness = TestHarness::create_validators(1).await; + let service = &harness.service; + let service_wait = service.wait_for_attestation_slot(); + tokio::pin!(service_wait); + + // This first call of .now_or_never() starts the timer and registers the sleep timer with tokio + // It calls sleep(duration_to_next_slot + payload_attestation_due).await which registers a timer with a deadline of 21s + assert!(service_wait.as_mut().now_or_never().is_none()); + + let duration_to_next_slot = harness.service.slot_clock.duration_to_next_slot().unwrap(); + let payload_attestation_due = harness.service.chain_spec.get_payload_attestation_due(); + let duration_to_wait = duration_to_next_slot + payload_attestation_due; + // Advance both slot_clock and tokio::time to 21s (the sleep deadline) + // The timer hasn't fired yet because tokio requires time to be strictly past the deadline. + // so the following assert! should return None + // This verifies that the function wait_for_attestation_slot waits for the correct duration before returning a slot. + advance_time(&harness.service.slot_clock, duration_to_wait).await; + assert!( + service_wait.as_mut().now_or_never().is_none(), + "Function should return None before the sleep duration has elapsed" + ); + + // Advance time for 1 more second, the sleep should have completed and the function should return Some(attestation_slot) + // slot_clock is now at 22s, which is slot 1 + // Removing this advance_time should cause the following assert_eq! to fail + advance_time(&harness.service.slot_clock, Duration::from_secs(1)).await; + assert_eq!( + service_wait.as_mut().now_or_never().unwrap(), + Some(Slot::new(1)) + ); + } + + #[tokio::test] + async fn publish_payload_attestation_ssz() { + let mut harness = TestHarness::create_validators(1).await; + + let attestation_slot = Slot::new(1); + harness.insert_ptc_duties(attestation_slot); + + let expected_payload_attestation = PayloadAttestationData { + beacon_block_root: Hash256::ZERO, + slot: attestation_slot, + payload_present: true, + blob_data_available: true, + }; + + harness + .mock_beacon_node_1 + .mock_get_validator_payload_attestation_data( + &expected_payload_attestation, + ForkName::Gloas, + attestation_slot, + ); + + let mock_ssz = harness + .mock_beacon_node_1 + .mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0)); + let mock_json = harness + .mock_beacon_node_2 + .mock_post_beacon_pool_payload_attestations(); + + let service = harness.service; + service.produce_and_publish(attestation_slot).await; + + let messages = harness + .mock_beacon_node_1 + .payload_attestation_message + .lock() + .unwrap(); + + // We create one validator with one PTC duty, so the PayloadAttestationMessage length should be 1 + assert_eq!( + messages.len(), + 1, + "Expected one payload attestation message" + ); + + // First try on beacon_node_1 (mock_ssz) is successful + // therefore mock_json is not hit at all + mock_ssz.expect(1).assert(); + mock_json.expect(0).assert(); + + let result = &messages[0]; + assert_eq!(result.validator_index, 0); + assert_eq!( + result.data.beacon_block_root, + expected_payload_attestation.beacon_block_root + ); + assert_eq!(result.data.slot, attestation_slot); + assert!(result.data.payload_present); + assert!(result.data.blob_data_available); + } + + #[tokio::test] + async fn publish_payload_attestation_ssz_fails_fallback_to_json() { + let mut harness = TestHarness::create_validators(1).await; + + let attestation_slot = Slot::new(1); + harness.insert_ptc_duties(attestation_slot); + + let expected_payload_attestation = PayloadAttestationData { + beacon_block_root: Hash256::ZERO, + slot: attestation_slot, + payload_present: true, + blob_data_available: true, + }; + + harness + .mock_beacon_node_1 + .mock_get_validator_payload_attestation_data( + &expected_payload_attestation, + ForkName::Gloas, + Slot::new(1), + ); + + // mock_ssz returns 500 to simulate BN does not support SSZ, so that it fallbacks to mock_json + let mock_ssz = harness + .mock_beacon_node_1 + .mock_post_beacon_pool_payload_attestations_ssz_error(); + let mock_json = harness + .mock_beacon_node_2 + .mock_post_beacon_pool_payload_attestations(); + + let service = harness.service; + service.produce_and_publish(attestation_slot).await; + + // first_success function tries both beacon nodes for SSZ post payload attestation: + // first pass: both fail (mock_ssz returns 500, mock_json does not support SSZ) + // second pass: repeats the first pass + // Therefore mock_ssz is hit twice. + // When SSZ fails, it fallbacks to JSON and should succeed on first call on mock_json. + mock_ssz.expect(2).assert(); + mock_json.expect(1).assert(); + + let messages = harness + .mock_beacon_node_2 + .payload_attestation_message + .lock() + .unwrap(); + + assert_eq!( + messages.len(), + 1, + "Expected one payload attestation via JSON fallback" + ); + } + + #[tokio::test] + async fn no_duties_no_publish() { + let mut harness = TestHarness::create_validators(1).await; + + // we do not insert any duties in this test + let mock = harness + .mock_beacon_node_1 + .mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0)); + + let service = harness.service; + + // when there is no duty, produce_and_publish should return early + // therefore, the beacon node is not called, expected to hit 0 + service.produce_and_publish(Slot::new(1)).await; + mock.expect(0).assert(); + + assert!( + harness + .mock_beacon_node_1 + .payload_attestation_message + .lock() + .unwrap() + .is_empty(), + "No payload attestation should be published when there are no duties" + ); + } + + #[tokio::test] + async fn test_get_payload_attestation_data_error() { + let mut harness = TestHarness::create_validators(1).await; + + let attestation_slot = Slot::new(1); + // We have PTC duties + harness.insert_ptc_duties(attestation_slot); + + // However, we simulate that both BNs have error in get_validator_payload_attestation_data + harness + .mock_beacon_node_1 + .mock_get_validator_payload_attestation_data_error(attestation_slot); + harness + .mock_beacon_node_2 + .mock_get_validator_payload_attestation_data_error(attestation_slot); + + let mock_ssz = harness + .mock_beacon_node_1 + .mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0)); + let mock_json = harness + .mock_beacon_node_2 + .mock_post_beacon_pool_payload_attestations(); + + let service = harness.service; + // The produce_and_publish() should return early before reaching the POST endpoint + service.produce_and_publish(attestation_slot).await; + + // Both beacon nodes should not be called at all + mock_ssz.expect(0).assert(); + mock_json.expect(0).assert(); + + // No payload attestation message published + assert!( + harness + .mock_beacon_node_1 + .payload_attestation_message + .lock() + .unwrap() + .is_empty(), + "No payload attestation should be published when get data fails" + ); + } + + #[tokio::test] + async fn publish_multiple_payload_attestation_messages() { + // Create 3 validators with 1 PTC duty for each validator + let mut harness = TestHarness::create_validators(3).await; + + let attestation_slot = Slot::new(1); + harness.insert_ptc_duties(attestation_slot); + + let expected_payload_attestation = PayloadAttestationData { + beacon_block_root: Hash256::ZERO, + slot: attestation_slot, + payload_present: true, + blob_data_available: true, + }; + + harness + .mock_beacon_node_1 + .mock_get_validator_payload_attestation_data( + &expected_payload_attestation, + ForkName::Gloas, + attestation_slot, + ); + + let mock_ssz = harness + .mock_beacon_node_1 + .mock_post_beacon_pool_payload_attestations_ssz(Duration::from_secs(0)); + + let service = harness.service; + service.produce_and_publish(attestation_slot).await; + + let messages = harness + .mock_beacon_node_1 + .payload_attestation_message + .lock() + .unwrap(); + + // With 3 PTC duties in total, we should have 3 PayloadAttestationMessage + assert_eq!( + messages.len(), + 3, + "Expected three payload attestation messages" + ); + // mock_ssz is only hit once + // this is to verify that a single call to the POST endpoint can publish multiple messages in one go + mock_ssz.expect(1).assert(); + } +}