merge conflicts

This commit is contained in:
Eitan Seri-Levi
2026-04-27 17:16:11 +02:00
6 changed files with 360 additions and 1 deletions

View File

@@ -7,6 +7,10 @@ use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use parking_lot::RwLock;
use proto_array::PayloadStatus;
use slot_clock::{SlotClock, TestingSlotClock};
<<<<<<< HEAD
=======
use state_processing::AllCaches;
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
use state_processing::genesis::genesis_block;
use store::{HotColdDB, StoreConfig};
use types::{
@@ -25,7 +29,11 @@ use crate::{
GossipVerificationContext, VerifiedPayloadAttestationMessage,
},
},
<<<<<<< HEAD
test_utils::{EphemeralHarnessType, fork_name_from_env, test_spec},
=======
test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec},
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
validator_pubkey_cache::ValidatorPubkeyCache,
};
@@ -326,3 +334,99 @@ fn duplicate_after_valid() {
Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. })
));
}
<<<<<<< HEAD
=======
/// Exercises the `partial_state_advance` fallback in gossip verification when
/// the head state is too stale to compute PTC membership (e.g., during a
/// network liveness failure with many missed slots).
#[tokio::test]
async fn stale_head_with_partial_advance() {
if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) {
return;
}
let slots_per_epoch = E::slots_per_epoch();
// Head at epoch 1, message at epoch 5 — 4 epochs of missed slots.
// This exceeds min_seed_lookahead (1), triggering the fallback path:
// get_advanced_hot_state loads the stored state, then partial_state_advance
// advances it through epoch boundaries to populate ptc_window.
let head_slot = Slot::new(slots_per_epoch);
let missed_epochs = 4;
let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs));
let target_epoch = target_slot.epoch(slots_per_epoch);
// GIVEN a chain with blocks through epoch 1 (so the store has states).
let harness = BeaconChainHarness::builder(E::default())
.default_spec()
.deterministic_keypairs(64)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
harness.extend_to_slot(head_slot).await;
let head = harness.chain.canonical_head.cached_head();
let head_epoch = head.snapshot.beacon_state.current_epoch();
assert!(
target_epoch > head_epoch + harness.spec.min_seed_lookahead,
"precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback"
);
// GIVEN a slot clock advanced to epoch 5 without producing blocks
// (simulating missed slots during a liveness failure).
harness.chain.slot_clock.set_slot(target_slot.as_u64());
// Advance a reference state to compute the PTC at the target slot.
let mut reference_state = head.snapshot.beacon_state.clone();
state_processing::state_advance::partial_state_advance(
&mut reference_state,
Some(head.snapshot.beacon_state_root()),
target_slot,
&harness.spec,
)
.expect("should advance reference state");
reference_state
.build_all_caches(&harness.spec)
.expect("should build caches");
let ptc = reference_state
.get_ptc(target_slot, &harness.spec)
.expect("should get PTC from reference state");
let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64;
// WHEN a properly-signed payload attestation from a PTC member is verified.
let domain = harness.spec.get_domain(
target_epoch,
Domain::PTCAttester,
&reference_state.fork(),
reference_state.genesis_validators_root(),
);
let data = PayloadAttestationData {
beacon_block_root: head.head_block_root(),
slot: target_slot,
payload_present: true,
blob_data_available: true,
};
let message = data.signing_root(domain);
let signature = harness.validator_keypairs[validator_index as usize]
.sk
.sign(message);
let msg = PayloadAttestationMessage {
validator_index,
data,
signature,
};
// THEN verification succeeds despite the head being 4 epochs stale.
let result = harness
.chain
.verify_payload_attestation_message_for_gossip(msg);
assert!(
result.is_ok(),
"expected Ok (head epoch {}, message epoch {}), got: {:?}",
head_epoch,
target_epoch,
result.unwrap_err()
);
}
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12

View File

