Resolve merge conflicts

This commit is contained in:
Eitan Seri-Levi
2026-06-25 14:59:08 +03:00
54 changed files with 1351 additions and 1207 deletions

View File

@@ -20,5 +20,10 @@ types = { workspace = true }
[dev-dependencies]
beacon_chain = { workspace = true }
bls = { workspace = true }
criterion = { workspace = true }
store = { workspace = true }
tokio = { workspace = true }
[[bench]]
name = "benches"
harness = false

View File

@@ -0,0 +1,62 @@
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use fork_choice::{QueuedAttestation, dequeue_attestations};
use std::collections::BTreeMap;
use types::{Epoch, Hash256, Slot};
fn att(slot: Slot) -> QueuedAttestation {
QueuedAttestation {
slot,
attesting_indices: vec![],
block_root: Hash256::ZERO,
target_epoch: Epoch::new(0),
payload_present: false,
}
}
// Anticipated steady-state workload on mainnet: ~94k attestations spread over a small number of slots, then
// many iterations of dequeue (one slot's worth) + enqueue (one slot's worth for a future slot).
// The queue stays at a constant size, exercising the per-slot dequeue cost.
const NUM_ATTESTATIONS: usize = 1_500_000 / 16;
const UNIQUE_SLOTS: usize = 2;
const NUM_ITERATIONS: usize = 64;
const PER_SLOT: usize = NUM_ATTESTATIONS / UNIQUE_SLOTS;
fn build_queue() -> BTreeMap<Slot, Vec<QueuedAttestation>> {
let mut queue: BTreeMap<Slot, Vec<QueuedAttestation>> = BTreeMap::new();
for i in 0..NUM_ATTESTATIONS {
let slot = Slot::from(i / PER_SLOT);
queue.entry(slot).or_default().push(att(slot));
}
queue
}
fn all_benches(c: &mut Criterion) {
let initial = build_queue();
c.bench_with_input(
BenchmarkId::new("dequeue_attestations", NUM_ATTESTATIONS),
&initial,
|b, initial| {
b.iter(|| {
let mut queue = initial.clone();
for i in 1..=NUM_ITERATIONS {
let dequeued = dequeue_attestations(Slot::from(i), &mut queue);
let dequeued_count: usize = dequeued.values().map(Vec::len).sum();
assert_eq!(dequeued_count, PER_SLOT);
let next_slot = Slot::from(UNIQUE_SLOTS + i - 1);
queue
.entry(next_slot)
.or_default()
.extend(std::iter::repeat_with(|| att(next_slot)).take(PER_SLOT));
let total: usize = queue.values().map(Vec::len).sum();
assert_eq!(total, NUM_ATTESTATIONS);
}
})
},
);
}
criterion_group!(benches, all_benches);
criterion_main!(benches);

View File

