mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-30 19:34:37 +00:00
Fix transient bug in dequeue_attestation and optimization (#9524)
`dequeue_attestations` released votes by splitting the queue at the first entry with `slot >= current_slot`, which assumes the queue is sorted by slot. It isn't: `on_attestation` pushes attestations in arrival order and never sorts. When a future-slot vote sits ahead of a vote that is already due, the split happens at the future-slot vote and the due vote stays stuck behind it and is never applied to fork choice, even after its slot is in the past. The PR current uses a naive solution to solve the bug and also adds regression tests to exercise the bug. There are other competing solutions which can be used which also optimize this path at the same time. https://github.com/sigp/lighthouse/pull/8378 https://github.com/sigp/lighthouse/pull/8378#discussion_r2543322106 Co-Authored-By: hopinheimer <knmanas6@gmail.com> Co-Authored-By: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
@@ -20,5 +20,10 @@ types = { workspace = true }
|
||||
[dev-dependencies]
|
||||
beacon_chain = { workspace = true }
|
||||
bls = { workspace = true }
|
||||
criterion = { workspace = true }
|
||||
store = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
[[bench]]
|
||||
name = "benches"
|
||||
harness = false
|
||||
|
||||
62
consensus/fork_choice/benches/benches.rs
Normal file
62
consensus/fork_choice/benches/benches.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
|
||||
use fork_choice::{QueuedAttestation, dequeue_attestations};
|
||||
use std::collections::BTreeMap;
|
||||
use types::{Epoch, Hash256, Slot};
|
||||
|
||||
fn att(slot: Slot) -> QueuedAttestation {
|
||||
QueuedAttestation {
|
||||
slot,
|
||||
attesting_indices: vec![],
|
||||
block_root: Hash256::ZERO,
|
||||
target_epoch: Epoch::new(0),
|
||||
payload_present: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Anticipated steady-state workload on mainnet: ~94k attestations spread over a small number of slots, then
|
||||
// many iterations of dequeue (one slot's worth) + enqueue (one slot's worth for a future slot).
|
||||
// The queue stays at a constant size, exercising the per-slot dequeue cost.
|
||||
const NUM_ATTESTATIONS: usize = 1_500_000 / 16;
|
||||
const UNIQUE_SLOTS: usize = 2;
|
||||
const NUM_ITERATIONS: usize = 64;
|
||||
const PER_SLOT: usize = NUM_ATTESTATIONS / UNIQUE_SLOTS;
|
||||
|
||||
fn build_queue() -> BTreeMap<Slot, Vec<QueuedAttestation>> {
|
||||
let mut queue: BTreeMap<Slot, Vec<QueuedAttestation>> = BTreeMap::new();
|
||||
for i in 0..NUM_ATTESTATIONS {
|
||||
let slot = Slot::from(i / PER_SLOT);
|
||||
queue.entry(slot).or_default().push(att(slot));
|
||||
}
|
||||
queue
|
||||
}
|
||||
|
||||
fn all_benches(c: &mut Criterion) {
|
||||
let initial = build_queue();
|
||||
|
||||
c.bench_with_input(
|
||||
BenchmarkId::new("dequeue_attestations", NUM_ATTESTATIONS),
|
||||
&initial,
|
||||
|b, initial| {
|
||||
b.iter(|| {
|
||||
let mut queue = initial.clone();
|
||||
for i in 1..=NUM_ITERATIONS {
|
||||
let dequeued = dequeue_attestations(Slot::from(i), &mut queue);
|
||||
let dequeued_count: usize = dequeued.values().map(Vec::len).sum();
|
||||
assert_eq!(dequeued_count, PER_SLOT);
|
||||
|
||||
let next_slot = Slot::from(UNIQUE_SLOTS + i - 1);
|
||||
queue
|
||||
.entry(next_slot)
|
||||
.or_default()
|
||||
.extend(std::iter::repeat_with(|| att(next_slot)).take(PER_SLOT));
|
||||
|
||||
let total: usize = queue.values().map(Vec::len).sum();
|
||||
assert_eq!(total, NUM_ATTESTATIONS);
|
||||
}
|
||||
})
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
criterion_group!(benches, all_benches);
|
||||
criterion_main!(benches);
|
||||
@@ -11,7 +11,7 @@ use state_processing::{
|
||||
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
use superstruct::superstruct;
|
||||
@@ -284,12 +284,12 @@ fn compute_start_slot_at_epoch<E: EthSpec>(epoch: Epoch) -> Slot {
|
||||
/// information about the attestation.
|
||||
#[derive(Clone, PartialEq, Encode, Decode)]
|
||||
pub struct QueuedAttestation {
|
||||
slot: Slot,
|
||||
attesting_indices: Vec<u64>,
|
||||
block_root: Hash256,
|
||||
target_epoch: Epoch,
|
||||
pub slot: Slot,
|
||||
pub attesting_indices: Vec<u64>,
|
||||
pub block_root: Hash256,
|
||||
pub target_epoch: Epoch,
|
||||
/// Per Gloas spec: `payload_present = attestation.data.index == 1`.
|
||||
payload_present: bool,
|
||||
pub payload_present: bool,
|
||||
}
|
||||
|
||||
/// Legacy queued attestation without payload_present (pre-Gloas, schema V28).
|
||||
@@ -313,25 +313,22 @@ impl<'a, E: EthSpec> From<IndexedAttestationRef<'a, E>> for QueuedAttestation {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns all values in `self.queued_attestations` that have a slot that is earlier than the
|
||||
/// current slot. Also removes those values from `self.queued_attestations`.
|
||||
fn dequeue_attestations(
|
||||
/// Returns all attestations in `queued_attestations` with a slot earlier than the current slot,
|
||||
/// removing them from the queue.
|
||||
pub fn dequeue_attestations(
|
||||
current_slot: Slot,
|
||||
queued_attestations: &mut Vec<QueuedAttestation>,
|
||||
) -> Vec<QueuedAttestation> {
|
||||
let remaining = queued_attestations.split_off(
|
||||
queued_attestations
|
||||
.iter()
|
||||
.position(|a| a.slot >= current_slot)
|
||||
.unwrap_or(queued_attestations.len()),
|
||||
);
|
||||
queued_attestations: &mut BTreeMap<Slot, Vec<QueuedAttestation>>,
|
||||
) -> BTreeMap<Slot, Vec<QueuedAttestation>> {
|
||||
let remaining = queued_attestations.split_off(¤t_slot);
|
||||
let due = std::mem::replace(queued_attestations, remaining);
|
||||
|
||||
let dequeued_count: usize = due.values().map(Vec::len).sum();
|
||||
metrics::inc_counter_by(
|
||||
&metrics::FORK_CHOICE_DEQUEUED_ATTESTATIONS,
|
||||
queued_attestations.len() as u64,
|
||||
dequeued_count as u64,
|
||||
);
|
||||
|
||||
std::mem::replace(queued_attestations, remaining)
|
||||
due
|
||||
}
|
||||
|
||||
/// Denotes whether an attestation we are processing was received from a block or from gossip.
|
||||
@@ -376,8 +373,9 @@ pub struct ForkChoice<T, E> {
|
||||
fc_store: T,
|
||||
/// The underlying representation of the block DAG.
|
||||
proto_array: ProtoArrayForkChoice,
|
||||
/// Attestations that arrived at the current slot and must be queued for later processing.
|
||||
queued_attestations: Vec<QueuedAttestation>,
|
||||
/// Attestations that arrived at the current slot and must be queued for later processing,
|
||||
/// keyed by their slot.
|
||||
queued_attestations: BTreeMap<Slot, Vec<QueuedAttestation>>,
|
||||
/// Stores a cache of the values required to be sent to the execution layer.
|
||||
forkchoice_update_parameters: ForkchoiceUpdateParameters,
|
||||
_phantom: PhantomData<E>,
|
||||
@@ -474,7 +472,7 @@ where
|
||||
let mut fork_choice = Self {
|
||||
fc_store,
|
||||
proto_array,
|
||||
queued_attestations: vec![],
|
||||
queued_attestations: BTreeMap::new(),
|
||||
// This will be updated during the next call to `Self::get_head`.
|
||||
forkchoice_update_parameters: ForkchoiceUpdateParameters {
|
||||
head_hash: None,
|
||||
@@ -1353,8 +1351,11 @@ where
|
||||
// Attestations can only affect the fork choice of subsequent slots.
|
||||
// Delay consideration in the fork choice until their slot is in the past.
|
||||
// ```
|
||||
let queued_attestation = QueuedAttestation::from(attestation);
|
||||
self.queued_attestations
|
||||
.push(QueuedAttestation::from(attestation));
|
||||
.entry(queued_attestation.slot)
|
||||
.or_default()
|
||||
.push(queued_attestation);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1546,10 +1547,11 @@ where
|
||||
/// Processes and removes from the queue any queued attestations which may now be eligible for
|
||||
/// processing due to the slot clock incrementing.
|
||||
fn process_attestation_queue(&mut self) -> Result<(), Error<T::Error>> {
|
||||
for attestation in dequeue_attestations(
|
||||
let dequeued = dequeue_attestations(
|
||||
self.fc_store.get_current_slot(),
|
||||
&mut self.queued_attestations,
|
||||
) {
|
||||
);
|
||||
for attestation in dequeued.into_values().flatten() {
|
||||
for validator_index in attestation.attesting_indices.iter() {
|
||||
self.proto_array.process_attestation(
|
||||
*validator_index as usize,
|
||||
@@ -1795,8 +1797,8 @@ where
|
||||
&self.fc_store
|
||||
}
|
||||
|
||||
/// Returns a reference to the currently queued attestations.
|
||||
pub fn queued_attestations(&self) -> &[QueuedAttestation] {
|
||||
/// Returns a reference to the currently queued attestations, keyed by slot.
|
||||
pub fn queued_attestations(&self) -> &BTreeMap<Slot, Vec<QueuedAttestation>> {
|
||||
&self.queued_attestations
|
||||
}
|
||||
|
||||
@@ -1881,7 +1883,7 @@ where
|
||||
let mut fork_choice = Self {
|
||||
fc_store,
|
||||
proto_array,
|
||||
queued_attestations: vec![],
|
||||
queued_attestations: BTreeMap::new(),
|
||||
// Will be updated in the following call to `Self::get_head`.
|
||||
forkchoice_update_parameters: ForkchoiceUpdateParameters {
|
||||
head_hash: None,
|
||||
@@ -1994,20 +1996,33 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_queued_attestations() -> Vec<QueuedAttestation> {
|
||||
(1..4)
|
||||
.map(|i| QueuedAttestation {
|
||||
slot: Slot::new(i),
|
||||
fn queue_from_slots(
|
||||
slots: impl IntoIterator<Item = u64>,
|
||||
) -> BTreeMap<Slot, Vec<QueuedAttestation>> {
|
||||
let mut queued: BTreeMap<Slot, Vec<QueuedAttestation>> = BTreeMap::new();
|
||||
for i in slots {
|
||||
let slot = Slot::new(i);
|
||||
queued.entry(slot).or_default().push(QueuedAttestation {
|
||||
slot,
|
||||
attesting_indices: vec![],
|
||||
block_root: Hash256::zero(),
|
||||
target_epoch: Epoch::new(0),
|
||||
payload_present: false,
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
}
|
||||
queued
|
||||
}
|
||||
|
||||
fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec<u64> {
|
||||
queued_attestations.iter().map(|a| a.slot.into()).collect()
|
||||
fn get_queued_attestations() -> BTreeMap<Slot, Vec<QueuedAttestation>> {
|
||||
queue_from_slots(1..4)
|
||||
}
|
||||
|
||||
fn get_slots(queued_attestations: &BTreeMap<Slot, Vec<QueuedAttestation>>) -> Vec<u64> {
|
||||
queued_attestations
|
||||
.values()
|
||||
.flatten()
|
||||
.map(|a| a.slot.into())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn test_queued_attestations(current_time: Slot) -> (Vec<u64>, Vec<u64>) {
|
||||
@@ -2039,4 +2054,25 @@ mod tests {
|
||||
assert!(queued.is_empty());
|
||||
assert_eq!(dequeued, vec![1, 2, 3]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dequeue_attestations_out_of_order() {
|
||||
// A future-slot vote enqueued before a vote that becomes due sooner must not block the
|
||||
// due vote from being released.
|
||||
let mut queued = queue_from_slots([4, 3]);
|
||||
|
||||
// At slot 4, the slot-3 vote is due (3 < 4) and must be released.
|
||||
let dequeued = dequeue_attestations(Slot::new(4), &mut queued);
|
||||
|
||||
assert_eq!(
|
||||
get_slots(&dequeued),
|
||||
vec![3],
|
||||
"slot-3 vote must be dequeued at slot 4"
|
||||
);
|
||||
assert_eq!(
|
||||
get_slots(&queued),
|
||||
vec![4],
|
||||
"only the not-yet-due slot-4 vote should remain"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ pub use crate::fork_choice::{
|
||||
AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters,
|
||||
InvalidAttestation, InvalidBlock, InvalidPayloadAttestation, ParentImportStatus,
|
||||
PayloadVerificationStatus, PersistedForkChoice, PersistedForkChoiceV28, PersistedForkChoiceV29,
|
||||
QueuedAttestation, ResetPayloadStatuses,
|
||||
QueuedAttestation, ResetPayloadStatuses, dequeue_attestations,
|
||||
};
|
||||
pub use fork_choice_store::ForkChoiceStore;
|
||||
pub use proto_array::{
|
||||
|
||||
@@ -49,7 +49,11 @@ pub static FORK_CHOICE_ON_ATTESTER_SLASHING_TIMES: LazyLock<Result<Histogram>> =
|
||||
pub fn scrape_for_metrics<T: ForkChoiceStore<E>, E: EthSpec>(fork_choice: &ForkChoice<T, E>) {
|
||||
set_gauge(
|
||||
&FORK_CHOICE_QUEUED_ATTESTATIONS,
|
||||
fork_choice.queued_attestations().len() as i64,
|
||||
fork_choice
|
||||
.queued_attestations()
|
||||
.values()
|
||||
.map(Vec::len)
|
||||
.sum::<usize>() as i64,
|
||||
);
|
||||
set_gauge(
|
||||
&FORK_CHOICE_NODES,
|
||||
|
||||
@@ -149,13 +149,17 @@ impl ForkChoiceTest {
|
||||
.fork_choice_write_lock()
|
||||
.update_time(self.harness.chain.slot().unwrap())
|
||||
.unwrap();
|
||||
func(
|
||||
self.harness
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_read_lock()
|
||||
.queued_attestations(),
|
||||
);
|
||||
let queued = self
|
||||
.harness
|
||||
.chain
|
||||
.canonical_head
|
||||
.fork_choice_read_lock()
|
||||
.queued_attestations()
|
||||
.values()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
func(&queued);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -415,6 +419,24 @@ impl ForkChoiceTest {
|
||||
async fn apply_attestation_to_chain<F, G>(
|
||||
self,
|
||||
delay: MutationDelay,
|
||||
mutation_func: F,
|
||||
comparison_func: G,
|
||||
) -> Self
|
||||
where
|
||||
F: FnMut(&mut IndexedAttestation<E>, &BeaconChain<EphemeralHarnessType<E>>),
|
||||
G: FnMut(Result<(), BeaconChainError>),
|
||||
{
|
||||
self.apply_nth_attestation_to_chain(0, delay, mutation_func, comparison_func)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Like `apply_attestation_to_chain`, but attests with the validator at
|
||||
/// `validator_index_in_committee` within the committee. Lets a test enqueue multiple distinct
|
||||
/// votes for the same slot without tripping `PriorAttestationKnown`.
|
||||
async fn apply_nth_attestation_to_chain<F, G>(
|
||||
self,
|
||||
validator_index_in_committee: usize,
|
||||
delay: MutationDelay,
|
||||
mut mutation_func: F,
|
||||
mut comparison_func: G,
|
||||
) -> Self
|
||||
@@ -431,18 +453,16 @@ impl ForkChoiceTest {
|
||||
.produce_unaggregated_attestation(current_slot, 0)
|
||||
.expect("should not error while producing attestation");
|
||||
|
||||
let validator_committee_index = 0;
|
||||
// For these tests we always use committee index 0, which also matches the "dummy" committee
|
||||
// index used post-Electra.
|
||||
let committee_index = 0;
|
||||
|
||||
let validator_index = *head
|
||||
.beacon_state
|
||||
.get_beacon_committee(
|
||||
current_slot,
|
||||
attestation
|
||||
.committee_index()
|
||||
.expect("should get committee index"),
|
||||
)
|
||||
.get_beacon_committee(current_slot, committee_index)
|
||||
.expect("should get committees")
|
||||
.committee
|
||||
.get(validator_committee_index)
|
||||
.get(validator_index_in_committee)
|
||||
.expect("there should be an attesting validator");
|
||||
|
||||
let committee_count = head
|
||||
@@ -452,7 +472,7 @@ impl ForkChoiceTest {
|
||||
|
||||
let subnet_id = SubnetId::compute_subnet::<E>(
|
||||
current_slot,
|
||||
0,
|
||||
committee_index,
|
||||
committee_count,
|
||||
&self.harness.chain.spec,
|
||||
)
|
||||
@@ -463,7 +483,7 @@ impl ForkChoiceTest {
|
||||
attestation
|
||||
.sign(
|
||||
&validator_sk,
|
||||
validator_committee_index,
|
||||
committee_index as usize,
|
||||
&head.beacon_state.fork(),
|
||||
self.harness.chain.genesis_validators_root,
|
||||
&self.harness.chain.spec,
|
||||
@@ -472,7 +492,7 @@ impl ForkChoiceTest {
|
||||
|
||||
let single_attestation = SingleAttestation {
|
||||
attester_index: validator_index as u64,
|
||||
committee_index: validator_committee_index as u64,
|
||||
committee_index,
|
||||
data: attestation.data().clone(),
|
||||
signature: attestation.signature().clone(),
|
||||
};
|
||||
@@ -1039,6 +1059,100 @@ async fn invalid_attestation_delayed_slot() {
|
||||
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0));
|
||||
}
|
||||
|
||||
/// Regression test for dequeuing when votes for two different future slots are queued.
|
||||
///
|
||||
/// With votes queued for consecutive slots, advancing the clock past the earlier one must release
|
||||
/// only that vote and leave the later one queued until its own slot is in the past.
|
||||
#[tokio::test]
|
||||
async fn dequeue_attestations_consecutive_slot_divergence() {
|
||||
ForkChoiceTest::new()
|
||||
.apply_blocks_without_new_attestations(1)
|
||||
.await
|
||||
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0))
|
||||
// Queue a vote for `slot + 2`.
|
||||
.apply_nth_attestation_to_chain(
|
||||
0,
|
||||
MutationDelay::NoDelay,
|
||||
|attestation, _| {
|
||||
let slot = attestation.data().slot;
|
||||
attestation.data_mut().slot = slot + 2;
|
||||
},
|
||||
|result| assert!(result.is_ok()),
|
||||
)
|
||||
.await
|
||||
// Queue a vote for `slot + 1`, which becomes due sooner.
|
||||
// A different committee position avoids `PriorAttestationKnown`.
|
||||
.apply_nth_attestation_to_chain(
|
||||
1,
|
||||
MutationDelay::NoDelay,
|
||||
|attestation, _| {
|
||||
let slot = attestation.data().slot;
|
||||
attestation.data_mut().slot = slot + 1;
|
||||
},
|
||||
|result| assert!(result.is_ok()),
|
||||
)
|
||||
.await
|
||||
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2))
|
||||
// Advance so the slot+1 vote is due (in the past) but the slot+2 vote is not yet.
|
||||
.skip_slots(2)
|
||||
.inspect_queued_attestations(|queue| {
|
||||
assert_eq!(
|
||||
queue.len(),
|
||||
1,
|
||||
"only the due slot+1 vote should be dequeued"
|
||||
);
|
||||
assert_eq!(
|
||||
queue[0].slot,
|
||||
Slot::new(3),
|
||||
"the surviving vote must be the not-yet-due slot+2 vote"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// Companion to `dequeue_attestations_consecutive_slot_divergence`: votes for two different slots
|
||||
/// are queued, but the clock is advanced far enough that *both* are due at dequeue time.
|
||||
///
|
||||
/// When every queued vote is in the past, the whole queue drains in a single dequeue.
|
||||
#[tokio::test]
|
||||
async fn dequeue_attestations_conciliation() {
|
||||
ForkChoiceTest::new()
|
||||
.apply_blocks_without_new_attestations(1)
|
||||
.await
|
||||
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0))
|
||||
// Queue a vote for `slot + 2`.
|
||||
.apply_nth_attestation_to_chain(
|
||||
0,
|
||||
MutationDelay::NoDelay,
|
||||
|attestation, _| {
|
||||
let slot = attestation.data().slot;
|
||||
attestation.data_mut().slot = slot + 2;
|
||||
},
|
||||
|result| assert!(result.is_ok()),
|
||||
)
|
||||
.await
|
||||
// Queue a vote for `slot + 1`.
|
||||
.apply_nth_attestation_to_chain(
|
||||
1,
|
||||
MutationDelay::NoDelay,
|
||||
|attestation, _| {
|
||||
let slot = attestation.data().slot;
|
||||
attestation.data_mut().slot = slot + 1;
|
||||
},
|
||||
|result| assert!(result.is_ok()),
|
||||
)
|
||||
.await
|
||||
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2))
|
||||
// Advance past both votes (to slot + 3) so the whole queue drains.
|
||||
.skip_slots(3)
|
||||
.inspect_queued_attestations(|queue| {
|
||||
assert_eq!(
|
||||
queue.len(),
|
||||
0,
|
||||
"all votes are due, so the entire queue must drain"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// Tests that the correct target root is used when the attested-to block is in a prior epoch to
|
||||
/// the attestation.
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user