@@ -9,6 +9,7 @@ use crate::version::{
};
use crate::{sync_committees, utils};
use beacon_chain::observed_operations::ObservationOutcome;
use beacon_chain::payload_attestation_verification::Error as PayloadAttestationError;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bytes::Bytes;
use eth2::types::{AttestationPoolQuery, EndpointVersion, Failure, GenericResponse};
@@ -20,7 +21,7 @@ use ssz::{Decode, Encode};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use types::{
Attestation, AttestationData, AttesterSlashing, ForkName, PayloadAttestationMessage,
ProposerSlashing, SignedBlsToExecutionChange, SignedVoluntaryExit, SingleAttestation,
@@ -542,12 +543,20 @@ pub fn post_beacon_pool_payload_attestations<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
<<<<<<< HEAD
_chain: Arc<BeaconChain<T>>,
=======
chain: Arc<BeaconChain<T>>,
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
messages: Vec<PayloadAttestationMessage>,
_fork_name: Option<ForkName>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
<<<<<<< HEAD
publish_payload_attestation_messages(&network_tx, messages)
=======
publish_payload_attestation_messages(&chain, &network_tx, messages)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
})
},
)
@@ -573,7 +582,11 @@ pub fn post_beacon_pool_payload_attestations_ssz<T: BeaconChainTypes>(
.then(
|body_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
<<<<<<< HEAD
_chain: Arc<BeaconChain<T>>,
=======
chain: Arc<BeaconChain<T>>,
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.blocking_json_task(Priority::P0, move || {
let item_len = <PayloadAttestationMessage as Encode>::ssz_fixed_len();
@@ -595,13 +608,18 @@ pub fn post_beacon_pool_payload_attestations_ssz<T: BeaconChainTypes>(
})
.collect::<Result<_, _>>()?;
<<<<<<< HEAD
publish_payload_attestation_messages(&network_tx, messages)
=======
publish_payload_attestation_messages(&chain, &network_tx, messages)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
})
},
)
.boxed()
}
<<<<<<< HEAD
fn publish_payload_attestation_messages<E: types::EthSpec>(
network_tx: &UnboundedSender<NetworkMessage<E>>,
messages: Vec<PayloadAttestationMessage>,
@@ -614,4 +632,63 @@ fn publish_payload_attestation_messages<E: types::EthSpec>(
)?;
}
Ok(())
=======
fn publish_payload_attestation_messages<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
messages: Vec<PayloadAttestationMessage>,
) -> Result<(), warp::Rejection> {
let mut failures = vec![];
let mut num_already_known = 0;
for (index, message) in messages.into_iter().enumerate() {
match chain.verify_payload_attestation_message_for_gossip(message.clone()) {
Ok(verified) => {
utils::publish_pubsub_message(
network_tx,
PubsubMessage::PayloadAttestation(Box::new(message)),
)?;
if let Err(e) = chain.apply_payload_attestation_to_fork_choice(
verified.indexed_payload_attestation(),
verified.ptc(),
) {
warn!(
error = ?e,
request_index = index,
"Payload attestation invalid for fork choice"
);
}
}
Err(PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. }) => {
num_already_known += 1;
}
// TODO(gloas): requeue for reprocessing like attestations do.
Err(e) => {
error!(
error = ?e,
request_index = index,
"Failure verifying payload attestation for gossip"
);
failures.push(Failure::new(index, format!("{e:?}")));
}
}
}
if num_already_known > 0 {
debug!(
count = num_already_known,
"Some payload attestations already known"
);
}
if failures.is_empty() {
Ok(())
} else {
Err(warp_utils::reject::indexed_bad_request(
"error processing payload attestations".to_string(),
failures,
))
}
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
}

View File