@@ -11,7 +11,7 @@ use state_processing::{
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
};
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};
use std::marker::PhantomData;
use std::time::Duration;
use superstruct::superstruct;
@@ -285,12 +285,12 @@ fn compute_start_slot_at_epoch<E: EthSpec>(epoch: Epoch) -> Slot {
/// information about the attestation.
#[derive(Clone, PartialEq, Encode, Decode)]
pub struct QueuedAttestation {
slot: Slot,
attesting_indices: Vec<u64>,
block_root: Hash256,
target_epoch: Epoch,
pub slot: Slot,
pub attesting_indices: Vec<u64>,
pub block_root: Hash256,
pub target_epoch: Epoch,
/// Per Gloas spec: `payload_present = attestation.data.index == 1`.
payload_present: bool,
pub payload_present: bool,
}
/// Legacy queued attestation without payload_present (pre-Gloas, schema V28).
@@ -314,25 +314,22 @@ impl<'a, E: EthSpec> From<IndexedAttestationRef<'a, E>> for QueuedAttestation {
}
}
/// Returns all values in `self.queued_attestations` that have a slot that is earlier than the
/// current slot. Also removes those values from `self.queued_attestations`.
fn dequeue_attestations(
/// Returns all attestations in `queued_attestations` with a slot earlier than the current slot,
/// removing them from the queue.
pub fn dequeue_attestations(
current_slot: Slot,
queued_attestations: &mut Vec<QueuedAttestation>,
) -> Vec<QueuedAttestation> {
let remaining = queued_attestations.split_off(
queued_attestations
.iter()
.position(|a| a.slot >= current_slot)
.unwrap_or(queued_attestations.len()),
);
queued_attestations: &mut BTreeMap<Slot, Vec<QueuedAttestation>>,
) -> BTreeMap<Slot, Vec<QueuedAttestation>> {
let remaining = queued_attestations.split_off(&current_slot);
let due = std::mem::replace(queued_attestations, remaining);
let dequeued_count: usize = due.values().map(Vec::len).sum();
metrics::inc_counter_by(
&metrics::FORK_CHOICE_DEQUEUED_ATTESTATIONS,
queued_attestations.len() as u64,
dequeued_count as u64,
);
std::mem::replace(queued_attestations, remaining)
due
}
/// Denotes whether an attestation we are processing was received from a block or from gossip.
@@ -377,8 +374,9 @@ pub struct ForkChoice<T, E> {
fc_store: T,
/// The underlying representation of the block DAG.
proto_array: ProtoArrayForkChoice,
/// Attestations that arrived at the current slot and must be queued for later processing.
queued_attestations: Vec<QueuedAttestation>,
/// Attestations that arrived at the current slot and must be queued for later processing,
/// keyed by their slot.
queued_attestations: BTreeMap<Slot, Vec<QueuedAttestation>>,
/// Stores a cache of the values required to be sent to the execution layer.
forkchoice_update_parameters: ForkchoiceUpdateParameters,
_phantom: PhantomData<E>,
@@ -475,7 +473,7 @@ where
let mut fork_choice = Self {
fc_store,
proto_array,
queued_attestations: vec![],
queued_attestations: BTreeMap::new(),
// This will be updated during the next call to `Self::get_head`.
forkchoice_update_parameters: ForkchoiceUpdateParameters {
head_hash: None,
@@ -1367,8 +1365,11 @@ where
// Attestations can only affect the fork choice of subsequent slots.
// Delay consideration in the fork choice until their slot is in the past.
// ```
let queued_attestation = QueuedAttestation::from(attestation);
self.queued_attestations
.push(QueuedAttestation::from(attestation));
.entry(queued_attestation.slot)
.or_default()
.push(queued_attestation);
}
Ok(())
@@ -1560,10 +1561,11 @@ where
/// Processes and removes from the queue any queued attestations which may now be eligible for
/// processing due to the slot clock incrementing.
fn process_attestation_queue(&mut self) -> Result<(), Error<T::Error>> {
for attestation in dequeue_attestations(
let dequeued = dequeue_attestations(
self.fc_store.get_current_slot(),
&mut self.queued_attestations,
) {
);
for attestation in dequeued.into_values().flatten() {
for validator_index in attestation.attesting_indices.iter() {
self.proto_array.process_attestation(
*validator_index as usize,
@@ -1826,8 +1828,8 @@ where
&self.fc_store
}
/// Returns a reference to the currently queued attestations.
pub fn queued_attestations(&self) -> &[QueuedAttestation] {
/// Returns a reference to the currently queued attestations, keyed by slot.
pub fn queued_attestations(&self) -> &BTreeMap<Slot, Vec<QueuedAttestation>> {
&self.queued_attestations
}
@@ -1912,7 +1914,7 @@ where
let mut fork_choice = Self {
fc_store,
proto_array,
queued_attestations: vec![],
queued_attestations: BTreeMap::new(),
// Will be updated in the following call to `Self::get_head`.
forkchoice_update_parameters: ForkchoiceUpdateParameters {
head_hash: None,
@@ -2025,20 +2027,33 @@ mod tests {
}
}
fn get_queued_attestations() -> Vec<QueuedAttestation> {
(1..4)
.map(|i| QueuedAttestation {
slot: Slot::new(i),
fn queue_from_slots(
slots: impl IntoIterator<Item = u64>,
) -> BTreeMap<Slot, Vec<QueuedAttestation>> {
let mut queued: BTreeMap<Slot, Vec<QueuedAttestation>> = BTreeMap::new();
for i in slots {
let slot = Slot::new(i);
queued.entry(slot).or_default().push(QueuedAttestation {
slot,
attesting_indices: vec![],
block_root: Hash256::zero(),
target_epoch: Epoch::new(0),
payload_present: false,
})
.collect()
});
}
queued
}
fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec<u64> {
queued_attestations.iter().map(|a| a.slot.into()).collect()
fn get_queued_attestations() -> BTreeMap<Slot, Vec<QueuedAttestation>> {
queue_from_slots(1..4)
}
fn get_slots(queued_attestations: &BTreeMap<Slot, Vec<QueuedAttestation>>) -> Vec<u64> {
queued_attestations
.values()
.flatten()
.map(|a| a.slot.into())
.collect()
}
fn test_queued_attestations(current_time: Slot) -> (Vec<u64>, Vec<u64>) {
@@ -2070,4 +2085,25 @@ mod tests {
assert!(queued.is_empty());
assert_eq!(dequeued, vec![1, 2, 3]);
}
#[test]
fn dequeue_attestations_out_of_order() {
// A future-slot vote enqueued before a vote that becomes due sooner must not block the
// due vote from being released.
let mut queued = queue_from_slots([4, 3]);
// At slot 4, the slot-3 vote is due (3 < 4) and must be released.
let dequeued = dequeue_attestations(Slot::new(4), &mut queued);
assert_eq!(
get_slots(&dequeued),
vec![3],
"slot-3 vote must be dequeued at slot 4"
);
assert_eq!(
get_slots(&queued),
vec![4],
"only the not-yet-due slot-4 vote should remain"
);
}
}

View File

@@ -6,7 +6,7 @@ pub use crate::fork_choice::{
AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters,
InvalidAttestation, InvalidBlock, InvalidPayloadAttestation, ParentImportStatus,
PayloadVerificationStatus, PersistedForkChoice, PersistedForkChoiceV28, PersistedForkChoiceV29,
QueuedAttestation, ResetPayloadStatuses,
QueuedAttestation, ResetPayloadStatuses, dequeue_attestations,
};
pub use fork_choice_store::ForkChoiceStore;
pub use proto_array::{

View File

@@ -49,7 +49,11 @@ pub static FORK_CHOICE_ON_ATTESTER_SLASHING_TIMES: LazyLock<Result<Histogram>> =
pub fn scrape_for_metrics<T: ForkChoiceStore<E>, E: EthSpec>(fork_choice: &ForkChoice<T, E>) {
set_gauge(
&FORK_CHOICE_QUEUED_ATTESTATIONS,
fork_choice.queued_attestations().len() as i64,
fork_choice
.queued_attestations()
.values()
.map(Vec::len)
.sum::<usize>() as i64,
);
set_gauge(
&FORK_CHOICE_NODES,

View File

@@ -149,13 +149,17 @@ impl ForkChoiceTest {
.fork_choice_write_lock()
.update_time(self.harness.chain.slot().unwrap())
.unwrap();
func(
self.harness
.chain
.canonical_head
.fork_choice_read_lock()
.queued_attestations(),
);
let queued = self
.harness
.chain
.canonical_head
.fork_choice_read_lock()
.queued_attestations()
.values()
.flatten()
.cloned()
.collect::<Vec<_>>();
func(&queued);
self
}
@@ -415,6 +419,24 @@ impl ForkChoiceTest {
async fn apply_attestation_to_chain<F, G>(
self,
delay: MutationDelay,
mutation_func: F,
comparison_func: G,
) -> Self
where
F: FnMut(&mut IndexedAttestation<E>, &BeaconChain<EphemeralHarnessType<E>>),
G: FnMut(Result<(), BeaconChainError>),
{
self.apply_nth_attestation_to_chain(0, delay, mutation_func, comparison_func)
.await
}
/// Like `apply_attestation_to_chain`, but attests with the validator at
/// `validator_index_in_committee` within the committee. Lets a test enqueue multiple distinct
/// votes for the same slot without tripping `PriorAttestationKnown`.
async fn apply_nth_attestation_to_chain<F, G>(
self,
validator_index_in_committee: usize,
delay: MutationDelay,
mut mutation_func: F,
mut comparison_func: G,
) -> Self
@@ -431,18 +453,16 @@ impl ForkChoiceTest {
.produce_unaggregated_attestation(current_slot, 0)
.expect("should not error while producing attestation");
let validator_committee_index = 0;
// For these tests we always use committee index 0, which also matches the "dummy" committee
// index used post-Electra.
let committee_index = 0;
let validator_index = *head
.beacon_state
.get_beacon_committee(
current_slot,
attestation
.committee_index()
.expect("should get committee index"),
)
.get_beacon_committee(current_slot, committee_index)
.expect("should get committees")
.committee
.get(validator_committee_index)
.get(validator_index_in_committee)
.expect("there should be an attesting validator");
let committee_count = head
@@ -452,7 +472,7 @@ impl ForkChoiceTest {
let subnet_id = SubnetId::compute_subnet::<E>(
current_slot,
0,
committee_index,
committee_count,
&self.harness.chain.spec,
)
@@ -463,7 +483,7 @@ impl ForkChoiceTest {
attestation
.sign(
&validator_sk,
validator_committee_index,
committee_index as usize,
&head.beacon_state.fork(),
self.harness.chain.genesis_validators_root,
&self.harness.chain.spec,
@@ -472,7 +492,7 @@ impl ForkChoiceTest {
let single_attestation = SingleAttestation {
attester_index: validator_index as u64,
committee_index: validator_committee_index as u64,
committee_index,
data: attestation.data().clone(),
signature: attestation.signature().clone(),
};
@@ -1039,6 +1059,100 @@ async fn invalid_attestation_delayed_slot() {
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0));
}
/// Regression test for dequeuing when votes for two different future slots are queued.
///
/// With votes queued for consecutive slots, advancing the clock past the earlier one must release
/// only that vote and leave the later one queued until its own slot is in the past.
#[tokio::test]
async fn dequeue_attestations_consecutive_slot_divergence() {
ForkChoiceTest::new()
.apply_blocks_without_new_attestations(1)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0))
// Queue a vote for `slot + 2`.
.apply_nth_attestation_to_chain(
0,
MutationDelay::NoDelay,
|attestation, _| {
let slot = attestation.data().slot;
attestation.data_mut().slot = slot + 2;
},
|result| assert!(result.is_ok()),
)
.await
// Queue a vote for `slot + 1`, which becomes due sooner.
// A different committee position avoids `PriorAttestationKnown`.
.apply_nth_attestation_to_chain(
1,
MutationDelay::NoDelay,
|attestation, _| {
let slot = attestation.data().slot;
attestation.data_mut().slot = slot + 1;
},
|result| assert!(result.is_ok()),
)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2))
// Advance so the slot+1 vote is due (in the past) but the slot+2 vote is not yet.
.skip_slots(2)
.inspect_queued_attestations(|queue| {
assert_eq!(
queue.len(),
1,
"only the due slot+1 vote should be dequeued"
);
assert_eq!(
queue[0].slot,
Slot::new(3),
"the surviving vote must be the not-yet-due slot+2 vote"
);
});
}
/// Companion to `dequeue_attestations_consecutive_slot_divergence`: votes for two different slots
/// are queued, but the clock is advanced far enough that *both* are due at dequeue time.
///
/// When every queued vote is in the past, the whole queue drains in a single dequeue.
#[tokio::test]
async fn dequeue_attestations_conciliation() {
ForkChoiceTest::new()
.apply_blocks_without_new_attestations(1)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0))
// Queue a vote for `slot + 2`.
.apply_nth_attestation_to_chain(
0,
MutationDelay::NoDelay,
|attestation, _| {
let slot = attestation.data().slot;
attestation.data_mut().slot = slot + 2;
},
|result| assert!(result.is_ok()),
)
.await
// Queue a vote for `slot + 1`.
.apply_nth_attestation_to_chain(
1,
MutationDelay::NoDelay,
|attestation, _| {
let slot = attestation.data().slot;
attestation.data_mut().slot = slot + 1;
},
|result| assert!(result.is_ok()),
)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2))
// Advance past both votes (to slot + 3) so the whole queue drains.
.skip_slots(3)
.inspect_queued_attestations(|queue| {
assert_eq!(
queue.len(),
0,
"all votes are due, so the entire queue must drain"
);
});
}
/// Tests that the correct target root is used when the attested-to block is in a prior epoch to
/// the attestation.
#[tokio::test]

View File

@@ -37,6 +37,10 @@ impl<E: EthSpec> SignedExecutionPayloadBid<E> {
signature: Signature::empty(),
}
}
pub fn num_blobs_expected(&self) -> usize {
self.message.blob_kzg_commitments.len()
}
}
#[cfg(test)]