From 95b99ee7247aa966aac48dc5e9bfb3d73db25b39 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 16 Mar 2026 22:40:22 +1100 Subject: [PATCH 1/3] Spec v1.7.0 alpha.3 (#8988) Update spec code for compliance with spec v1.7.0-alpha.3: https://github.com/ethereum/consensus-specs/releases/tag/v1.7.0-alpha.3 The actual consensus changes are minimal. There are few more changes that are only relevant to fork choice or P2P validation that we will pick up in future PRs. The change "Ignore beacon block if parent payload unknown" is currently covered in a hacky way by `load_parent` and can be improved once we have fork choice. The change "Add parent_block_root to bid filtering key" is relevant to bid gossip validation, which we don't have at all in unstable yet. Co-Authored-By: Michael Sproul --- .../beacon_chain/src/block_verification.rs | 1 + .../process_operations.rs | 139 +++++++++++++++++- consensus/types/src/core/consts.rs | 6 +- testing/ef_tests/Makefile | 2 +- testing/ef_tests/check_all_files_accessed.py | 2 + testing/ef_tests/download_test_vectors.sh | 7 +- testing/ef_tests/src/cases/operations.rs | 9 +- testing/ef_tests/src/handler.rs | 5 - 8 files changed, 151 insertions(+), 20 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 06ec26185f..802b090f6a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1940,6 +1940,7 @@ fn load_parent>( { if block.as_block().is_parent_block_full(parent_bid_block_hash) { // TODO(gloas): loading the envelope here is not very efficient + // TODO(gloas): check parent payload existence prior to this point? let envelope = chain.store.get_payload_envelope(&root)?.ok_or_else(|| { BeaconChainError::DBInconsistent(format!( "Missing envelope for parent block {root:?}", diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index 9743812632..ac64398655 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -4,7 +4,10 @@ use crate::common::{ get_attestation_participation_flag_indices, increase_balance, initiate_validator_exit, slash_validator, }; -use crate::per_block_processing::errors::{BlockProcessingError, IntoWithIndex}; +use crate::per_block_processing::builder::{ + convert_validator_index_to_builder_index, is_builder_index, +}; +use crate::per_block_processing::errors::{BlockProcessingError, ExitInvalid, IntoWithIndex}; use crate::per_block_processing::verify_payload_attestation::verify_payload_attestation; use bls::{PublicKeyBytes, SignatureBytes}; use ssz_types::FixedVector; @@ -507,7 +510,26 @@ pub fn process_exits( // Verify and apply each exit in series. We iterate in series because higher-index exits may // become invalid due to the application of lower-index ones. for (i, exit) in voluntary_exits.iter().enumerate() { - verify_exit(state, None, exit, verify_signatures, spec) + // Exits must specify an epoch when they become valid; they are not valid before then. + let current_epoch = state.current_epoch(); + if current_epoch < exit.message.epoch { + return Err(BlockOperationError::invalid(ExitInvalid::FutureEpoch { + state: current_epoch, + exit: exit.message.epoch, + }) + .into_with_index(i)); + } + + // [New in Gloas:EIP7732] + if state.fork_name_unchecked().gloas_enabled() + && is_builder_index(exit.message.validator_index) + { + process_builder_voluntary_exit(state, exit, verify_signatures, spec) + .map_err(|e| e.into_with_index(i))?; + continue; + } + + verify_exit(state, Some(current_epoch), exit, verify_signatures, spec) .map_err(|e| e.into_with_index(i))?; initiate_validator_exit(state, exit.message.validator_index as usize, spec)?; @@ -515,6 +537,87 @@ pub fn process_exits( Ok(()) } +/// Process a builder voluntary exit. [New in Gloas:EIP7732] +fn process_builder_voluntary_exit( + state: &mut BeaconState, + signed_exit: &SignedVoluntaryExit, + verify_signatures: VerifySignatures, + spec: &ChainSpec, +) -> Result<(), BlockOperationError> { + let builder_index = + convert_validator_index_to_builder_index(signed_exit.message.validator_index); + + let builder = state + .builders()? + .get(builder_index as usize) + .cloned() + .ok_or(BlockOperationError::invalid(ExitInvalid::ValidatorUnknown( + signed_exit.message.validator_index, + )))?; + + // Verify the builder is active + let finalized_epoch = state.finalized_checkpoint().epoch; + if !builder.is_active_at_finalized_epoch(finalized_epoch, spec) { + return Err(BlockOperationError::invalid(ExitInvalid::NotActive( + signed_exit.message.validator_index, + ))); + } + + // Only exit builder if it has no pending withdrawals in the queue + let pending_balance = state.get_pending_balance_to_withdraw_for_builder(builder_index)?; + if pending_balance != 0 { + return Err(BlockOperationError::invalid( + ExitInvalid::PendingWithdrawalInQueue(signed_exit.message.validator_index), + )); + } + + // Verify signature (using EIP-7044 domain: capella_fork_version for Deneb+) + if verify_signatures.is_true() { + let pubkey = builder.pubkey; + let domain = spec.compute_domain( + Domain::VoluntaryExit, + spec.capella_fork_version, + state.genesis_validators_root(), + ); + let message = signed_exit.message.signing_root(domain); + // TODO(gloas): use builder pubkey cache once available + let bls_pubkey = pubkey + .decompress() + .map_err(|_| BlockOperationError::invalid(ExitInvalid::BadSignature))?; + if !signed_exit.signature.verify(&bls_pubkey, message) { + return Err(BlockOperationError::invalid(ExitInvalid::BadSignature)); + } + } + + // Initiate builder exit + initiate_builder_exit(state, builder_index, spec)?; + + Ok(()) +} + +/// Initiate the exit of a builder. [New in Gloas:EIP7732] +fn initiate_builder_exit( + state: &mut BeaconState, + builder_index: u64, + spec: &ChainSpec, +) -> Result<(), BeaconStateError> { + let current_epoch = state.current_epoch(); + let builder = state + .builders_mut()? + .get_mut(builder_index as usize) + .ok_or(BeaconStateError::UnknownBuilder(builder_index))?; + + // Return if builder already initiated exit + if builder.withdrawable_epoch != spec.far_future_epoch { + return Ok(()); + } + + // Set builder exit epoch + builder.withdrawable_epoch = current_epoch.safe_add(spec.min_builder_withdrawability_delay)?; + + Ok(()) +} + /// Validates each `bls_to_execution_change` and updates the state /// /// Returns `Ok(())` if the validation and state updates completed successfully. Otherwise returns @@ -814,6 +917,30 @@ pub fn process_deposit_requests_post_gloas( Ok(()) } +/// Check if there is a pending deposit for a new validator with the given pubkey. +// TODO(gloas): cache the deposit signature validation or remove this loop entirely if possible, +// it is `O(n * m)` where `n` is max 8192 and `m` is max 128M. +fn is_pending_validator( + state: &BeaconState, + pubkey: &PublicKeyBytes, + spec: &ChainSpec, +) -> Result { + for deposit in state.pending_deposits()?.iter() { + if deposit.pubkey == *pubkey { + let deposit_data = DepositData { + pubkey: deposit.pubkey, + withdrawal_credentials: deposit.withdrawal_credentials, + amount: deposit.amount, + signature: deposit.signature.clone(), + }; + if is_valid_deposit_signature(&deposit_data, spec).is_ok() { + return Ok(true); + } + } + } + Ok(false) +} + pub fn process_deposit_request_post_gloas( state: &mut BeaconState, deposit_request: &DepositRequest, @@ -835,10 +962,14 @@ pub fn process_deposit_request_post_gloas( let validator_index = state.get_validator_index(&deposit_request.pubkey)?; let is_validator = validator_index.is_some(); - let is_builder_prefix = + let has_builder_prefix = is_builder_withdrawal_credential(deposit_request.withdrawal_credentials, spec); - if is_builder || (is_builder_prefix && !is_validator) { + if is_builder + || (has_builder_prefix + && !is_validator + && !is_pending_validator(state, &deposit_request.pubkey, spec)?) + { // Apply builder deposits immediately apply_deposit_for_builder( state, diff --git a/consensus/types/src/core/consts.rs b/consensus/types/src/core/consts.rs index 0d4c0591cb..049094da76 100644 --- a/consensus/types/src/core/consts.rs +++ b/consensus/types/src/core/consts.rs @@ -31,9 +31,9 @@ pub mod gloas { // Fork choice constants pub type PayloadStatus = u8; - pub const PAYLOAD_STATUS_PENDING: PayloadStatus = 0; - pub const PAYLOAD_STATUS_EMPTY: PayloadStatus = 1; - pub const PAYLOAD_STATUS_FULL: PayloadStatus = 2; + pub const PAYLOAD_STATUS_EMPTY: PayloadStatus = 0; + pub const PAYLOAD_STATUS_FULL: PayloadStatus = 1; + pub const PAYLOAD_STATUS_PENDING: PayloadStatus = 2; pub const ATTESTATION_TIMELINESS_INDEX: usize = 0; pub const PTC_TIMELINESS_INDEX: usize = 1; diff --git a/testing/ef_tests/Makefile b/testing/ef_tests/Makefile index fd8a3f6da0..48378a4c95 100644 --- a/testing/ef_tests/Makefile +++ b/testing/ef_tests/Makefile @@ -1,6 +1,6 @@ # To download/extract nightly tests, run: # CONSENSUS_SPECS_TEST_VERSION=nightly make -CONSENSUS_SPECS_TEST_VERSION ?= v1.7.0-alpha.2 +CONSENSUS_SPECS_TEST_VERSION ?= v1.7.0-alpha.3 REPO_NAME := consensus-spec-tests OUTPUT_DIR := ./$(REPO_NAME) diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index 782b554ff1..dd6be14306 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ b/testing/ef_tests/check_all_files_accessed.py @@ -47,6 +47,8 @@ excluded_paths = [ "bls12-381-tests/hash_to_G2", "tests/.*/eip7732", "tests/.*/eip7805", + # Heze fork is not implemented + "tests/.*/heze/.*", # TODO(gloas): remove these ignores as Gloas consensus is implemented "tests/.*/gloas/fork_choice/.*", # Ignore MatrixEntry SSZ tests for now. diff --git a/testing/ef_tests/download_test_vectors.sh b/testing/ef_tests/download_test_vectors.sh index ff5b61bb47..f91b2d1c38 100755 --- a/testing/ef_tests/download_test_vectors.sh +++ b/testing/ef_tests/download_test_vectors.sh @@ -10,7 +10,7 @@ if [[ "$version" == "nightly" || "$version" =~ ^nightly-[0-9]+$ ]]; then exit 1 fi - for cmd in unzip jq; do + for cmd in jq; do if ! command -v "${cmd}" >/dev/null 2>&1; then echo "Error ${cmd} is not installed" exit 1 @@ -48,13 +48,10 @@ if [[ "$version" == "nightly" || "$version" =~ ^nightly-[0-9]+$ ]]; then echo "Downloading artifact: ${name}" curl --progress-bar --location --show-error --retry 3 --retry-all-errors --fail \ -H "${auth_header}" -H "Accept: application/vnd.github+json" \ - --output "${name}.zip" "${url}" || { + --output "${name}" "${url}" || { echo "Failed to download ${name}" exit 1 } - - unzip -qo "${name}.zip" - rm -f "${name}.zip" done else for test in "${TESTS[@]}"; do diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index ca0124e1aa..798c66b666 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -716,8 +716,13 @@ impl> LoadCase for Operations { // Check BLS setting here before SSZ deserialization, as most types require signatures // to be valid. - let (operation, bls_error) = if metadata.bls_setting.unwrap_or_default().check().is_ok() { - match O::decode(&path.join(O::filename()), fork_name, spec) { + let operation_path = path.join(O::filename()); + let (operation, bls_error) = if !operation_path.is_file() { + // Some test cases (e.g. builder_voluntary_exit__success) have no operation file. + // TODO(gloas): remove this once the test vectors are fixed + (None, None) + } else if metadata.bls_setting.unwrap_or_default().check().is_ok() { + match O::decode(&operation_path, fork_name, spec) { Ok(op) => (Some(op), None), Err(Error::InvalidBLSInput(error)) => (None, Some(error)), Err(e) => return Err(e), diff --git a/testing/ef_tests/src/handler.rs b/testing/ef_tests/src/handler.rs index da3c5533b6..f8c16aec0b 100644 --- a/testing/ef_tests/src/handler.rs +++ b/testing/ef_tests/src/handler.rs @@ -537,11 +537,6 @@ impl Handler for RandomHandler { fn handler_name(&self) -> String { "random".into() } - - fn disabled_forks(&self) -> Vec { - // TODO(gloas): remove once we have Gloas random tests - vec![ForkName::Gloas] - } } #[derive(Educe)] From 17d183eb5bf1718054598d4fc91efd2c8ef33431 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 17 Mar 2026 16:35:05 +0900 Subject: [PATCH 2/3] Unknown block for envelope (#8992) Add a queue that allows us to reprocess an envelope when it arrives over gossip references a unknown block root. When the block is finally imported, we immediately reprocess the queued envelope. Note that we don't trigger a block lookup sync. Incoming attestations for this block root will already trigger a lookup for us. I think thats good enough Co-Authored-By: Eitan Seri- Levi --- beacon_node/beacon_processor/src/lib.rs | 33 +- .../src/scheduler/work_queue.rs | 5 + .../src/scheduler/work_reprocessing_queue.rs | 281 ++++++++++++++++++ .../gossip_methods.rs | 63 +++- .../src/network_beacon_processor/tests.rs | 5 + 5 files changed, 383 insertions(+), 4 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..c33f4840e0 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,7 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, + ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -242,6 +243,18 @@ impl From for WorkEvent { process_fn, }, }, + ReadyWork::Envelope(QueuedGossipEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }) => Self { + drop_during_sync: false, + work: Work::DelayedImportEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }, + }, ReadyWork::RpcBlock(QueuedRpcBlock { beacon_block_root, process_fn, @@ -384,6 +397,11 @@ pub enum Work { beacon_block_root: Hash256, process_fn: AsyncFn, }, + DelayedImportEnvelope { + beacon_block_slot: Slot, + beacon_block_root: Hash256, + process_fn: AsyncFn, + }, GossipVoluntaryExit(BlockingFn), GossipProposerSlashing(BlockingFn), GossipAttesterSlashing(BlockingFn), @@ -447,6 +465,7 @@ pub enum WorkType { GossipBlobSidecar, GossipDataColumnSidecar, DelayedImportBlock, + DelayedImportEnvelope, GossipVoluntaryExit, GossipProposerSlashing, GossipAttesterSlashing, @@ -498,6 +517,7 @@ impl Work { Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar, Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar, Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock, + Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope, Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit, Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing, Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing, @@ -793,6 +813,8 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { Some(item) + } else if let Some(item) = work_queues.delayed_envelope_queue.pop() { + Some(item) // Check gossip blocks and payloads before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = work_queues.gossip_block_queue.pop() { @@ -1111,6 +1133,9 @@ impl BeaconProcessor { Work::DelayedImportBlock { .. } => { work_queues.delayed_block_queue.push(work, work_id) } + Work::DelayedImportEnvelope { .. } => { + work_queues.delayed_envelope_queue.push(work, work_id) + } Work::GossipVoluntaryExit { .. } => { work_queues.gossip_voluntary_exit_queue.push(work, work_id) } @@ -1238,6 +1263,7 @@ impl BeaconProcessor { work_queues.gossip_data_column_queue.len() } WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(), WorkType::GossipVoluntaryExit => { work_queues.gossip_voluntary_exit_queue.len() } @@ -1435,6 +1461,11 @@ impl BeaconProcessor { beacon_block_slot: _, beacon_block_root: _, process_fn, + } + | Work::DelayedImportEnvelope { + beacon_block_slot: _, + beacon_block_root: _, + process_fn, } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn, diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..e48c776b6d 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths { gossip_blob_queue: usize, gossip_data_column_queue: usize, delayed_block_queue: usize, + delayed_envelope_queue: usize, status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, @@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths { gossip_blob_queue: 1024, gossip_data_column_queue: 1024, delayed_block_queue: 1024, + delayed_envelope_queue: 1024, status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, @@ -250,6 +252,7 @@ pub struct WorkQueues { pub gossip_blob_queue: FifoQueue>, pub gossip_data_column_queue: FifoQueue>, pub delayed_block_queue: FifoQueue>, + pub delayed_envelope_queue: FifoQueue>, pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, @@ -315,6 +318,7 @@ impl WorkQueues { let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue); let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); @@ -375,6 +379,7 @@ impl WorkQueues { gossip_blob_queue, gossip_data_column_queue, delayed_block_queue, + delayed_envelope_queue, status_queue, block_brange_queue, block_broots_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index c99388287c..38306b3bb6 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; +const GOSSIP_ENVELOPES: &str = "gossip_envelopes"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; @@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be +/// sent for processing after this many slots worth of time, even if the block hasn't arrived. +const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); @@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; +/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks. +const MAXIMUM_QUEUED_ENVELOPES: usize = 16; + /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4); pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), + /// An execution payload envelope that references a block not yet in fork choice. + UnknownBlockForEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), @@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { Block(QueuedGossipBlock), + Envelope(QueuedGossipEnvelope), RpcBlock(QueuedRpcBlock), IgnoredRpcBlock(IgnoredRpcBlock), Unaggregate(QueuedUnaggregate), @@ -157,6 +168,13 @@ pub struct QueuedGossipBlock { pub process_fn: AsyncFn, } +/// An execution payload envelope that arrived early and has been queued for later import. +pub struct QueuedGossipEnvelope { + pub beacon_block_slot: Slot, + pub beacon_block_root: Hash256, + pub process_fn: AsyncFn, +} + /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -209,6 +227,8 @@ impl From for WorkEvent { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An envelope whose block has been imported and is now ready for processing. + ReadyEnvelope(Hash256), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -234,6 +254,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue, + /// Queue to manage envelope timeouts (keyed by block root). + envelope_delay_queue: DelayQueue, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue, /// Queue to manage scheduled attestations. @@ -246,6 +268,8 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, + /// Queued envelopes awaiting their block, keyed by block root. + awaiting_envelopes_per_root: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, /// Queued attestations. @@ -266,6 +290,7 @@ struct ReprocessQueue { next_attestation: usize, next_lc_update: usize, early_block_debounce: TimeLatch, + envelope_delay_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, @@ -315,6 +340,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + match self.rpc_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); @@ -418,11 +450,13 @@ impl ReprocessQueue { work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + envelope_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -433,6 +467,7 @@ impl ReprocessQueue { next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), + envelope_delay_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), @@ -498,6 +533,52 @@ impl ReprocessQueue { } } } + // An envelope that references an unknown block. Queue it until the block is + // imported, or until the timeout expires. + InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => { + let block_root = queued_envelope.beacon_block_root; + + // TODO(gloas): Perform lightweight pre-validation before queuing + // (e.g. verify builder signature) to prevent unsigned garbage from + // consuming queue slots. + + // Don't add the same envelope to the queue twice. This prevents DoS attacks. + if self.awaiting_envelopes_per_root.contains_key(&block_root) { + trace!( + ?block_root, + "Duplicate envelope for same block root, dropping" + ); + return; + } + + // When the queue is full, evict the oldest entry to make room for newer envelopes. + if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES { + if self.envelope_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_ENVELOPES, + msg = "system resources may be saturated", + "Envelope delay queue is full, evicting oldest entry" + ); + } + if let Some(oldest_root) = + self.awaiting_envelopes_per_root.keys().next().copied() + && let Some((_envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&oldest_root) + { + self.envelope_delay_queue.remove(&delay_key); + } + } + + // Register the timeout. + let delay_key = self.envelope_delay_queue.insert( + block_root, + self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS, + ); + + // Store the envelope keyed by block root. + self.awaiting_envelopes_per_root + .insert(block_root, (queued_envelope, delay_key)); + } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -647,6 +728,23 @@ impl ReprocessQueue { block_root, parent_root, }) => { + // Unqueue the envelope we have for this root, if any. + if let Some((envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + self.envelope_delay_queue.remove(&delay_key); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send envelope for reprocessing after block import" + ); + } + } + // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { let mut sent_count = 0; @@ -802,6 +900,25 @@ impl ReprocessQueue { error!("Failed to pop queued block"); } } + // An envelope's timeout has expired. Send it for processing regardless of + // whether the block has been imported. + InboundEvent::ReadyEnvelope(block_root) => { + if let Some((envelope, _delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + debug!( + ?block_root, + "Envelope timed out waiting for block, sending for processing" + ); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!(?block_root, "Failed to send envelope after timeout"); + } + } + } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, @@ -941,6 +1058,11 @@ impl ReprocessQueue { &[GOSSIP_BLOCKS], self.gossip_block_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[GOSSIP_ENVELOPES], + self.awaiting_envelopes_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[RPC_BLOCKS], @@ -1339,4 +1461,163 @@ mod tests { assert_eq!(reconstruction.block_root, block_root); } } + + // Test that envelopes are properly cleaned up from `awaiting_envelopes_per_root` on timeout. + #[tokio::test] + async fn prune_awaiting_envelopes_per_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an envelope. + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + assert!( + queue + .awaiting_envelopes_per_root + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the envelope. + advance_time( + &queue.slot_clock, + queue.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS * 2, + ) + .await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyEnvelope(_))); + queue.handle_message(ready_msg); + + // The entry for the block root should be gone. + assert!(queue.awaiting_envelopes_per_root.is_empty()); + } + + #[tokio::test] + async fn envelope_released_on_block_imported() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + let parent_root = Hash256::repeat_byte(0xab); + + // Insert an envelope. + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + + // Simulate block import. + let imported = ReprocessQueueMessage::BlockImported { + block_root: beacon_block_root, + parent_root, + }; + queue.handle_message(InboundEvent::Msg(imported)); + + // The entry for the block root should be gone. + assert!(queue.awaiting_envelopes_per_root.is_empty()); + // Delay queue entry should also be cancelled. + assert_eq!(queue.envelope_delay_queue.len(), 0); + } + + #[tokio::test] + async fn envelope_dedup_drops_second() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an envelope. + let msg1 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + let msg2 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process both events. + queue.handle_message(InboundEvent::Msg(msg1)); + queue.handle_message(InboundEvent::Msg(msg2)); + + // Only one should be queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + assert_eq!(queue.envelope_delay_queue.len(), 1); + } + + #[tokio::test] + async fn envelope_capacity_evicts_oldest() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + // Fill the queue to capacity. + for i in 0..MAXIMUM_QUEUED_ENVELOPES { + let block_root = Hash256::repeat_byte(i as u8); + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root: block_root, + process_fn: Box::pin(async {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + } + assert_eq!( + queue.awaiting_envelopes_per_root.len(), + MAXIMUM_QUEUED_ENVELOPES + ); + + // One more should evict the oldest and insert the new one. + let overflow_root = Hash256::repeat_byte(0xff); + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root: overflow_root, + process_fn: Box::pin(async {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + + // Queue should still be at capacity, with the new root present. + assert_eq!( + queue.awaiting_envelopes_per_root.len(), + MAXIMUM_QUEUED_ENVELOPES + ); + assert!( + queue + .awaiting_envelopes_per_root + .contains_key(&overflow_root) + ); + } } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3335315157..1f55d9a878 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -20,7 +20,9 @@ use beacon_chain::{ }; use beacon_chain::{ blob_verification::{GossipBlobError, GossipVerifiedBlob}, - payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, + payload_envelope_verification::{ + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, + }, }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; @@ -49,8 +51,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, - ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, + QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -3332,6 +3334,61 @@ impl NetworkBeaconProcessor { verified_envelope } + + Err(EnvelopeError::BlockRootUnknown { block_root }) => { + let envelope_slot = envelope.slot(); + + debug!( + ?block_root, + %envelope_slot, + "Envelope references unknown block, deferring to reprocess queue" + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + + let inner_self = self.clone(); + let chain = self.chain.clone(); + let process_fn = Box::pin(async move { + match chain.verify_envelope_for_gossip(envelope).await { + Ok(verified_envelope) => { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + } + Err(e) => { + debug!( + error = ?e, + "Deferred envelope failed verification" + ); + } + } + }); + + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockForEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root: block_root, + process_fn, + }, + )), + }) + .is_err() + { + error!( + %envelope_slot, + ?block_root, + "Failed to defer envelope import" + ); + } + return None; + } // TODO(gloas) penalize peers accordingly Err(_) => return None, }; diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 5fa8c729cb..c5ccbc2ae6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -2090,3 +2090,8 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { unique_roots.len(), ); } + +// TODO(ePBS): Add integration tests for envelope deferral (UnknownBlockForEnvelope): +// 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope +// 2. Block imported → envelope released and processed successfully +// 3. Timeout path → envelope released and re-verified From a965bfdf77a0b1a3cb2471b9df787edbe99779e8 Mon Sep 17 00:00:00 2001 From: Mac L Date: Wed, 18 Mar 2026 04:24:58 +0300 Subject: [PATCH 3/3] Remove `lighthouse/analysis` endpoints (#8968) Some of our custom `lighthouse/analysis` endpoints will require maintenance for the Gloas hard fork. We have decided instead to remove those endpoints. We don't utilize them internally and they have pretty limited utility and so we feel they are not worth maintaining. Remove `lighthouse/analysis/attestation_performance` and `lighthouse/analysis/block_packing_efficiency` endpoints. Co-Authored-By: Mac L --- .github/forbidden-files.txt | 4 + .../http_api/src/attestation_performance.rs | 217 --------- .../http_api/src/block_packing_efficiency.rs | 410 ------------------ beacon_node/http_api/src/lib.rs | 37 -- book/src/api_lighthouse.md | 120 ----- common/eth2/src/lighthouse.rs | 56 --- .../src/lighthouse/attestation_performance.rs | 39 -- .../lighthouse/block_packing_efficiency.rs | 34 -- testing/simulator/src/checks.rs | 46 +- 9 files changed, 26 insertions(+), 937 deletions(-) delete mode 100644 beacon_node/http_api/src/attestation_performance.rs delete mode 100644 beacon_node/http_api/src/block_packing_efficiency.rs delete mode 100644 common/eth2/src/lighthouse/attestation_performance.rs delete mode 100644 common/eth2/src/lighthouse/block_packing_efficiency.rs diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt index a08a6b4e98..b070067350 100644 --- a/.github/forbidden-files.txt +++ b/.github/forbidden-files.txt @@ -6,5 +6,9 @@ beacon_node/beacon_chain/src/otb_verification_service.rs beacon_node/store/src/partial_beacon_state.rs beacon_node/store/src/consensus_context.rs beacon_node/beacon_chain/src/block_reward.rs +beacon_node/http_api/src/attestation_performance.rs +beacon_node/http_api/src/block_packing_efficiency.rs beacon_node/http_api/src/block_rewards.rs +common/eth2/src/lighthouse/attestation_performance.rs +common/eth2/src/lighthouse/block_packing_efficiency.rs common/eth2/src/lighthouse/block_rewards.rs diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs deleted file mode 100644 index 05ed36e68b..0000000000 --- a/beacon_node/http_api/src/attestation_performance.rs +++ /dev/null @@ -1,217 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{ - AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, -}; -use state_processing::{ - BlockReplayError, BlockReplayer, per_epoch_processing::EpochProcessingSummary, -}; -use std::sync::Arc; -use types::{BeaconState, BeaconStateError, EthSpec, Hash256}; -use warp_utils::reject::{custom_bad_request, custom_server_error, unhandled_error}; - -const MAX_REQUEST_RANGE_EPOCHS: usize = 100; -const BLOCK_ROOT_CHUNK_SIZE: usize = 100; - -#[derive(Debug)] -// We don't use the inner values directly, but they're used in the Debug impl. -enum AttestationPerformanceError { - BlockReplay(#[allow(dead_code)] BlockReplayError), - BeaconState(#[allow(dead_code)] BeaconStateError), - UnableToFindValidator(#[allow(dead_code)] usize), -} - -impl From for AttestationPerformanceError { - fn from(e: BlockReplayError) -> Self { - Self::BlockReplay(e) - } -} - -impl From for AttestationPerformanceError { - fn from(e: BeaconStateError) -> Self { - Self::BeaconState(e) - } -} - -pub fn get_attestation_performance( - target: String, - query: AttestationPerformanceQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let spec = &chain.spec; - // We increment by 2 here so that when we build the state from the `prior_slot` it is - // still 1 epoch ahead of the first epoch we want to analyse. - // This ensures the `.is_previous_epoch_X` functions on `EpochProcessingSummary` return results - // for the correct epoch. - let start_epoch = query.start_epoch + 2; - let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let prior_slot = start_slot - 1; - - let end_epoch = query.end_epoch + 2; - let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch()); - - // Ensure end_epoch is smaller than the current epoch - 1. - let current_epoch = chain.epoch().map_err(unhandled_error)?; - if query.end_epoch >= current_epoch - 1 { - return Err(custom_bad_request(format!( - "end_epoch must be less than the current epoch - 1. current: {}, end: {}", - current_epoch, query.end_epoch - ))); - } - - // Check query is valid. - if start_epoch > end_epoch { - return Err(custom_bad_request(format!( - "start_epoch must not be larger than end_epoch. start: {}, end: {}", - query.start_epoch, query.end_epoch - ))); - } - - // The response size can grow exceptionally large therefore we should check that the - // query is within permitted bounds to prevent potential OOM errors. - if (end_epoch - start_epoch).as_usize() > MAX_REQUEST_RANGE_EPOCHS { - return Err(custom_bad_request(format!( - "end_epoch must not exceed start_epoch by more than {} epochs. start: {}, end: {}", - MAX_REQUEST_RANGE_EPOCHS, query.start_epoch, query.end_epoch - ))); - } - - // Either use the global validator set, or the specified index. - // - // Does no further validation of the indices, so in the event an index has not yet been - // activated or does not yet exist (according to the head state), it will return all fields as - // `false`. - let index_range = if target.to_lowercase() == "global" { - chain - .with_head(|head| Ok((0..head.beacon_state.validators().len() as u64).collect())) - .map_err(unhandled_error::)? - } else { - vec![target.parse::().map_err(|_| { - custom_bad_request(format!( - "Invalid validator index: {:?}", - target.to_lowercase() - )) - })?] - }; - - // Load block roots. - let mut block_roots: Vec = chain - .forwards_iter_block_roots_until(start_slot, end_slot) - .map_err(unhandled_error)? - .map(|res| res.map(|(root, _)| root)) - .collect::, _>>() - .map_err(unhandled_error)?; - block_roots.dedup(); - - // Load first block so we can get its parent. - let first_block_root = block_roots.first().ok_or_else(|| { - custom_server_error( - "No blocks roots could be loaded. Ensure the beacon node is synced.".to_string(), - ) - })?; - let first_block = chain - .get_blinded_block(first_block_root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) - }) - .map_err(unhandled_error)?; - - // Load the block of the prior slot which will be used to build the starting state. - let prior_block = chain - .get_blinded_block(&first_block.parent_root()) - .and_then(|maybe_block| { - maybe_block - .ok_or_else(|| BeaconChainError::MissingBeaconBlock(first_block.parent_root())) - }) - .map_err(unhandled_error)?; - - // Load state for block replay. - let state_root = prior_block.state_root(); - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let state = chain - .get_state(&state_root, Some(prior_slot), true) - .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) - .map_err(unhandled_error)?; - - // Allocate an AttestationPerformance vector for each validator in the range. - let mut perfs: Vec = - AttestationPerformance::initialize(index_range.clone()); - - let post_slot_hook = |state: &mut BeaconState, - summary: Option>, - _is_skip_slot: bool| - -> Result<(), AttestationPerformanceError> { - // If a `summary` was not output then an epoch boundary was not crossed - // so we move onto the next slot. - if let Some(summary) = summary { - for (position, i) in index_range.iter().enumerate() { - let index = *i as usize; - - let val = perfs - .get_mut(position) - .ok_or(AttestationPerformanceError::UnableToFindValidator(index))?; - - // We are two epochs ahead since the summary is generated for - // `state.previous_epoch()` then `summary.is_previous_epoch_X` functions return - // data for the epoch before that. - let epoch = state.previous_epoch().as_u64() - 1; - - let is_active = summary.is_active_unslashed_in_previous_epoch(index); - - let received_source_reward = summary.is_previous_epoch_source_attester(index)?; - - let received_head_reward = summary.is_previous_epoch_head_attester(index)?; - - let received_target_reward = summary.is_previous_epoch_target_attester(index)?; - - let inclusion_delay = summary - .previous_epoch_inclusion_info(index) - .map(|info| info.delay); - - let perf = AttestationPerformanceStatistics { - active: is_active, - head: received_head_reward, - target: received_target_reward, - source: received_source_reward, - delay: inclusion_delay, - }; - - val.epochs.insert(epoch, perf); - } - } - Ok(()) - }; - - // Initialize block replayer - let mut replayer = BlockReplayer::new(state, spec) - .no_state_root_iter() - .no_signature_verification() - .minimal_block_root_verification() - .post_slot_hook(Box::new(post_slot_hook)); - - // Iterate through block roots in chunks to reduce load on memory. - for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) { - // Load blocks from the block root chunks. - let blocks = block_root_chunks - .iter() - .map(|root| { - chain - .get_blinded_block(root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) - }) - .map_err(unhandled_error) - }) - .collect::, _>>()?; - - // TODO(gloas): add payloads - replayer = replayer - .apply_blocks(blocks, vec![], None) - .map_err(|e| custom_server_error(format!("{:?}", e)))?; - } - - drop(replayer); - - Ok(perfs) -} diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs deleted file mode 100644 index 725a0648a5..0000000000 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ /dev/null @@ -1,410 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{ - BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, -}; -use parking_lot::Mutex; -use state_processing::{ - BlockReplayError, BlockReplayer, per_epoch_processing::EpochProcessingSummary, -}; -use std::collections::{HashMap, HashSet}; -use std::marker::PhantomData; -use std::sync::Arc; -use types::{ - AttestationRef, BeaconCommittee, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, - Epoch, EthSpec, Hash256, OwnedBeaconCommittee, RelativeEpoch, SignedBeaconBlock, Slot, -}; -use warp_utils::reject::{custom_bad_request, custom_server_error, unhandled_error}; - -/// Load blocks from block roots in chunks to reduce load on memory. -const BLOCK_ROOT_CHUNK_SIZE: usize = 100; - -#[derive(Debug)] -// We don't use the inner values directly, but they're used in the Debug impl. -enum PackingEfficiencyError { - BlockReplay(#[allow(dead_code)] BlockReplayError), - BeaconState(#[allow(dead_code)] BeaconStateError), - CommitteeStoreError(#[allow(dead_code)] Slot), - InvalidAttestationError, -} - -impl From for PackingEfficiencyError { - fn from(e: BlockReplayError) -> Self { - Self::BlockReplay(e) - } -} - -impl From for PackingEfficiencyError { - fn from(e: BeaconStateError) -> Self { - Self::BeaconState(e) - } -} - -struct CommitteeStore { - current_epoch_committees: Vec, - previous_epoch_committees: Vec, -} - -impl CommitteeStore { - fn new() -> Self { - CommitteeStore { - current_epoch_committees: Vec::new(), - previous_epoch_committees: Vec::new(), - } - } -} - -struct PackingEfficiencyHandler { - current_slot: Slot, - current_epoch: Epoch, - prior_skip_slots: u64, - available_attestations: HashSet, - included_attestations: HashMap, - committee_store: CommitteeStore, - _phantom: PhantomData, -} - -impl PackingEfficiencyHandler { - fn new( - start_epoch: Epoch, - starting_state: BeaconState, - spec: &ChainSpec, - ) -> Result { - let mut handler = PackingEfficiencyHandler { - current_slot: start_epoch.start_slot(E::slots_per_epoch()), - current_epoch: start_epoch, - prior_skip_slots: 0, - available_attestations: HashSet::new(), - included_attestations: HashMap::new(), - committee_store: CommitteeStore::new(), - _phantom: PhantomData, - }; - - handler.compute_epoch(start_epoch, &starting_state, spec)?; - Ok(handler) - } - - fn update_slot(&mut self, slot: Slot) { - self.current_slot = slot; - if slot % E::slots_per_epoch() == 0 { - self.current_epoch = Epoch::new(slot.as_u64() / E::slots_per_epoch()); - } - } - - fn prune_included_attestations(&mut self) { - let epoch = self.current_epoch; - self.included_attestations.retain(|x, _| { - x.slot >= Epoch::new(epoch.as_u64().saturating_sub(2)).start_slot(E::slots_per_epoch()) - }); - } - - fn prune_available_attestations(&mut self) { - let slot = self.current_slot; - self.available_attestations - .retain(|x| x.slot >= (slot.as_u64().saturating_sub(E::slots_per_epoch()))); - } - - fn apply_block( - &mut self, - block: &SignedBeaconBlock>, - ) -> Result { - let block_body = block.message().body(); - let attestations = block_body.attestations(); - - let mut attestations_in_block = HashMap::new(); - for attestation in attestations { - match attestation { - AttestationRef::Base(attn) => { - for (position, voted) in attn.aggregation_bits.iter().enumerate() { - if voted { - let unique_attestation = UniqueAttestation { - slot: attn.data.slot, - committee_index: attn.data.index, - committee_position: position, - }; - let inclusion_distance: u64 = block - .slot() - .as_u64() - .checked_sub(attn.data.slot.as_u64()) - .ok_or(PackingEfficiencyError::InvalidAttestationError)?; - - self.available_attestations.remove(&unique_attestation); - attestations_in_block.insert(unique_attestation, inclusion_distance); - } - } - } - AttestationRef::Electra(attn) => { - for (position, voted) in attn.aggregation_bits.iter().enumerate() { - if voted { - let unique_attestation = UniqueAttestation { - slot: attn.data.slot, - committee_index: attn.data.index, - committee_position: position, - }; - let inclusion_distance: u64 = block - .slot() - .as_u64() - .checked_sub(attn.data.slot.as_u64()) - .ok_or(PackingEfficiencyError::InvalidAttestationError)?; - - self.available_attestations.remove(&unique_attestation); - attestations_in_block.insert(unique_attestation, inclusion_distance); - } - } - } - } - } - - // Remove duplicate attestations as these yield no reward. - attestations_in_block.retain(|x, _| !self.included_attestations.contains_key(x)); - self.included_attestations - .extend(attestations_in_block.clone()); - - Ok(attestations_in_block.len()) - } - - fn add_attestations(&mut self, slot: Slot) -> Result<(), PackingEfficiencyError> { - let committees = self.get_committees_at_slot(slot)?; - for committee in committees { - for position in 0..committee.committee.len() { - let unique_attestation = UniqueAttestation { - slot, - committee_index: committee.index, - committee_position: position, - }; - self.available_attestations.insert(unique_attestation); - } - } - - Ok(()) - } - - fn compute_epoch( - &mut self, - epoch: Epoch, - state: &BeaconState, - spec: &ChainSpec, - ) -> Result<(), PackingEfficiencyError> { - // Free some memory by pruning old attestations from the included set. - self.prune_included_attestations(); - - let new_committees = if state.committee_cache_is_initialized(RelativeEpoch::Current) { - state - .get_beacon_committees_at_epoch(RelativeEpoch::Current)? - .into_iter() - .map(BeaconCommittee::into_owned) - .collect::>() - } else { - state - .initialize_committee_cache(epoch, spec)? - .get_all_beacon_committees()? - .into_iter() - .map(BeaconCommittee::into_owned) - .collect::>() - }; - - self.committee_store - .previous_epoch_committees - .clone_from(&self.committee_store.current_epoch_committees); - - self.committee_store.current_epoch_committees = new_committees; - - Ok(()) - } - - fn get_committees_at_slot( - &self, - slot: Slot, - ) -> Result, PackingEfficiencyError> { - let mut committees = Vec::new(); - - for committee in &self.committee_store.current_epoch_committees { - if committee.slot == slot { - committees.push(committee.clone()); - } - } - for committee in &self.committee_store.previous_epoch_committees { - if committee.slot == slot { - committees.push(committee.clone()); - } - } - - if committees.is_empty() { - return Err(PackingEfficiencyError::CommitteeStoreError(slot)); - } - - Ok(committees) - } -} - -pub fn get_block_packing_efficiency( - query: BlockPackingEfficiencyQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let spec = &chain.spec; - - let start_epoch = query.start_epoch; - let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let prior_slot = start_slot - 1; - - let end_epoch = query.end_epoch; - let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch()); - - // Check query is valid. - if start_epoch > end_epoch || start_epoch == 0 { - return Err(custom_bad_request(format!( - "invalid start and end epochs: {}, {}", - start_epoch, end_epoch - ))); - } - - let prior_epoch = start_epoch - 1; - let start_slot_of_prior_epoch = prior_epoch.start_slot(T::EthSpec::slots_per_epoch()); - - // Load block roots. - let mut block_roots: Vec = chain - .forwards_iter_block_roots_until(start_slot_of_prior_epoch, end_slot) - .map_err(unhandled_error)? - .collect::, _>>() - .map_err(unhandled_error)? - .iter() - .map(|(root, _)| *root) - .collect(); - block_roots.dedup(); - - let first_block_root = block_roots - .first() - .ok_or_else(|| custom_server_error("no blocks were loaded".to_string()))?; - - let first_block = chain - .get_blinded_block(first_block_root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) - }) - .map_err(unhandled_error)?; - - // Load state for block replay. - let starting_state_root = first_block.state_root(); - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let starting_state = chain - .get_state(&starting_state_root, Some(prior_slot), true) - .and_then(|maybe_state| { - maybe_state.ok_or(BeaconChainError::MissingBeaconState(starting_state_root)) - }) - .map_err(unhandled_error)?; - - // Initialize response vector. - let mut response = Vec::new(); - - // Initialize handler. - let handler = Arc::new(Mutex::new( - PackingEfficiencyHandler::new(prior_epoch, starting_state.clone(), spec) - .map_err(|e| custom_server_error(format!("{:?}", e)))?, - )); - - let pre_slot_hook = - |_, state: &mut BeaconState| -> Result<(), PackingEfficiencyError> { - // Add attestations to `available_attestations`. - handler.lock().add_attestations(state.slot())?; - Ok(()) - }; - - let post_slot_hook = |state: &mut BeaconState, - _summary: Option>, - is_skip_slot: bool| - -> Result<(), PackingEfficiencyError> { - handler.lock().update_slot(state.slot()); - - // Check if this a new epoch. - if state.slot() % T::EthSpec::slots_per_epoch() == 0 { - handler.lock().compute_epoch( - state.slot().epoch(T::EthSpec::slots_per_epoch()), - state, - spec, - )?; - } - - if is_skip_slot { - handler.lock().prior_skip_slots += 1; - } - - // Remove expired attestations. - handler.lock().prune_available_attestations(); - - Ok(()) - }; - - let pre_block_hook = |_state: &mut BeaconState, - block: &SignedBeaconBlock<_, BlindedPayload<_>>| - -> Result<(), PackingEfficiencyError> { - let slot = block.slot(); - - let block_message = block.message(); - // Get block proposer info. - let proposer_info = ProposerInfo { - validator_index: block_message.proposer_index(), - graffiti: block_message.body().graffiti().as_utf8_lossy(), - }; - - // Store the count of available attestations at this point. - // In the future it may be desirable to check that the number of available attestations - // does not exceed the maximum possible amount given the length of available committees. - let available_count = handler.lock().available_attestations.len(); - - // Get all attestations included in the block. - let included = handler.lock().apply_block(block)?; - - let efficiency = BlockPackingEfficiency { - slot, - block_hash: block.canonical_root(), - proposer_info, - available_attestations: available_count, - included_attestations: included, - prior_skip_slots: handler.lock().prior_skip_slots, - }; - - // Write to response. - if slot >= start_slot { - response.push(efficiency); - } - - handler.lock().prior_skip_slots = 0; - - Ok(()) - }; - - // Build BlockReplayer. - let mut replayer = BlockReplayer::new(starting_state, spec) - .no_state_root_iter() - .no_signature_verification() - .minimal_block_root_verification() - .pre_slot_hook(Box::new(pre_slot_hook)) - .post_slot_hook(Box::new(post_slot_hook)) - .pre_block_hook(Box::new(pre_block_hook)); - - // Iterate through the block roots, loading blocks in chunks to reduce load on memory. - for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) { - // Load blocks from the block root chunks. - let blocks = block_root_chunks - .iter() - .map(|root| { - chain - .get_blinded_block(root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) - }) - .map_err(unhandled_error) - }) - .collect::, _>>()?; - - // TODO(gloas): add payloads - replayer = replayer - .apply_blocks(blocks, vec![], None) - .map_err(|e: PackingEfficiencyError| custom_server_error(format!("{:?}", e)))?; - } - - drop(replayer); - - Ok(response) -} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fc92128c91..29e2d39aee 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -7,11 +7,9 @@ //! used for development. mod aggregate_attestation; -mod attestation_performance; mod attester_duties; mod beacon; mod block_id; -mod block_packing_efficiency; mod build_block_contents; mod builder_states; mod custody; @@ -3091,39 +3089,6 @@ pub fn serve( }, ); - // GET lighthouse/analysis/attestation_performance/{index} - let get_lighthouse_attestation_performance = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("attestation_performance")) - .and(warp::path::param::()) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |target, query, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - attestation_performance::get_attestation_performance(target, query, chain) - }) - }, - ); - - // GET lighthouse/analysis/block_packing_efficiency - let get_lighthouse_block_packing_efficiency = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_packing_efficiency")) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |query, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_packing_efficiency::get_block_packing_efficiency(query, chain) - }) - }, - ); - let get_events = eth_v1 .clone() .and(warp::path("events")) @@ -3359,12 +3324,10 @@ pub fn serve( .uor(get_lighthouse_database_info) .uor(get_lighthouse_database_invariants) .uor(get_lighthouse_custody_info) - .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) .uor(get_beacon_light_client_finality_update) .uor(get_beacon_light_client_bootstrap) .uor(get_beacon_light_client_updates) - .uor(get_lighthouse_block_packing_efficiency) .uor(get_events) .uor(get_expected_withdrawals) .uor(lighthouse_log_events.boxed()) diff --git a/book/src/api_lighthouse.md b/book/src/api_lighthouse.md index 2fd7290cb2..c2e4fbdd5a 100644 --- a/book/src/api_lighthouse.md +++ b/book/src/api_lighthouse.md @@ -512,126 +512,6 @@ As all testnets and Mainnet have been merged, both values will be the same after } ``` -## `/lighthouse/analysis/attestation_performance/{index}` - -Fetch information about the attestation performance of a validator index or all validators for a -range of consecutive epochs. - -Two query parameters are required: - -- `start_epoch` (inclusive): the first epoch to compute attestation performance for. -- `end_epoch` (inclusive): the final epoch to compute attestation performance for. - -Example: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/attestation_performance/1?start_epoch=1&end_epoch=1" | jq -``` - -```json -[ - { - "index": 1, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - } -] -``` - -Instead of specifying a validator index, you can specify the entire validator set by using `global`: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/attestation_performance/global?start_epoch=1&end_epoch=1" | jq -``` - -```json -[ - { - "index": 0, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - }, - { - "index": 1, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - }, - { - .. - } -] - -``` - -Caveats: - -- For maximum efficiency the start_epoch should satisfy `(start_epoch * slots_per_epoch) % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_epoch` needs to be loaded from the database, - and loading a state on a boundary is most efficient. - -## `/lighthouse/analysis/block_packing` - -Fetch information about the block packing efficiency of blocks for a range of consecutive -epochs. - -Two query parameters are required: - -- `start_epoch` (inclusive): the epoch of the first block to compute packing efficiency for. -- `end_epoch` (inclusive): the epoch of the last block to compute packing efficiency for. - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/block_packing_efficiency?start_epoch=1&end_epoch=1" | jq -``` - -An excerpt of the response looks like: - -```json -[ - { - "slot": "33", - "block_hash": "0xb20970bb97c6c6de6b1e2b689d6381dd15b3d3518fbaee032229495f963bd5da", - "proposer_info": { - "validator_index": 855, - "graffiti": "poapZoJ7zWNfK7F3nWjEausWVBvKa6gA" - }, - "available_attestations": 3805, - "included_attestations": 1143, - "prior_skip_slots": 1 - }, - { - .. - } -] -``` - -Caveats: - -- `start_epoch` must not be `0`. -- For maximum efficiency the `start_epoch` should satisfy `(start_epoch * slots_per_epoch) % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_epoch` needs to be loaded from the database, and - loading a state on a boundary is most efficient. - ## `/lighthouse/logs` This is a Server Side Event subscription endpoint. This allows a user to read diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 3c039b16b3..5ff7a7e0f0 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -1,7 +1,5 @@ //! This module contains endpoints that are non-standard and only available on Lighthouse servers. -mod attestation_performance; -mod block_packing_efficiency; mod custody; pub mod sync_state; @@ -15,12 +13,6 @@ use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; -pub use attestation_performance::{ - AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, -}; -pub use block_packing_efficiency::{ - BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, -}; pub use custody::CustodyInfo; // Define "legacy" implementations of `Option` which use four bytes for encoding the union @@ -310,52 +302,4 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &req).await } - - /* - Analysis endpoints. - */ - - /// `GET` lighthouse/analysis/block_packing?start_epoch,end_epoch - pub async fn get_lighthouse_analysis_block_packing( - &self, - start_epoch: Epoch, - end_epoch: Epoch, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") - .push("analysis") - .push("block_packing_efficiency"); - - path.query_pairs_mut() - .append_pair("start_epoch", &start_epoch.to_string()) - .append_pair("end_epoch", &end_epoch.to_string()); - - self.get(path).await - } - - /// `GET` lighthouse/analysis/attestation_performance/{index}?start_epoch,end_epoch - pub async fn get_lighthouse_analysis_attestation_performance( - &self, - start_epoch: Epoch, - end_epoch: Epoch, - target: String, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") - .push("analysis") - .push("attestation_performance") - .push(&target); - - path.query_pairs_mut() - .append_pair("start_epoch", &start_epoch.to_string()) - .append_pair("end_epoch", &end_epoch.to_string()); - - self.get(path).await - } } diff --git a/common/eth2/src/lighthouse/attestation_performance.rs b/common/eth2/src/lighthouse/attestation_performance.rs deleted file mode 100644 index 5ce1d90a38..0000000000 --- a/common/eth2/src/lighthouse/attestation_performance.rs +++ /dev/null @@ -1,39 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use types::Epoch; - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformanceStatistics { - pub active: bool, - pub head: bool, - pub target: bool, - pub source: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub delay: Option, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformance { - pub index: u64, - pub epochs: HashMap, -} - -impl AttestationPerformance { - pub fn initialize(indices: Vec) -> Vec { - let mut vec = Vec::with_capacity(indices.len()); - for index in indices { - vec.push(Self { - index, - ..Default::default() - }) - } - vec - } -} - -/// Query parameters for the `/lighthouse/analysis/attestation_performance` endpoint. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformanceQuery { - pub start_epoch: Epoch, - pub end_epoch: Epoch, -} diff --git a/common/eth2/src/lighthouse/block_packing_efficiency.rs b/common/eth2/src/lighthouse/block_packing_efficiency.rs deleted file mode 100644 index 0ad6f46031..0000000000 --- a/common/eth2/src/lighthouse/block_packing_efficiency.rs +++ /dev/null @@ -1,34 +0,0 @@ -use serde::{Deserialize, Serialize}; -use types::{Epoch, Hash256, Slot}; - -type CommitteePosition = usize; -type Committee = u64; -type ValidatorIndex = u64; - -#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] -pub struct UniqueAttestation { - pub slot: Slot, - pub committee_index: Committee, - pub committee_position: CommitteePosition, -} -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct ProposerInfo { - pub validator_index: ValidatorIndex, - pub graffiti: String, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockPackingEfficiency { - pub slot: Slot, - pub block_hash: Hash256, - pub proposer_info: ProposerInfo, - pub available_attestations: usize, - pub included_attestations: usize, - pub prior_skip_slots: u64, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockPackingEfficiencyQuery { - pub start_epoch: Epoch, - pub end_epoch: Epoch, -} diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index 35200692c3..de202e5812 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -463,6 +463,9 @@ pub async fn reconnect_to_execution_layer( } /// Ensure all validators have attested correctly. +/// +/// Checks attestation rewards for head, target, and source. +/// A positive reward indicates a correct vote. pub async fn check_attestation_correctness( network: LocalNetwork, start_epoch: u64, @@ -476,54 +479,49 @@ pub async fn check_attestation_correctness( let remote_node = &network.remote_nodes()?[node_index]; - let results = remote_node - .get_lighthouse_analysis_attestation_performance( - Epoch::new(start_epoch), - Epoch::new(upto_epoch - 2), - "global".to_string(), - ) - .await - .map_err(|e| format!("Unable to get attestation performance: {e}"))?; - - let mut active_successes: f64 = 0.0; let mut head_successes: f64 = 0.0; let mut target_successes: f64 = 0.0; let mut source_successes: f64 = 0.0; - let mut total: f64 = 0.0; - for result in results { - for epochs in result.epochs.values() { + let end_epoch = upto_epoch + .checked_sub(2) + .ok_or_else(|| "upto_epoch must be >= 2 to have attestation rewards".to_string())?; + for epoch in start_epoch..=end_epoch { + let response = remote_node + .post_beacon_rewards_attestations(Epoch::new(epoch), &[]) + .await + .map_err(|e| format!("Unable to get attestation rewards for epoch {epoch}: {e}"))?; + + for reward in &response.data.total_rewards { total += 1.0; - if epochs.active { - active_successes += 1.0; - } - if epochs.head { + // A positive reward means the validator made a correct vote. + if reward.head > 0 { head_successes += 1.0; } - if epochs.target { + if reward.target > 0 { target_successes += 1.0; } - if epochs.source { + if reward.source > 0 { source_successes += 1.0; } } } - let active_percent = active_successes / total * 100.0; + + if total == 0.0 { + return Err("No attestation rewards data found".to_string()); + } + let head_percent = head_successes / total * 100.0; let target_percent = target_successes / total * 100.0; let source_percent = source_successes / total * 100.0; eprintln!("Total Attestations: {}", total); - eprintln!("Active: {}: {}%", active_successes, active_percent); eprintln!("Head: {}: {}%", head_successes, head_percent); eprintln!("Target: {}: {}%", target_successes, target_percent); eprintln!("Source: {}: {}%", source_successes, source_percent); - if active_percent < acceptable_attestation_performance { - return Err("Active percent was below required level".to_string()); - } if head_percent < acceptable_attestation_performance { return Err("Head percent was below required level".to_string()); }