@@ -2793,6 +2793,7 @@ impl ApiTester {
self
}
<<<<<<< HEAD
pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self {
let slot = self.chain.slot().unwrap();
let head_root = self.chain.head_beacon_block_root();
@@ -2810,6 +2811,63 @@ impl ApiTester {
self.client
.post_beacon_pool_payload_attestations(&[message])
=======
fn make_valid_payload_attestation_message(
&self,
ptc_offset: usize,
) -> PayloadAttestationMessage {
let head = self.chain.head_snapshot();
let head_slot = head.beacon_block.slot();
let head_root = head.beacon_block_root;
let fork = head.beacon_state.fork();
let genesis_validators_root = self.chain.genesis_validators_root;
let ptc = head
.beacon_state
.get_ptc(head_slot, &self.chain.spec)
.expect("should get PTC");
// Find distinct validator indices in the PTC (may contain duplicates due to
// weighted sampling with a small validator set).
let mut seen = std::collections::HashSet::new();
let distinct_indices: Vec<usize> = ptc
.0
.iter()
.copied()
.filter(|idx| seen.insert(*idx))
.collect();
let validator_index = distinct_indices[ptc_offset % distinct_indices.len()];
let data = PayloadAttestationData {
beacon_block_root: head_root,
slot: head_slot,
payload_present: true,
blob_data_available: true,
};
let epoch = head_slot.epoch(E::slots_per_epoch());
let domain =
self.chain
.spec
.get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root);
let signing_root = data.signing_root(domain);
let sk = &self.validator_keypairs()[validator_index].sk;
let signature = sk.sign(signing_root);
PayloadAttestationMessage {
validator_index: validator_index as u64,
data,
signature,
}
}
pub async fn test_post_beacon_pool_payload_attestations_valid(mut self) -> Self {
let message = self.make_valid_payload_attestation_message(0);
let fork_name = self.chain.spec.fork_name_at_slot::<E>(message.data.slot);
self.client
.post_beacon_pool_payload_attestations(&[message], fork_name)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
.await
.unwrap();
@@ -2822,6 +2880,7 @@ impl ApiTester {
}
pub async fn test_post_beacon_pool_payload_attestations_valid_ssz(mut self) -> Self {
<<<<<<< HEAD
let slot = self.chain.slot().unwrap();
let head_root = self.chain.head_beacon_block_root();
@@ -2838,6 +2897,13 @@ impl ApiTester {
self.client
.post_beacon_pool_payload_attestations_ssz(&[message])
=======
let message = self.make_valid_payload_attestation_message(1);
let fork_name = self.chain.spec.fork_name_at_slot::<E>(message.data.slot);
self.client
.post_beacon_pool_payload_attestations_ssz(&[message], fork_name)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
.await
.unwrap();

View File

@@ -1793,6 +1793,10 @@ impl BeaconNodeHttpClient {
pub async fn post_beacon_pool_payload_attestations(
&self,
messages: &[PayloadAttestationMessage],
<<<<<<< HEAD
=======
fork_name: ForkName,
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
) -> Result<(), Error> {
let mut path = self.eth_path(V1)?;
@@ -1802,7 +1806,11 @@ impl BeaconNodeHttpClient {
.push("pool")
.push("payload_attestations");
<<<<<<< HEAD
self.post_generic_with_consensus_version(path, &messages, None, ForkName::Gloas)
=======
self.post_generic_with_consensus_version(path, &messages, None, fork_name)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
.await?;
Ok(())
@@ -1812,6 +1820,10 @@ impl BeaconNodeHttpClient {
pub async fn post_beacon_pool_payload_attestations_ssz(
&self,
messages: &[PayloadAttestationMessage],
<<<<<<< HEAD
=======
fork_name: ForkName,
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
) -> Result<(), Error> {
let mut path = self.eth_path(V1)?;
@@ -1823,6 +1835,7 @@ impl BeaconNodeHttpClient {
let ssz_body: Vec<u8> = messages.iter().flat_map(|m| m.as_ssz_bytes()).collect();
<<<<<<< HEAD
self.post_generic_with_consensus_version_and_ssz_body(
path,
ssz_body,
@@ -1830,6 +1843,10 @@ impl BeaconNodeHttpClient {
ForkName::Gloas,
)
.await?;
=======
self.post_generic_with_consensus_version_and_ssz_body(path, ssz_body, None, fork_name)
.await?;
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
Ok(())
}

View File

@@ -45,7 +45,11 @@ use validator_services::{
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService, DutiesServiceBuilder},
latency_service,
<<<<<<< HEAD
payload_attestation_service::{PayloadAttestationService, PayloadAttestationServiceBuilder},
=======
payload_attestation_service::PayloadAttestationService,
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
preparation_service::{PreparationService, PreparationServiceBuilder},
sync_committee_service::SyncCommitteeService,
};
@@ -554,6 +558,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
context.executor.clone(),
);
<<<<<<< HEAD
let payload_attestation_service = PayloadAttestationServiceBuilder::new()
.duties_service(duties_service.clone())
.validator_store(validator_store.clone())
@@ -562,6 +567,16 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.build()?;
=======
let payload_attestation_service = PayloadAttestationService::new(
duties_service.clone(),
validator_store.clone(),
slot_clock.clone(),
beacon_nodes.clone(),
context.executor.clone(),
context.eth2_config.spec.clone(),
);
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
Ok(Self {
context,
@@ -641,10 +656,19 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start sync committee service: {}", e))?;
<<<<<<< HEAD
self.payload_attestation_service
.clone()
.start_update_service()
.map_err(|e| format!("Unable to start payload attestation service: {}", e))?;
=======
if self.context.eth2_config.spec.is_gloas_scheduled() {
self.payload_attestation_service
.clone()
.start_update_service()
.map_err(|e| format!("Unable to start payload attestation service: {}", e))?;
}
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
self.preparation_service
.clone()

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
use tracing::{debug, error, info};
<<<<<<< HEAD
use types::ChainSpec;
use validator_store::ValidatorStore;
@@ -95,6 +96,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
}
}
=======
use types::{ChainSpec, EthSpec};
use validator_store::ValidatorStore;
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
pub struct Inner<S, T> {
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
@@ -125,6 +131,29 @@ impl<S, T> Deref for PayloadAttestationService<S, T> {
}
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationService<S, T> {
<<<<<<< HEAD
=======
pub fn new(
duties_service: Arc<DutiesService<S, T>>,
validator_store: Arc<S>,
slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
) -> Self {
Self {
inner: Arc::new(Inner {
duties_service,
validator_store,
slot_clock,
beacon_nodes,
executor,
chain_spec,
}),
}
}
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
pub fn start_update_service(self) -> Result<(), String> {
let slot_duration = self.chain_spec.get_slot_duration();
let payload_attestation_due = self.chain_spec.get_payload_attestation_due();
@@ -144,8 +173,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
continue;
};
<<<<<<< HEAD
sleep(duration_to_next_slot + payload_attestation_due).await;
=======
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
let Some(current_slot) = self.slot_clock.now() else {
error!("Failed to read slot clock after trigger");
continue;
@@ -156,6 +188,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
.fork_name_at_slot::<S::E>(current_slot)
.gloas_enabled()
{
<<<<<<< HEAD
continue;
}
@@ -169,6 +202,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
duty_count = duties.len(),
"Producing payload attestations"
);
=======
let duration_to_next_epoch = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| {
self.chain_spec.get_slot_duration() * S::E::slots_per_epoch() as u32
});
sleep(duration_to_next_epoch).await;
continue;
}
sleep(duration_to_next_slot + payload_attestation_due).await;
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
let service = self.clone();
self.executor.spawn(
@@ -186,10 +232,23 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
async fn produce_and_publish(&self, slot: types::Slot) {
let duties = self.duties_service.get_ptc_duties_for_slot(slot);
<<<<<<< HEAD
=======
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
if duties.is_empty() {
return;
}
<<<<<<< HEAD
=======
debug!(
%slot,
duty_count = duties.len(),
"Producing payload attestations"
);
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
let attestation_data = match self
.beacon_nodes
.first_success(|beacon_node| async move {
@@ -246,13 +305,21 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
}
let count = messages.len();
<<<<<<< HEAD
=======
let fork_name = self.chain_spec.fork_name_at_slot::<S::E>(slot);
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
let result = self
.beacon_nodes
.first_success(|beacon_node| {
let messages = messages.clone();
async move {
beacon_node
<<<<<<< HEAD
.post_beacon_pool_payload_attestations_ssz(&messages)
=======
.post_beacon_pool_payload_attestations_ssz(&messages, fork_name)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
.await
.map_err(|e| format!("Failed to publish payload attestations (SSZ): {e:?}"))
}
@@ -268,7 +335,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PayloadAttestationServ
let messages = messages.clone();
async move {
beacon_node
<<<<<<< HEAD
.post_beacon_pool_payload_attestations(&messages)
=======
.post_beacon_pool_payload_attestations(&messages, fork_name)
>>>>>>> 028b5a42a9715c31f416d45db70add39d9934b12
.await
.map_err(|e| {
format!("Failed to publish payload attestations (JSON): {e:?}")