diff --git a/Cargo.lock b/Cargo.lock index d4a531d26d..5d200768cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3600,6 +3600,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bls", + "criterion", "ethereum_ssz", "ethereum_ssz_derive", "fixed_bytes", diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index df47a5c9d1..03dace1f9f 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -20,5 +20,10 @@ types = { workspace = true } [dev-dependencies] beacon_chain = { workspace = true } bls = { workspace = true } +criterion = { workspace = true } store = { workspace = true } tokio = { workspace = true } + +[[bench]] +name = "benches" +harness = false diff --git a/consensus/fork_choice/benches/benches.rs b/consensus/fork_choice/benches/benches.rs new file mode 100644 index 0000000000..bf01b4c346 --- /dev/null +++ b/consensus/fork_choice/benches/benches.rs @@ -0,0 +1,62 @@ +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use fork_choice::{QueuedAttestation, dequeue_attestations}; +use std::collections::BTreeMap; +use types::{Epoch, Hash256, Slot}; + +fn att(slot: Slot) -> QueuedAttestation { + QueuedAttestation { + slot, + attesting_indices: vec![], + block_root: Hash256::ZERO, + target_epoch: Epoch::new(0), + payload_present: false, + } +} + +// Anticipated steady-state workload on mainnet: ~94k attestations spread over a small number of slots, then +// many iterations of dequeue (one slot's worth) + enqueue (one slot's worth for a future slot). +// The queue stays at a constant size, exercising the per-slot dequeue cost. +const NUM_ATTESTATIONS: usize = 1_500_000 / 16; +const UNIQUE_SLOTS: usize = 2; +const NUM_ITERATIONS: usize = 64; +const PER_SLOT: usize = NUM_ATTESTATIONS / UNIQUE_SLOTS; + +fn build_queue() -> BTreeMap> { + let mut queue: BTreeMap> = BTreeMap::new(); + for i in 0..NUM_ATTESTATIONS { + let slot = Slot::from(i / PER_SLOT); + queue.entry(slot).or_default().push(att(slot)); + } + queue +} + +fn all_benches(c: &mut Criterion) { + let initial = build_queue(); + + c.bench_with_input( + BenchmarkId::new("dequeue_attestations", NUM_ATTESTATIONS), + &initial, + |b, initial| { + b.iter(|| { + let mut queue = initial.clone(); + for i in 1..=NUM_ITERATIONS { + let dequeued = dequeue_attestations(Slot::from(i), &mut queue); + let dequeued_count: usize = dequeued.values().map(Vec::len).sum(); + assert_eq!(dequeued_count, PER_SLOT); + + let next_slot = Slot::from(UNIQUE_SLOTS + i - 1); + queue + .entry(next_slot) + .or_default() + .extend(std::iter::repeat_with(|| att(next_slot)).take(PER_SLOT)); + + let total: usize = queue.values().map(Vec::len).sum(); + assert_eq!(total, NUM_ATTESTATIONS); + } + }) + }, + ); +} + +criterion_group!(benches, all_benches); +criterion_main!(benches); diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index aca5ab7851..95c8f70a04 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -11,7 +11,7 @@ use state_processing::{ per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing, }; use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::marker::PhantomData; use std::time::Duration; use superstruct::superstruct; @@ -284,12 +284,12 @@ fn compute_start_slot_at_epoch(epoch: Epoch) -> Slot { /// information about the attestation. #[derive(Clone, PartialEq, Encode, Decode)] pub struct QueuedAttestation { - slot: Slot, - attesting_indices: Vec, - block_root: Hash256, - target_epoch: Epoch, + pub slot: Slot, + pub attesting_indices: Vec, + pub block_root: Hash256, + pub target_epoch: Epoch, /// Per Gloas spec: `payload_present = attestation.data.index == 1`. - payload_present: bool, + pub payload_present: bool, } /// Legacy queued attestation without payload_present (pre-Gloas, schema V28). @@ -313,25 +313,22 @@ impl<'a, E: EthSpec> From> for QueuedAttestation { } } -/// Returns all values in `self.queued_attestations` that have a slot that is earlier than the -/// current slot. Also removes those values from `self.queued_attestations`. -fn dequeue_attestations( +/// Returns all attestations in `queued_attestations` with a slot earlier than the current slot, +/// removing them from the queue. +pub fn dequeue_attestations( current_slot: Slot, - queued_attestations: &mut Vec, -) -> Vec { - let remaining = queued_attestations.split_off( - queued_attestations - .iter() - .position(|a| a.slot >= current_slot) - .unwrap_or(queued_attestations.len()), - ); + queued_attestations: &mut BTreeMap>, +) -> BTreeMap> { + let remaining = queued_attestations.split_off(¤t_slot); + let due = std::mem::replace(queued_attestations, remaining); + let dequeued_count: usize = due.values().map(Vec::len).sum(); metrics::inc_counter_by( &metrics::FORK_CHOICE_DEQUEUED_ATTESTATIONS, - queued_attestations.len() as u64, + dequeued_count as u64, ); - std::mem::replace(queued_attestations, remaining) + due } /// Denotes whether an attestation we are processing was received from a block or from gossip. @@ -376,8 +373,9 @@ pub struct ForkChoice { fc_store: T, /// The underlying representation of the block DAG. proto_array: ProtoArrayForkChoice, - /// Attestations that arrived at the current slot and must be queued for later processing. - queued_attestations: Vec, + /// Attestations that arrived at the current slot and must be queued for later processing, + /// keyed by their slot. + queued_attestations: BTreeMap>, /// Stores a cache of the values required to be sent to the execution layer. forkchoice_update_parameters: ForkchoiceUpdateParameters, _phantom: PhantomData, @@ -474,7 +472,7 @@ where let mut fork_choice = Self { fc_store, proto_array, - queued_attestations: vec![], + queued_attestations: BTreeMap::new(), // This will be updated during the next call to `Self::get_head`. forkchoice_update_parameters: ForkchoiceUpdateParameters { head_hash: None, @@ -1353,8 +1351,11 @@ where // Attestations can only affect the fork choice of subsequent slots. // Delay consideration in the fork choice until their slot is in the past. // ``` + let queued_attestation = QueuedAttestation::from(attestation); self.queued_attestations - .push(QueuedAttestation::from(attestation)); + .entry(queued_attestation.slot) + .or_default() + .push(queued_attestation); } Ok(()) @@ -1546,10 +1547,11 @@ where /// Processes and removes from the queue any queued attestations which may now be eligible for /// processing due to the slot clock incrementing. fn process_attestation_queue(&mut self) -> Result<(), Error> { - for attestation in dequeue_attestations( + let dequeued = dequeue_attestations( self.fc_store.get_current_slot(), &mut self.queued_attestations, - ) { + ); + for attestation in dequeued.into_values().flatten() { for validator_index in attestation.attesting_indices.iter() { self.proto_array.process_attestation( *validator_index as usize, @@ -1795,8 +1797,8 @@ where &self.fc_store } - /// Returns a reference to the currently queued attestations. - pub fn queued_attestations(&self) -> &[QueuedAttestation] { + /// Returns a reference to the currently queued attestations, keyed by slot. + pub fn queued_attestations(&self) -> &BTreeMap> { &self.queued_attestations } @@ -1881,7 +1883,7 @@ where let mut fork_choice = Self { fc_store, proto_array, - queued_attestations: vec![], + queued_attestations: BTreeMap::new(), // Will be updated in the following call to `Self::get_head`. forkchoice_update_parameters: ForkchoiceUpdateParameters { head_hash: None, @@ -1994,20 +1996,33 @@ mod tests { } } - fn get_queued_attestations() -> Vec { - (1..4) - .map(|i| QueuedAttestation { - slot: Slot::new(i), + fn queue_from_slots( + slots: impl IntoIterator, + ) -> BTreeMap> { + let mut queued: BTreeMap> = BTreeMap::new(); + for i in slots { + let slot = Slot::new(i); + queued.entry(slot).or_default().push(QueuedAttestation { + slot, attesting_indices: vec![], block_root: Hash256::zero(), target_epoch: Epoch::new(0), payload_present: false, - }) - .collect() + }); + } + queued } - fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec { - queued_attestations.iter().map(|a| a.slot.into()).collect() + fn get_queued_attestations() -> BTreeMap> { + queue_from_slots(1..4) + } + + fn get_slots(queued_attestations: &BTreeMap>) -> Vec { + queued_attestations + .values() + .flatten() + .map(|a| a.slot.into()) + .collect() } fn test_queued_attestations(current_time: Slot) -> (Vec, Vec) { @@ -2039,4 +2054,25 @@ mod tests { assert!(queued.is_empty()); assert_eq!(dequeued, vec![1, 2, 3]); } + + #[test] + fn dequeue_attestations_out_of_order() { + // A future-slot vote enqueued before a vote that becomes due sooner must not block the + // due vote from being released. + let mut queued = queue_from_slots([4, 3]); + + // At slot 4, the slot-3 vote is due (3 < 4) and must be released. + let dequeued = dequeue_attestations(Slot::new(4), &mut queued); + + assert_eq!( + get_slots(&dequeued), + vec![3], + "slot-3 vote must be dequeued at slot 4" + ); + assert_eq!( + get_slots(&queued), + vec![4], + "only the not-yet-due slot-4 vote should remain" + ); + } } diff --git a/consensus/fork_choice/src/lib.rs b/consensus/fork_choice/src/lib.rs index dcc499547b..453926fbee 100644 --- a/consensus/fork_choice/src/lib.rs +++ b/consensus/fork_choice/src/lib.rs @@ -6,7 +6,7 @@ pub use crate::fork_choice::{ AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters, InvalidAttestation, InvalidBlock, InvalidPayloadAttestation, ParentImportStatus, PayloadVerificationStatus, PersistedForkChoice, PersistedForkChoiceV28, PersistedForkChoiceV29, - QueuedAttestation, ResetPayloadStatuses, + QueuedAttestation, ResetPayloadStatuses, dequeue_attestations, }; pub use fork_choice_store::ForkChoiceStore; pub use proto_array::{ diff --git a/consensus/fork_choice/src/metrics.rs b/consensus/fork_choice/src/metrics.rs index b5cda2f587..2ad18fc1ed 100644 --- a/consensus/fork_choice/src/metrics.rs +++ b/consensus/fork_choice/src/metrics.rs @@ -49,7 +49,11 @@ pub static FORK_CHOICE_ON_ATTESTER_SLASHING_TIMES: LazyLock> = pub fn scrape_for_metrics, E: EthSpec>(fork_choice: &ForkChoice) { set_gauge( &FORK_CHOICE_QUEUED_ATTESTATIONS, - fork_choice.queued_attestations().len() as i64, + fork_choice + .queued_attestations() + .values() + .map(Vec::len) + .sum::() as i64, ); set_gauge( &FORK_CHOICE_NODES, diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 02229e6f33..9b3f5d8857 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -149,13 +149,17 @@ impl ForkChoiceTest { .fork_choice_write_lock() .update_time(self.harness.chain.slot().unwrap()) .unwrap(); - func( - self.harness - .chain - .canonical_head - .fork_choice_read_lock() - .queued_attestations(), - ); + let queued = self + .harness + .chain + .canonical_head + .fork_choice_read_lock() + .queued_attestations() + .values() + .flatten() + .cloned() + .collect::>(); + func(&queued); self } @@ -415,6 +419,24 @@ impl ForkChoiceTest { async fn apply_attestation_to_chain( self, delay: MutationDelay, + mutation_func: F, + comparison_func: G, + ) -> Self + where + F: FnMut(&mut IndexedAttestation, &BeaconChain>), + G: FnMut(Result<(), BeaconChainError>), + { + self.apply_nth_attestation_to_chain(0, delay, mutation_func, comparison_func) + .await + } + + /// Like `apply_attestation_to_chain`, but attests with the validator at + /// `validator_index_in_committee` within the committee. Lets a test enqueue multiple distinct + /// votes for the same slot without tripping `PriorAttestationKnown`. + async fn apply_nth_attestation_to_chain( + self, + validator_index_in_committee: usize, + delay: MutationDelay, mut mutation_func: F, mut comparison_func: G, ) -> Self @@ -431,18 +453,16 @@ impl ForkChoiceTest { .produce_unaggregated_attestation(current_slot, 0) .expect("should not error while producing attestation"); - let validator_committee_index = 0; + // For these tests we always use committee index 0, which also matches the "dummy" committee + // index used post-Electra. + let committee_index = 0; + let validator_index = *head .beacon_state - .get_beacon_committee( - current_slot, - attestation - .committee_index() - .expect("should get committee index"), - ) + .get_beacon_committee(current_slot, committee_index) .expect("should get committees") .committee - .get(validator_committee_index) + .get(validator_index_in_committee) .expect("there should be an attesting validator"); let committee_count = head @@ -452,7 +472,7 @@ impl ForkChoiceTest { let subnet_id = SubnetId::compute_subnet::( current_slot, - 0, + committee_index, committee_count, &self.harness.chain.spec, ) @@ -463,7 +483,7 @@ impl ForkChoiceTest { attestation .sign( &validator_sk, - validator_committee_index, + committee_index as usize, &head.beacon_state.fork(), self.harness.chain.genesis_validators_root, &self.harness.chain.spec, @@ -472,7 +492,7 @@ impl ForkChoiceTest { let single_attestation = SingleAttestation { attester_index: validator_index as u64, - committee_index: validator_committee_index as u64, + committee_index, data: attestation.data().clone(), signature: attestation.signature().clone(), }; @@ -1039,6 +1059,100 @@ async fn invalid_attestation_delayed_slot() { .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0)); } +/// Regression test for dequeuing when votes for two different future slots are queued. +/// +/// With votes queued for consecutive slots, advancing the clock past the earlier one must release +/// only that vote and leave the later one queued until its own slot is in the past. +#[tokio::test] +async fn dequeue_attestations_consecutive_slot_divergence() { + ForkChoiceTest::new() + .apply_blocks_without_new_attestations(1) + .await + .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0)) + // Queue a vote for `slot + 2`. + .apply_nth_attestation_to_chain( + 0, + MutationDelay::NoDelay, + |attestation, _| { + let slot = attestation.data().slot; + attestation.data_mut().slot = slot + 2; + }, + |result| assert!(result.is_ok()), + ) + .await + // Queue a vote for `slot + 1`, which becomes due sooner. + // A different committee position avoids `PriorAttestationKnown`. + .apply_nth_attestation_to_chain( + 1, + MutationDelay::NoDelay, + |attestation, _| { + let slot = attestation.data().slot; + attestation.data_mut().slot = slot + 1; + }, + |result| assert!(result.is_ok()), + ) + .await + .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2)) + // Advance so the slot+1 vote is due (in the past) but the slot+2 vote is not yet. + .skip_slots(2) + .inspect_queued_attestations(|queue| { + assert_eq!( + queue.len(), + 1, + "only the due slot+1 vote should be dequeued" + ); + assert_eq!( + queue[0].slot, + Slot::new(3), + "the surviving vote must be the not-yet-due slot+2 vote" + ); + }); +} + +/// Companion to `dequeue_attestations_consecutive_slot_divergence`: votes for two different slots +/// are queued, but the clock is advanced far enough that *both* are due at dequeue time. +/// +/// When every queued vote is in the past, the whole queue drains in a single dequeue. +#[tokio::test] +async fn dequeue_attestations_conciliation() { + ForkChoiceTest::new() + .apply_blocks_without_new_attestations(1) + .await + .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 0)) + // Queue a vote for `slot + 2`. + .apply_nth_attestation_to_chain( + 0, + MutationDelay::NoDelay, + |attestation, _| { + let slot = attestation.data().slot; + attestation.data_mut().slot = slot + 2; + }, + |result| assert!(result.is_ok()), + ) + .await + // Queue a vote for `slot + 1`. + .apply_nth_attestation_to_chain( + 1, + MutationDelay::NoDelay, + |attestation, _| { + let slot = attestation.data().slot; + attestation.data_mut().slot = slot + 1; + }, + |result| assert!(result.is_ok()), + ) + .await + .inspect_queued_attestations(|queue| assert_eq!(queue.len(), 2)) + // Advance past both votes (to slot + 3) so the whole queue drains. + .skip_slots(3) + .inspect_queued_attestations(|queue| { + assert_eq!( + queue.len(), + 0, + "all votes are due, so the entire queue must drain" + ); + }); +} + /// Tests that the correct target root is used when the attested-to block is in a prior epoch to /// the attestation. #[tokio::test]