diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt index a08a6b4e98..b070067350 100644 --- a/.github/forbidden-files.txt +++ b/.github/forbidden-files.txt @@ -6,5 +6,9 @@ beacon_node/beacon_chain/src/otb_verification_service.rs beacon_node/store/src/partial_beacon_state.rs beacon_node/store/src/consensus_context.rs beacon_node/beacon_chain/src/block_reward.rs +beacon_node/http_api/src/attestation_performance.rs +beacon_node/http_api/src/block_packing_efficiency.rs beacon_node/http_api/src/block_rewards.rs +common/eth2/src/lighthouse/attestation_performance.rs +common/eth2/src/lighthouse/block_packing_efficiency.rs common/eth2/src/lighthouse/block_rewards.rs diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index 9992273e0a..308ddcf819 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -38,7 +38,7 @@ jobs: - name: Install Kurtosis run: | - echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + echo "deb [trusted=yes] https://sdk.kurtosis.com/kurtosis-cli-release-artifacts/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list sudo apt update sudo apt install -y kurtosis-cli kurtosis analytics disable @@ -106,7 +106,7 @@ jobs: - name: Install Kurtosis run: | - echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + echo "deb [trusted=yes] https://sdk.kurtosis.com/kurtosis-cli-release-artifacts/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list sudo apt update sudo apt install -y kurtosis-cli kurtosis analytics disable @@ -142,7 +142,7 @@ jobs: - name: Install Kurtosis run: | - echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + echo "deb [trusted=yes] https://sdk.kurtosis.com/kurtosis-cli-release-artifacts/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list sudo apt update sudo apt install -y kurtosis-cli kurtosis analytics disable @@ -185,7 +185,7 @@ jobs: - name: Install Kurtosis run: | - echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + echo "deb [trusted=yes] https://sdk.kurtosis.com/kurtosis-cli-release-artifacts/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list sudo apt update sudo apt install -y kurtosis-cli kurtosis analytics disable @@ -227,7 +227,7 @@ jobs: - name: Install Kurtosis run: | - echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + echo "deb [trusted=yes] https://sdk.kurtosis.com/kurtosis-cli-release-artifacts/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list sudo apt update sudo apt install -y kurtosis-cli kurtosis analytics disable diff --git a/Cargo.lock b/Cargo.lock index a14aacc0a2..8dee921887 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5267,7 +5267,7 @@ dependencies = [ "rcgen", "ring", "rustls 0.23.35", - "rustls-webpki 0.103.8", + "rustls-webpki 0.103.10", "thiserror 2.0.17", "x509-parser", "yasna", @@ -7121,7 +7121,7 @@ dependencies = [ "once_cell", "socket2 0.5.10", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7670,7 +7670,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.8", + "rustls-webpki 0.103.10", "subtle", "zeroize", ] @@ -7719,9 +7719,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.8" +version = "0.103.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" +checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" dependencies = [ "ring", "rustls-pki-types", diff --git a/Makefile b/Makefile index ad1bbbb8e8..599c1a8791 100644 --- a/Makefile +++ b/Makefile @@ -331,7 +331,7 @@ install-audit: cargo install --force cargo-audit audit-CI: - cargo audit + cargo audit --ignore RUSTSEC-2026-0049 # Runs cargo deny (check for banned crates, duplicate versions, and source restrictions) deny: install-deny deny-CI diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 96e4ab6c60..d53e588d54 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6715,6 +6715,9 @@ impl BeaconChain { let mut prev_block_root = None; let mut prev_beacon_state = None; + // Collect all blocks. + let mut blocks = vec![]; + for res in self.forwards_iter_block_roots(from_slot)? { let (beacon_block_root, _) = res?; @@ -6730,16 +6733,42 @@ impl BeaconChain { .ok_or_else(|| { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; - let beacon_state_root = beacon_block.state_root(); + blocks.push((beacon_block_root, Arc::new(beacon_block))); + } + + // Collect states, using the next blocks to determine if states are full (have Gloas + // payloads). + for (i, (block_root, block)) in blocks.iter().enumerate() { + let (opt_envelope, state_root) = if block.fork_name_unchecked().gloas_enabled() { + let opt_envelope = self.store.get_payload_envelope(block_root)?.map(Arc::new); + + if let Some((_, next_block)) = blocks.get(i + 1) { + let block_hash = block.payload_bid_block_hash()?; + if next_block.is_parent_block_full(block_hash) { + let envelope = opt_envelope.ok_or_else(|| { + Error::DBInconsistent(format!("Missing envelope {block_root:?}")) + })?; + let state_root = envelope.message.state_root; + (Some(envelope), state_root) + } else { + (None, block.state_root()) + } + } else { + // TODO(gloas): should use fork choice/cached head for last block in sequence + opt_envelope + .as_ref() + .map_or((None, block.state_root()), |envelope| { + (Some(envelope.clone()), envelope.message.state_root) + }) + } + } else { + (None, block.state_root()) + }; - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. let mut beacon_state = self .store - .get_state(&beacon_state_root, Some(beacon_block.slot()), true)? - .ok_or_else(|| { - Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root)) - })?; + .get_state(&state_root, Some(block.slot()), true)? + .ok_or_else(|| Error::DBInconsistent(format!("Missing state {:?}", state_root)))?; // This beacon state might come from the freezer DB, which means it could have pending // updates or lots of untethered memory. We rebase it on the previous state in order to @@ -6752,12 +6781,14 @@ impl BeaconChain { prev_beacon_state = Some(beacon_state.clone()); let snapshot = BeaconSnapshot { - beacon_block: Arc::new(beacon_block), - beacon_block_root, + beacon_block: block.clone(), + execution_envelope: opt_envelope, + beacon_block_root: *block_root, beacon_state, }; dump.push(snapshot); } + Ok(dump) } diff --git a/beacon_node/beacon_chain/src/beacon_snapshot.rs b/beacon_node/beacon_chain/src/beacon_snapshot.rs index e9fde48ac6..566713e3f3 100644 --- a/beacon_node/beacon_chain/src/beacon_snapshot.rs +++ b/beacon_node/beacon_chain/src/beacon_snapshot.rs @@ -2,7 +2,7 @@ use serde::Serialize; use std::sync::Arc; use types::{ AbstractExecPayload, BeaconState, EthSpec, FullPayload, Hash256, SignedBeaconBlock, - SignedBlindedBeaconBlock, + SignedBlindedBeaconBlock, SignedExecutionPayloadEnvelope, }; /// Represents some block and its associated state. Generally, this will be used for tracking the @@ -10,6 +10,7 @@ use types::{ #[derive(Clone, Serialize, PartialEq, Debug)] pub struct BeaconSnapshot = FullPayload> { pub beacon_block: Arc>, + pub execution_envelope: Option>>, pub beacon_block_root: Hash256, pub beacon_state: BeaconState, } @@ -31,33 +32,42 @@ impl> BeaconSnapshot { /// Create a new checkpoint. pub fn new( beacon_block: Arc>, + execution_envelope: Option>>, beacon_block_root: Hash256, beacon_state: BeaconState, ) -> Self { Self { beacon_block, + execution_envelope, beacon_block_root, beacon_state, } } - /// Returns the state root from `self.beacon_block`. + /// Returns the state root from `self.beacon_block` or `self.execution_envelope` as + /// appropriate. /// /// ## Caution /// /// It is not strictly enforced that `root(self.beacon_state) == self.beacon_state_root()`. pub fn beacon_state_root(&self) -> Hash256 { - self.beacon_block.message().state_root() + if let Some(ref envelope) = self.execution_envelope { + envelope.message.state_root + } else { + self.beacon_block.message().state_root() + } } /// Update all fields of the checkpoint. pub fn update( &mut self, beacon_block: Arc>, + execution_envelope: Option>>, beacon_block_root: Hash256, beacon_state: BeaconState, ) { self.beacon_block = beacon_block; + self.execution_envelope = execution_envelope; self.beacon_block_root = beacon_block_root; self.beacon_state = beacon_state; } diff --git a/beacon_node/beacon_chain/src/block_production/gloas.rs b/beacon_node/beacon_chain/src/block_production/gloas.rs index 5d7d99b5bd..2fc4fb51f7 100644 --- a/beacon_node/beacon_chain/src/block_production/gloas.rs +++ b/beacon_node/beacon_chain/src/block_production/gloas.rs @@ -763,8 +763,12 @@ fn get_execution_payload_gloas( let latest_execution_block_hash = *state.latest_block_hash()?; let latest_gas_limit = state.latest_execution_payload_bid()?.gas_limit; - let withdrawals = - Withdrawals::::from(get_expected_withdrawals(state, spec)?).into(); + let withdrawals = if state.is_parent_block_full() { + Withdrawals::::from(get_expected_withdrawals(state, spec)?).into() + } else { + // If the previous payload was missed, carry forward the withdrawals from the state. + state.payload_expected_withdrawals()?.to_vec() + }; // Spawn a task to obtain the execution payload from the EL via a series of async calls. The // `join_handle` can be used to await the result of the function. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 59fa5ec9ec..7eb92060a2 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -358,6 +358,7 @@ where Ok(( BeaconSnapshot { beacon_block_root, + execution_envelope: None, beacon_block: Arc::new(beacon_block), beacon_state, }, @@ -616,8 +617,10 @@ where .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?, ); + // TODO(gloas): add check that checkpoint state is Pending let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, + execution_envelope: None, beacon_block: Arc::new(weak_subj_block), beacon_state: weak_subj_state, }; @@ -800,6 +803,7 @@ where let mut head_snapshot = BeaconSnapshot { beacon_block_root: head_block_root, + execution_envelope: None, beacon_block: Arc::new(head_block), beacon_state: head_state, }; diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index fd060e2b59..0faddd1792 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -319,6 +319,7 @@ impl CanonicalHead { let snapshot = BeaconSnapshot { beacon_block_root, + execution_envelope: None, beacon_block: Arc::new(beacon_block), beacon_state, }; @@ -695,6 +696,7 @@ impl BeaconChain { BeaconSnapshot { beacon_block: Arc::new(beacon_block), + execution_envelope: None, beacon_block_root: new_view.head_block_root, beacon_state, } diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 8981b20a55..2bb60f111a 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -77,8 +77,10 @@ async fn get_chain_segment() -> (Vec>, Vec>(); + let some_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); let (genesis_state, _genesis_state_root) = harness.get_current_state_and_root(); @@ -5886,6 +5889,7 @@ async fn test_gloas_hot_state_hierarchy() { // Verify chain dump and iterators work with Gloas states. check_chain_dump(&harness, num_blocks + 1); check_iterators(&harness); + check_db_invariants(&harness); } /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 33a00bfa49..c33f4840e0 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -41,7 +41,8 @@ pub use crate::scheduler::BeaconProcessorQueueLengths; use crate::scheduler::work_queue::WorkQueues; use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope, + ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -242,6 +243,18 @@ impl From for WorkEvent { process_fn, }, }, + ReadyWork::Envelope(QueuedGossipEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }) => Self { + drop_during_sync: false, + work: Work::DelayedImportEnvelope { + beacon_block_slot, + beacon_block_root, + process_fn, + }, + }, ReadyWork::RpcBlock(QueuedRpcBlock { beacon_block_root, process_fn, @@ -384,6 +397,11 @@ pub enum Work { beacon_block_root: Hash256, process_fn: AsyncFn, }, + DelayedImportEnvelope { + beacon_block_slot: Slot, + beacon_block_root: Hash256, + process_fn: AsyncFn, + }, GossipVoluntaryExit(BlockingFn), GossipProposerSlashing(BlockingFn), GossipAttesterSlashing(BlockingFn), @@ -447,6 +465,7 @@ pub enum WorkType { GossipBlobSidecar, GossipDataColumnSidecar, DelayedImportBlock, + DelayedImportEnvelope, GossipVoluntaryExit, GossipProposerSlashing, GossipAttesterSlashing, @@ -498,6 +517,7 @@ impl Work { Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar, Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar, Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock, + Work::DelayedImportEnvelope { .. } => WorkType::DelayedImportEnvelope, Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit, Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing, Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing, @@ -793,6 +813,8 @@ impl BeaconProcessor { // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { Some(item) + } else if let Some(item) = work_queues.delayed_envelope_queue.pop() { + Some(item) // Check gossip blocks and payloads before gossip attestations, since a block might be // required to verify some attestations. } else if let Some(item) = work_queues.gossip_block_queue.pop() { @@ -1111,6 +1133,9 @@ impl BeaconProcessor { Work::DelayedImportBlock { .. } => { work_queues.delayed_block_queue.push(work, work_id) } + Work::DelayedImportEnvelope { .. } => { + work_queues.delayed_envelope_queue.push(work, work_id) + } Work::GossipVoluntaryExit { .. } => { work_queues.gossip_voluntary_exit_queue.push(work, work_id) } @@ -1238,6 +1263,7 @@ impl BeaconProcessor { work_queues.gossip_data_column_queue.len() } WorkType::DelayedImportBlock => work_queues.delayed_block_queue.len(), + WorkType::DelayedImportEnvelope => work_queues.delayed_envelope_queue.len(), WorkType::GossipVoluntaryExit => { work_queues.gossip_voluntary_exit_queue.len() } @@ -1435,6 +1461,11 @@ impl BeaconProcessor { beacon_block_slot: _, beacon_block_root: _, process_fn, + } + | Work::DelayedImportEnvelope { + beacon_block_slot: _, + beacon_block_root: _, + process_fn, } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn, diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index 934659b304..e48c776b6d 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -127,6 +127,7 @@ pub struct BeaconProcessorQueueLengths { gossip_blob_queue: usize, gossip_data_column_queue: usize, delayed_block_queue: usize, + delayed_envelope_queue: usize, status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, @@ -197,6 +198,7 @@ impl BeaconProcessorQueueLengths { gossip_blob_queue: 1024, gossip_data_column_queue: 1024, delayed_block_queue: 1024, + delayed_envelope_queue: 1024, status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, @@ -250,6 +252,7 @@ pub struct WorkQueues { pub gossip_blob_queue: FifoQueue>, pub gossip_data_column_queue: FifoQueue>, pub delayed_block_queue: FifoQueue>, + pub delayed_envelope_queue: FifoQueue>, pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, @@ -315,6 +318,7 @@ impl WorkQueues { let gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue); let gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue); let delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue); + let delayed_envelope_queue = FifoQueue::new(queue_lengths.delayed_envelope_queue); let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); @@ -375,6 +379,7 @@ impl WorkQueues { gossip_blob_queue, gossip_data_column_queue, delayed_block_queue, + delayed_envelope_queue, status_queue, block_brange_queue, block_broots_queue, diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index c99388287c..38306b3bb6 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -35,6 +35,7 @@ use types::{EthSpec, Hash256, Slot}; const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; +const GOSSIP_ENVELOPES: &str = "gossip_envelopes"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; @@ -51,6 +52,10 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12); /// For how long to queue light client updates for re-processing. pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); +/// Envelope timeout as a multiplier of slot duration. Envelopes waiting for their block will be +/// sent for processing after this many slots worth of time, even if the block hasn't arrived. +const QUEUED_ENVELOPE_DELAY_SLOTS: u32 = 1; + /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); @@ -65,6 +70,9 @@ pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); /// it's nice to have extra protection. const MAXIMUM_QUEUED_BLOCKS: usize = 16; +/// Set an arbitrary upper-bound on the number of queued envelopes to avoid DoS attacks. +const MAXIMUM_QUEUED_ENVELOPES: usize = 16; + /// How many attestations we keep before new ones get dropped. const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; @@ -93,6 +101,8 @@ pub const RECONSTRUCTION_DEADLINE: (u64, u64) = (1, 4); pub enum ReprocessQueueMessage { /// A block that has been received early and we should queue for later processing. EarlyBlock(QueuedGossipBlock), + /// An execution payload envelope that references a block not yet in fork choice. + UnknownBlockForEnvelope(QueuedGossipEnvelope), /// A gossip block for hash `X` is being imported, we should queue the rpc block for the same /// hash until the gossip block is imported. RpcBlock(QueuedRpcBlock), @@ -120,6 +130,7 @@ pub enum ReprocessQueueMessage { /// Events sent by the scheduler once they are ready for re-processing. pub enum ReadyWork { Block(QueuedGossipBlock), + Envelope(QueuedGossipEnvelope), RpcBlock(QueuedRpcBlock), IgnoredRpcBlock(IgnoredRpcBlock), Unaggregate(QueuedUnaggregate), @@ -157,6 +168,13 @@ pub struct QueuedGossipBlock { pub process_fn: AsyncFn, } +/// An execution payload envelope that arrived early and has been queued for later import. +pub struct QueuedGossipEnvelope { + pub beacon_block_slot: Slot, + pub beacon_block_root: Hash256, + pub process_fn: AsyncFn, +} + /// A block that arrived for processing when the same block was being imported over gossip. /// It is queued for later import. pub struct QueuedRpcBlock { @@ -209,6 +227,8 @@ impl From for WorkEvent { enum InboundEvent { /// A gossip block that was queued for later processing and is ready for import. ReadyGossipBlock(QueuedGossipBlock), + /// An envelope whose block has been imported and is now ready for processing. + ReadyEnvelope(Hash256), /// A rpc block that was queued because the same gossip block was being imported /// will now be retried for import. ReadyRpcBlock(QueuedRpcBlock), @@ -234,6 +254,8 @@ struct ReprocessQueue { /* Queues */ /// Queue to manage scheduled early blocks. gossip_block_delay_queue: DelayQueue, + /// Queue to manage envelope timeouts (keyed by block root). + envelope_delay_queue: DelayQueue, /// Queue to manage scheduled early blocks. rpc_block_delay_queue: DelayQueue, /// Queue to manage scheduled attestations. @@ -246,6 +268,8 @@ struct ReprocessQueue { /* Queued items */ /// Queued blocks. queued_gossip_block_roots: HashSet, + /// Queued envelopes awaiting their block, keyed by block root. + awaiting_envelopes_per_root: HashMap, /// Queued aggregated attestations. queued_aggregates: FnvHashMap, /// Queued attestations. @@ -266,6 +290,7 @@ struct ReprocessQueue { next_attestation: usize, next_lc_update: usize, early_block_debounce: TimeLatch, + envelope_delay_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, @@ -315,6 +340,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.envelope_delay_queue.poll_expired(cx) { + Poll::Ready(Some(block_root)) => { + return Poll::Ready(Some(InboundEvent::ReadyEnvelope(block_root.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + match self.rpc_block_delay_queue.poll_expired(cx) { Poll::Ready(Some(queued_block)) => { return Poll::Ready(Some(InboundEvent::ReadyRpcBlock(queued_block.into_inner()))); @@ -418,11 +450,13 @@ impl ReprocessQueue { work_reprocessing_rx, ready_work_tx, gossip_block_delay_queue: DelayQueue::new(), + envelope_delay_queue: DelayQueue::new(), rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), + awaiting_envelopes_per_root: HashMap::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), @@ -433,6 +467,7 @@ impl ReprocessQueue { next_attestation: 0, next_lc_update: 0, early_block_debounce: TimeLatch::default(), + envelope_delay_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), @@ -498,6 +533,52 @@ impl ReprocessQueue { } } } + // An envelope that references an unknown block. Queue it until the block is + // imported, or until the timeout expires. + InboundEvent::Msg(UnknownBlockForEnvelope(queued_envelope)) => { + let block_root = queued_envelope.beacon_block_root; + + // TODO(gloas): Perform lightweight pre-validation before queuing + // (e.g. verify builder signature) to prevent unsigned garbage from + // consuming queue slots. + + // Don't add the same envelope to the queue twice. This prevents DoS attacks. + if self.awaiting_envelopes_per_root.contains_key(&block_root) { + trace!( + ?block_root, + "Duplicate envelope for same block root, dropping" + ); + return; + } + + // When the queue is full, evict the oldest entry to make room for newer envelopes. + if self.awaiting_envelopes_per_root.len() >= MAXIMUM_QUEUED_ENVELOPES { + if self.envelope_delay_debounce.elapsed() { + warn!( + queue_size = MAXIMUM_QUEUED_ENVELOPES, + msg = "system resources may be saturated", + "Envelope delay queue is full, evicting oldest entry" + ); + } + if let Some(oldest_root) = + self.awaiting_envelopes_per_root.keys().next().copied() + && let Some((_envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&oldest_root) + { + self.envelope_delay_queue.remove(&delay_key); + } + } + + // Register the timeout. + let delay_key = self.envelope_delay_queue.insert( + block_root, + self.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS, + ); + + // Store the envelope keyed by block root. + self.awaiting_envelopes_per_root + .insert(block_root, (queued_envelope, delay_key)); + } // A rpc block arrived for processing at the same time when a gossip block // for the same block hash is being imported. We wait for `QUEUED_RPC_BLOCK_DELAY` // and then send the rpc block back for processing assuming the gossip import @@ -647,6 +728,23 @@ impl ReprocessQueue { block_root, parent_root, }) => { + // Unqueue the envelope we have for this root, if any. + if let Some((envelope, delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + self.envelope_delay_queue.remove(&delay_key); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!( + ?block_root, + "Failed to send envelope for reprocessing after block import" + ); + } + } + // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { let mut sent_count = 0; @@ -802,6 +900,25 @@ impl ReprocessQueue { error!("Failed to pop queued block"); } } + // An envelope's timeout has expired. Send it for processing regardless of + // whether the block has been imported. + InboundEvent::ReadyEnvelope(block_root) => { + if let Some((envelope, _delay_key)) = + self.awaiting_envelopes_per_root.remove(&block_root) + { + debug!( + ?block_root, + "Envelope timed out waiting for block, sending for processing" + ); + if self + .ready_work_tx + .try_send(ReadyWork::Envelope(envelope)) + .is_err() + { + error!(?block_root, "Failed to send envelope after timeout"); + } + } + } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS, @@ -941,6 +1058,11 @@ impl ReprocessQueue { &[GOSSIP_BLOCKS], self.gossip_block_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[GOSSIP_ENVELOPES], + self.awaiting_envelopes_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[RPC_BLOCKS], @@ -1339,4 +1461,163 @@ mod tests { assert_eq!(reconstruction.block_root, block_root); } } + + // Test that envelopes are properly cleaned up from `awaiting_envelopes_per_root` on timeout. + #[tokio::test] + async fn prune_awaiting_envelopes_per_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an envelope. + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + assert!( + queue + .awaiting_envelopes_per_root + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the envelope. + advance_time( + &queue.slot_clock, + queue.slot_clock.slot_duration() * QUEUED_ENVELOPE_DELAY_SLOTS * 2, + ) + .await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyEnvelope(_))); + queue.handle_message(ready_msg); + + // The entry for the block root should be gone. + assert!(queue.awaiting_envelopes_per_root.is_empty()); + } + + #[tokio::test] + async fn envelope_released_on_block_imported() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + let parent_root = Hash256::repeat_byte(0xab); + + // Insert an envelope. + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + + // Simulate block import. + let imported = ReprocessQueueMessage::BlockImported { + block_root: beacon_block_root, + parent_root, + }; + queue.handle_message(InboundEvent::Msg(imported)); + + // The entry for the block root should be gone. + assert!(queue.awaiting_envelopes_per_root.is_empty()); + // Delay queue entry should also be cancelled. + assert_eq!(queue.envelope_delay_queue.len(), 0); + } + + #[tokio::test] + async fn envelope_dedup_drops_second() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an envelope. + let msg1 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + let msg2 = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root, + process_fn: Box::pin(async {}), + }); + + // Process both events. + queue.handle_message(InboundEvent::Msg(msg1)); + queue.handle_message(InboundEvent::Msg(msg2)); + + // Only one should be queued. + assert_eq!(queue.awaiting_envelopes_per_root.len(), 1); + assert_eq!(queue.envelope_delay_queue.len(), 1); + } + + #[tokio::test] + async fn envelope_capacity_evicts_oldest() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + // Fill the queue to capacity. + for i in 0..MAXIMUM_QUEUED_ENVELOPES { + let block_root = Hash256::repeat_byte(i as u8); + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root: block_root, + process_fn: Box::pin(async {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + } + assert_eq!( + queue.awaiting_envelopes_per_root.len(), + MAXIMUM_QUEUED_ENVELOPES + ); + + // One more should evict the oldest and insert the new one. + let overflow_root = Hash256::repeat_byte(0xff); + let msg = ReprocessQueueMessage::UnknownBlockForEnvelope(QueuedGossipEnvelope { + beacon_block_slot: Slot::new(1), + beacon_block_root: overflow_root, + process_fn: Box::pin(async {}), + }); + queue.handle_message(InboundEvent::Msg(msg)); + + // Queue should still be at capacity, with the new root present. + assert_eq!( + queue.awaiting_envelopes_per_root.len(), + MAXIMUM_QUEUED_ENVELOPES + ); + assert!( + queue + .awaiting_envelopes_per_root + .contains_key(&overflow_root) + ); + } } diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs deleted file mode 100644 index 05ed36e68b..0000000000 --- a/beacon_node/http_api/src/attestation_performance.rs +++ /dev/null @@ -1,217 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{ - AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, -}; -use state_processing::{ - BlockReplayError, BlockReplayer, per_epoch_processing::EpochProcessingSummary, -}; -use std::sync::Arc; -use types::{BeaconState, BeaconStateError, EthSpec, Hash256}; -use warp_utils::reject::{custom_bad_request, custom_server_error, unhandled_error}; - -const MAX_REQUEST_RANGE_EPOCHS: usize = 100; -const BLOCK_ROOT_CHUNK_SIZE: usize = 100; - -#[derive(Debug)] -// We don't use the inner values directly, but they're used in the Debug impl. -enum AttestationPerformanceError { - BlockReplay(#[allow(dead_code)] BlockReplayError), - BeaconState(#[allow(dead_code)] BeaconStateError), - UnableToFindValidator(#[allow(dead_code)] usize), -} - -impl From for AttestationPerformanceError { - fn from(e: BlockReplayError) -> Self { - Self::BlockReplay(e) - } -} - -impl From for AttestationPerformanceError { - fn from(e: BeaconStateError) -> Self { - Self::BeaconState(e) - } -} - -pub fn get_attestation_performance( - target: String, - query: AttestationPerformanceQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let spec = &chain.spec; - // We increment by 2 here so that when we build the state from the `prior_slot` it is - // still 1 epoch ahead of the first epoch we want to analyse. - // This ensures the `.is_previous_epoch_X` functions on `EpochProcessingSummary` return results - // for the correct epoch. - let start_epoch = query.start_epoch + 2; - let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let prior_slot = start_slot - 1; - - let end_epoch = query.end_epoch + 2; - let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch()); - - // Ensure end_epoch is smaller than the current epoch - 1. - let current_epoch = chain.epoch().map_err(unhandled_error)?; - if query.end_epoch >= current_epoch - 1 { - return Err(custom_bad_request(format!( - "end_epoch must be less than the current epoch - 1. current: {}, end: {}", - current_epoch, query.end_epoch - ))); - } - - // Check query is valid. - if start_epoch > end_epoch { - return Err(custom_bad_request(format!( - "start_epoch must not be larger than end_epoch. start: {}, end: {}", - query.start_epoch, query.end_epoch - ))); - } - - // The response size can grow exceptionally large therefore we should check that the - // query is within permitted bounds to prevent potential OOM errors. - if (end_epoch - start_epoch).as_usize() > MAX_REQUEST_RANGE_EPOCHS { - return Err(custom_bad_request(format!( - "end_epoch must not exceed start_epoch by more than {} epochs. start: {}, end: {}", - MAX_REQUEST_RANGE_EPOCHS, query.start_epoch, query.end_epoch - ))); - } - - // Either use the global validator set, or the specified index. - // - // Does no further validation of the indices, so in the event an index has not yet been - // activated or does not yet exist (according to the head state), it will return all fields as - // `false`. - let index_range = if target.to_lowercase() == "global" { - chain - .with_head(|head| Ok((0..head.beacon_state.validators().len() as u64).collect())) - .map_err(unhandled_error::)? - } else { - vec![target.parse::().map_err(|_| { - custom_bad_request(format!( - "Invalid validator index: {:?}", - target.to_lowercase() - )) - })?] - }; - - // Load block roots. - let mut block_roots: Vec = chain - .forwards_iter_block_roots_until(start_slot, end_slot) - .map_err(unhandled_error)? - .map(|res| res.map(|(root, _)| root)) - .collect::, _>>() - .map_err(unhandled_error)?; - block_roots.dedup(); - - // Load first block so we can get its parent. - let first_block_root = block_roots.first().ok_or_else(|| { - custom_server_error( - "No blocks roots could be loaded. Ensure the beacon node is synced.".to_string(), - ) - })?; - let first_block = chain - .get_blinded_block(first_block_root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) - }) - .map_err(unhandled_error)?; - - // Load the block of the prior slot which will be used to build the starting state. - let prior_block = chain - .get_blinded_block(&first_block.parent_root()) - .and_then(|maybe_block| { - maybe_block - .ok_or_else(|| BeaconChainError::MissingBeaconBlock(first_block.parent_root())) - }) - .map_err(unhandled_error)?; - - // Load state for block replay. - let state_root = prior_block.state_root(); - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let state = chain - .get_state(&state_root, Some(prior_slot), true) - .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) - .map_err(unhandled_error)?; - - // Allocate an AttestationPerformance vector for each validator in the range. - let mut perfs: Vec = - AttestationPerformance::initialize(index_range.clone()); - - let post_slot_hook = |state: &mut BeaconState, - summary: Option>, - _is_skip_slot: bool| - -> Result<(), AttestationPerformanceError> { - // If a `summary` was not output then an epoch boundary was not crossed - // so we move onto the next slot. - if let Some(summary) = summary { - for (position, i) in index_range.iter().enumerate() { - let index = *i as usize; - - let val = perfs - .get_mut(position) - .ok_or(AttestationPerformanceError::UnableToFindValidator(index))?; - - // We are two epochs ahead since the summary is generated for - // `state.previous_epoch()` then `summary.is_previous_epoch_X` functions return - // data for the epoch before that. - let epoch = state.previous_epoch().as_u64() - 1; - - let is_active = summary.is_active_unslashed_in_previous_epoch(index); - - let received_source_reward = summary.is_previous_epoch_source_attester(index)?; - - let received_head_reward = summary.is_previous_epoch_head_attester(index)?; - - let received_target_reward = summary.is_previous_epoch_target_attester(index)?; - - let inclusion_delay = summary - .previous_epoch_inclusion_info(index) - .map(|info| info.delay); - - let perf = AttestationPerformanceStatistics { - active: is_active, - head: received_head_reward, - target: received_target_reward, - source: received_source_reward, - delay: inclusion_delay, - }; - - val.epochs.insert(epoch, perf); - } - } - Ok(()) - }; - - // Initialize block replayer - let mut replayer = BlockReplayer::new(state, spec) - .no_state_root_iter() - .no_signature_verification() - .minimal_block_root_verification() - .post_slot_hook(Box::new(post_slot_hook)); - - // Iterate through block roots in chunks to reduce load on memory. - for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) { - // Load blocks from the block root chunks. - let blocks = block_root_chunks - .iter() - .map(|root| { - chain - .get_blinded_block(root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) - }) - .map_err(unhandled_error) - }) - .collect::, _>>()?; - - // TODO(gloas): add payloads - replayer = replayer - .apply_blocks(blocks, vec![], None) - .map_err(|e| custom_server_error(format!("{:?}", e)))?; - } - - drop(replayer); - - Ok(perfs) -} diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs deleted file mode 100644 index 725a0648a5..0000000000 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ /dev/null @@ -1,410 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{ - BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, -}; -use parking_lot::Mutex; -use state_processing::{ - BlockReplayError, BlockReplayer, per_epoch_processing::EpochProcessingSummary, -}; -use std::collections::{HashMap, HashSet}; -use std::marker::PhantomData; -use std::sync::Arc; -use types::{ - AttestationRef, BeaconCommittee, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, - Epoch, EthSpec, Hash256, OwnedBeaconCommittee, RelativeEpoch, SignedBeaconBlock, Slot, -}; -use warp_utils::reject::{custom_bad_request, custom_server_error, unhandled_error}; - -/// Load blocks from block roots in chunks to reduce load on memory. -const BLOCK_ROOT_CHUNK_SIZE: usize = 100; - -#[derive(Debug)] -// We don't use the inner values directly, but they're used in the Debug impl. -enum PackingEfficiencyError { - BlockReplay(#[allow(dead_code)] BlockReplayError), - BeaconState(#[allow(dead_code)] BeaconStateError), - CommitteeStoreError(#[allow(dead_code)] Slot), - InvalidAttestationError, -} - -impl From for PackingEfficiencyError { - fn from(e: BlockReplayError) -> Self { - Self::BlockReplay(e) - } -} - -impl From for PackingEfficiencyError { - fn from(e: BeaconStateError) -> Self { - Self::BeaconState(e) - } -} - -struct CommitteeStore { - current_epoch_committees: Vec, - previous_epoch_committees: Vec, -} - -impl CommitteeStore { - fn new() -> Self { - CommitteeStore { - current_epoch_committees: Vec::new(), - previous_epoch_committees: Vec::new(), - } - } -} - -struct PackingEfficiencyHandler { - current_slot: Slot, - current_epoch: Epoch, - prior_skip_slots: u64, - available_attestations: HashSet, - included_attestations: HashMap, - committee_store: CommitteeStore, - _phantom: PhantomData, -} - -impl PackingEfficiencyHandler { - fn new( - start_epoch: Epoch, - starting_state: BeaconState, - spec: &ChainSpec, - ) -> Result { - let mut handler = PackingEfficiencyHandler { - current_slot: start_epoch.start_slot(E::slots_per_epoch()), - current_epoch: start_epoch, - prior_skip_slots: 0, - available_attestations: HashSet::new(), - included_attestations: HashMap::new(), - committee_store: CommitteeStore::new(), - _phantom: PhantomData, - }; - - handler.compute_epoch(start_epoch, &starting_state, spec)?; - Ok(handler) - } - - fn update_slot(&mut self, slot: Slot) { - self.current_slot = slot; - if slot % E::slots_per_epoch() == 0 { - self.current_epoch = Epoch::new(slot.as_u64() / E::slots_per_epoch()); - } - } - - fn prune_included_attestations(&mut self) { - let epoch = self.current_epoch; - self.included_attestations.retain(|x, _| { - x.slot >= Epoch::new(epoch.as_u64().saturating_sub(2)).start_slot(E::slots_per_epoch()) - }); - } - - fn prune_available_attestations(&mut self) { - let slot = self.current_slot; - self.available_attestations - .retain(|x| x.slot >= (slot.as_u64().saturating_sub(E::slots_per_epoch()))); - } - - fn apply_block( - &mut self, - block: &SignedBeaconBlock>, - ) -> Result { - let block_body = block.message().body(); - let attestations = block_body.attestations(); - - let mut attestations_in_block = HashMap::new(); - for attestation in attestations { - match attestation { - AttestationRef::Base(attn) => { - for (position, voted) in attn.aggregation_bits.iter().enumerate() { - if voted { - let unique_attestation = UniqueAttestation { - slot: attn.data.slot, - committee_index: attn.data.index, - committee_position: position, - }; - let inclusion_distance: u64 = block - .slot() - .as_u64() - .checked_sub(attn.data.slot.as_u64()) - .ok_or(PackingEfficiencyError::InvalidAttestationError)?; - - self.available_attestations.remove(&unique_attestation); - attestations_in_block.insert(unique_attestation, inclusion_distance); - } - } - } - AttestationRef::Electra(attn) => { - for (position, voted) in attn.aggregation_bits.iter().enumerate() { - if voted { - let unique_attestation = UniqueAttestation { - slot: attn.data.slot, - committee_index: attn.data.index, - committee_position: position, - }; - let inclusion_distance: u64 = block - .slot() - .as_u64() - .checked_sub(attn.data.slot.as_u64()) - .ok_or(PackingEfficiencyError::InvalidAttestationError)?; - - self.available_attestations.remove(&unique_attestation); - attestations_in_block.insert(unique_attestation, inclusion_distance); - } - } - } - } - } - - // Remove duplicate attestations as these yield no reward. - attestations_in_block.retain(|x, _| !self.included_attestations.contains_key(x)); - self.included_attestations - .extend(attestations_in_block.clone()); - - Ok(attestations_in_block.len()) - } - - fn add_attestations(&mut self, slot: Slot) -> Result<(), PackingEfficiencyError> { - let committees = self.get_committees_at_slot(slot)?; - for committee in committees { - for position in 0..committee.committee.len() { - let unique_attestation = UniqueAttestation { - slot, - committee_index: committee.index, - committee_position: position, - }; - self.available_attestations.insert(unique_attestation); - } - } - - Ok(()) - } - - fn compute_epoch( - &mut self, - epoch: Epoch, - state: &BeaconState, - spec: &ChainSpec, - ) -> Result<(), PackingEfficiencyError> { - // Free some memory by pruning old attestations from the included set. - self.prune_included_attestations(); - - let new_committees = if state.committee_cache_is_initialized(RelativeEpoch::Current) { - state - .get_beacon_committees_at_epoch(RelativeEpoch::Current)? - .into_iter() - .map(BeaconCommittee::into_owned) - .collect::>() - } else { - state - .initialize_committee_cache(epoch, spec)? - .get_all_beacon_committees()? - .into_iter() - .map(BeaconCommittee::into_owned) - .collect::>() - }; - - self.committee_store - .previous_epoch_committees - .clone_from(&self.committee_store.current_epoch_committees); - - self.committee_store.current_epoch_committees = new_committees; - - Ok(()) - } - - fn get_committees_at_slot( - &self, - slot: Slot, - ) -> Result, PackingEfficiencyError> { - let mut committees = Vec::new(); - - for committee in &self.committee_store.current_epoch_committees { - if committee.slot == slot { - committees.push(committee.clone()); - } - } - for committee in &self.committee_store.previous_epoch_committees { - if committee.slot == slot { - committees.push(committee.clone()); - } - } - - if committees.is_empty() { - return Err(PackingEfficiencyError::CommitteeStoreError(slot)); - } - - Ok(committees) - } -} - -pub fn get_block_packing_efficiency( - query: BlockPackingEfficiencyQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let spec = &chain.spec; - - let start_epoch = query.start_epoch; - let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let prior_slot = start_slot - 1; - - let end_epoch = query.end_epoch; - let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch()); - - // Check query is valid. - if start_epoch > end_epoch || start_epoch == 0 { - return Err(custom_bad_request(format!( - "invalid start and end epochs: {}, {}", - start_epoch, end_epoch - ))); - } - - let prior_epoch = start_epoch - 1; - let start_slot_of_prior_epoch = prior_epoch.start_slot(T::EthSpec::slots_per_epoch()); - - // Load block roots. - let mut block_roots: Vec = chain - .forwards_iter_block_roots_until(start_slot_of_prior_epoch, end_slot) - .map_err(unhandled_error)? - .collect::, _>>() - .map_err(unhandled_error)? - .iter() - .map(|(root, _)| *root) - .collect(); - block_roots.dedup(); - - let first_block_root = block_roots - .first() - .ok_or_else(|| custom_server_error("no blocks were loaded".to_string()))?; - - let first_block = chain - .get_blinded_block(first_block_root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) - }) - .map_err(unhandled_error)?; - - // Load state for block replay. - let starting_state_root = first_block.state_root(); - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let starting_state = chain - .get_state(&starting_state_root, Some(prior_slot), true) - .and_then(|maybe_state| { - maybe_state.ok_or(BeaconChainError::MissingBeaconState(starting_state_root)) - }) - .map_err(unhandled_error)?; - - // Initialize response vector. - let mut response = Vec::new(); - - // Initialize handler. - let handler = Arc::new(Mutex::new( - PackingEfficiencyHandler::new(prior_epoch, starting_state.clone(), spec) - .map_err(|e| custom_server_error(format!("{:?}", e)))?, - )); - - let pre_slot_hook = - |_, state: &mut BeaconState| -> Result<(), PackingEfficiencyError> { - // Add attestations to `available_attestations`. - handler.lock().add_attestations(state.slot())?; - Ok(()) - }; - - let post_slot_hook = |state: &mut BeaconState, - _summary: Option>, - is_skip_slot: bool| - -> Result<(), PackingEfficiencyError> { - handler.lock().update_slot(state.slot()); - - // Check if this a new epoch. - if state.slot() % T::EthSpec::slots_per_epoch() == 0 { - handler.lock().compute_epoch( - state.slot().epoch(T::EthSpec::slots_per_epoch()), - state, - spec, - )?; - } - - if is_skip_slot { - handler.lock().prior_skip_slots += 1; - } - - // Remove expired attestations. - handler.lock().prune_available_attestations(); - - Ok(()) - }; - - let pre_block_hook = |_state: &mut BeaconState, - block: &SignedBeaconBlock<_, BlindedPayload<_>>| - -> Result<(), PackingEfficiencyError> { - let slot = block.slot(); - - let block_message = block.message(); - // Get block proposer info. - let proposer_info = ProposerInfo { - validator_index: block_message.proposer_index(), - graffiti: block_message.body().graffiti().as_utf8_lossy(), - }; - - // Store the count of available attestations at this point. - // In the future it may be desirable to check that the number of available attestations - // does not exceed the maximum possible amount given the length of available committees. - let available_count = handler.lock().available_attestations.len(); - - // Get all attestations included in the block. - let included = handler.lock().apply_block(block)?; - - let efficiency = BlockPackingEfficiency { - slot, - block_hash: block.canonical_root(), - proposer_info, - available_attestations: available_count, - included_attestations: included, - prior_skip_slots: handler.lock().prior_skip_slots, - }; - - // Write to response. - if slot >= start_slot { - response.push(efficiency); - } - - handler.lock().prior_skip_slots = 0; - - Ok(()) - }; - - // Build BlockReplayer. - let mut replayer = BlockReplayer::new(starting_state, spec) - .no_state_root_iter() - .no_signature_verification() - .minimal_block_root_verification() - .pre_slot_hook(Box::new(pre_slot_hook)) - .post_slot_hook(Box::new(post_slot_hook)) - .pre_block_hook(Box::new(pre_block_hook)); - - // Iterate through the block roots, loading blocks in chunks to reduce load on memory. - for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) { - // Load blocks from the block root chunks. - let blocks = block_root_chunks - .iter() - .map(|root| { - chain - .get_blinded_block(root) - .and_then(|maybe_block| { - maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) - }) - .map_err(unhandled_error) - }) - .collect::, _>>()?; - - // TODO(gloas): add payloads - replayer = replayer - .apply_blocks(blocks, vec![], None) - .map_err(|e: PackingEfficiencyError| custom_server_error(format!("{:?}", e)))?; - } - - drop(replayer); - - Ok(response) -} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 8fa43ee10e..19d73be89a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -7,11 +7,9 @@ //! used for development. mod aggregate_attestation; -mod attestation_performance; mod attester_duties; mod beacon; mod block_id; -mod block_packing_efficiency; mod build_block_contents; mod builder_states; mod custody; @@ -3101,39 +3099,6 @@ pub fn serve( }, ); - // GET lighthouse/analysis/attestation_performance/{index} - let get_lighthouse_attestation_performance = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("attestation_performance")) - .and(warp::path::param::()) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |target, query, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - attestation_performance::get_attestation_performance(target, query, chain) - }) - }, - ); - - // GET lighthouse/analysis/block_packing_efficiency - let get_lighthouse_block_packing_efficiency = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_packing_efficiency")) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then( - |query, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_packing_efficiency::get_block_packing_efficiency(query, chain) - }) - }, - ); - let get_events = eth_v1 .clone() .and(warp::path("events")) @@ -3369,12 +3334,10 @@ pub fn serve( .uor(get_lighthouse_database_info) .uor(get_lighthouse_database_invariants) .uor(get_lighthouse_custody_info) - .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) .uor(get_beacon_light_client_finality_update) .uor(get_beacon_light_client_bootstrap) .uor(get_beacon_light_client_updates) - .uor(get_lighthouse_block_packing_efficiency) .uor(get_events) .uor(get_expected_withdrawals) .uor(lighthouse_log_events.boxed()) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3335315157..1f55d9a878 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -20,7 +20,9 @@ use beacon_chain::{ }; use beacon_chain::{ blob_verification::{GossipBlobError, GossipVerifiedBlob}, - payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, + payload_envelope_verification::{ + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, + }, }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; @@ -49,8 +51,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationBatch, work_reprocessing_queue::{ - QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, - ReprocessQueueMessage, + QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate, + QueuedUnaggregate, ReprocessQueueMessage, }, }; @@ -3332,6 +3334,61 @@ impl NetworkBeaconProcessor { verified_envelope } + + Err(EnvelopeError::BlockRootUnknown { block_root }) => { + let envelope_slot = envelope.slot(); + + debug!( + ?block_root, + %envelope_slot, + "Envelope references unknown block, deferring to reprocess queue" + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + + let inner_self = self.clone(); + let chain = self.chain.clone(); + let process_fn = Box::pin(async move { + match chain.verify_envelope_for_gossip(envelope).await { + Ok(verified_envelope) => { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + } + Err(e) => { + debug!( + error = ?e, + "Deferred envelope failed verification" + ); + } + } + }); + + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess(ReprocessQueueMessage::UnknownBlockForEnvelope( + QueuedGossipEnvelope { + beacon_block_slot: envelope_slot, + beacon_block_root: block_root, + process_fn, + }, + )), + }) + .is_err() + { + error!( + %envelope_slot, + ?block_root, + "Failed to defer envelope import" + ); + } + return None; + } // TODO(gloas) penalize peers accordingly Err(_) => return None, }; diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 5fa8c729cb..c5ccbc2ae6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -2090,3 +2090,8 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { unique_roots.len(), ); } + +// TODO(ePBS): Add integration tests for envelope deferral (UnknownBlockForEnvelope): +// 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope +// 2. Block imported → envelope released and processed successfully +// 3. Timeout path → envelope released and re-verified diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 428086c464..8ef91b3c74 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1906,6 +1906,51 @@ impl, Cold: ItemStore> HotColdDB } } + /// Recompute the payload status for a state at `slot` that is stored in the cold DB. + /// + /// This function returns an error for any `slot` that is outside the range of slots stored in + /// the freezer DB. + /// + /// For all slots prior to Gloas, it returns `Pending`. + /// + /// For post-Gloas slots the algorithm is: + /// + /// 1. Load the most recently applied block at `slot` (may not be from `slot` in case of a skip) + /// 2. Load the canonical `state_root` at the slot of the block. If this `state_root` matches + /// the one in the block then we know the state at *that* slot is canonically empty (no + /// payload). Conversely, if it is different, we know that the block's slot is full (assuming + /// no database corruption). + /// 3. The payload status of `slot` is the same as the payload status of `block.slot()`, because + /// we only care about whether a beacon block or payload was applied most recently, and + /// `block` is by definition the most-recently-applied block. + /// + /// All of this mucking around could be avoided if we do a schema migration to record the + /// payload status in the database. For now, this is simpler. + fn get_cold_state_payload_status(&self, slot: Slot) -> Result { + // Pre-Gloas states are always `Pending`. + if !self.spec.fork_name_at_slot::(slot).gloas_enabled() { + return Ok(StatePayloadStatus::Pending); + } + + let block_root = self + .get_cold_block_root(slot)? + .ok_or(HotColdDBError::MissingFrozenBlock(slot))?; + + let block = self + .get_blinded_block(&block_root)? + .ok_or(Error::MissingBlock(block_root))?; + + let state_root = self + .get_cold_state_root(block.slot())? + .ok_or(HotColdDBError::MissingRestorePointState(block.slot()))?; + + if block.state_root() != state_root { + Ok(StatePayloadStatus::Full) + } else { + Ok(StatePayloadStatus::Pending) + } + } + fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result { if let Some(buffer) = self .state_cache @@ -2454,8 +2499,7 @@ impl, Cold: ItemStore> HotColdDB self.forwards_state_roots_iterator_until(base_state.slot(), slot, || { Err(Error::StateShouldNotBeRequired(slot)) })?; - // TODO(gloas): calculate correct payload status for cold states - let payload_status = StatePayloadStatus::Pending; + let payload_status = self.get_cold_state_payload_status(slot)?; let state = self.replay_blocks( base_state, blocks, @@ -2591,9 +2635,10 @@ impl, Cold: ItemStore> HotColdDB { return Ok((blocks, vec![])); } - // TODO(gloas): wire this up - let end_block_root = Hash256::ZERO; - let desired_payload_status = StatePayloadStatus::Pending; + let end_block_root = self + .get_cold_block_root(end_slot)? + .ok_or(HotColdDBError::MissingFrozenBlock(end_slot))?; + let desired_payload_status = self.get_cold_state_payload_status(end_slot)?; let envelopes = self.load_payload_envelopes_for_blocks( &blocks, end_block_root, diff --git a/beacon_node/store/src/invariants.rs b/beacon_node/store/src/invariants.rs index eb5232d344..d251fb8800 100644 --- a/beacon_node/store/src/invariants.rs +++ b/beacon_node/store/src/invariants.rs @@ -319,6 +319,10 @@ impl, Cold: ItemStore> HotColdDB .spec .fulu_fork_epoch .map(|epoch| epoch.start_slot(E::slots_per_epoch())); + let gloas_fork_slot = self + .spec + .gloas_fork_epoch + .map(|epoch| epoch.start_slot(E::slots_per_epoch())); let oldest_blob_slot = self.get_blob_info().oldest_blob_slot; let oldest_data_column_slot = self.get_data_column_info().oldest_data_column_slot; @@ -343,17 +347,28 @@ impl, Cold: ItemStore> HotColdDB } // Invariant 5: execution payload consistency. - // TODO(gloas): reconsider this invariant if check_payloads && let Some(bellatrix_slot) = bellatrix_fork_slot && slot >= bellatrix_slot - && !self.execution_payload_exists(&block_root)? - && !self.payload_envelope_exists(&block_root)? { - result.add_violation(InvariantViolation::ExecutionPayloadMissing { - block_root, - slot, - }); + if let Some(gloas_slot) = gloas_fork_slot + && slot >= gloas_slot + { + // For Gloas there is never a true payload stored at slot 0. + // TODO(gloas): still need to account for non-canonical payloads once pruning + // is implemented. + if slot != 0 && !self.payload_envelope_exists(&block_root)? { + result.add_violation(InvariantViolation::ExecutionPayloadMissing { + block_root, + slot, + }); + } + } else if !self.execution_payload_exists(&block_root)? { + result.add_violation(InvariantViolation::ExecutionPayloadMissing { + block_root, + slot, + }); + } } // Invariant 6: blob sidecar consistency. diff --git a/book/src/api_lighthouse.md b/book/src/api_lighthouse.md index 2fd7290cb2..c2e4fbdd5a 100644 --- a/book/src/api_lighthouse.md +++ b/book/src/api_lighthouse.md @@ -512,126 +512,6 @@ As all testnets and Mainnet have been merged, both values will be the same after } ``` -## `/lighthouse/analysis/attestation_performance/{index}` - -Fetch information about the attestation performance of a validator index or all validators for a -range of consecutive epochs. - -Two query parameters are required: - -- `start_epoch` (inclusive): the first epoch to compute attestation performance for. -- `end_epoch` (inclusive): the final epoch to compute attestation performance for. - -Example: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/attestation_performance/1?start_epoch=1&end_epoch=1" | jq -``` - -```json -[ - { - "index": 1, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - } -] -``` - -Instead of specifying a validator index, you can specify the entire validator set by using `global`: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/attestation_performance/global?start_epoch=1&end_epoch=1" | jq -``` - -```json -[ - { - "index": 0, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - }, - { - "index": 1, - "epochs": { - "1": { - "active": true, - "head": true, - "target": true, - "source": true, - "delay": 1 - } - } - }, - { - .. - } -] - -``` - -Caveats: - -- For maximum efficiency the start_epoch should satisfy `(start_epoch * slots_per_epoch) % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_epoch` needs to be loaded from the database, - and loading a state on a boundary is most efficient. - -## `/lighthouse/analysis/block_packing` - -Fetch information about the block packing efficiency of blocks for a range of consecutive -epochs. - -Two query parameters are required: - -- `start_epoch` (inclusive): the epoch of the first block to compute packing efficiency for. -- `end_epoch` (inclusive): the epoch of the last block to compute packing efficiency for. - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/block_packing_efficiency?start_epoch=1&end_epoch=1" | jq -``` - -An excerpt of the response looks like: - -```json -[ - { - "slot": "33", - "block_hash": "0xb20970bb97c6c6de6b1e2b689d6381dd15b3d3518fbaee032229495f963bd5da", - "proposer_info": { - "validator_index": 855, - "graffiti": "poapZoJ7zWNfK7F3nWjEausWVBvKa6gA" - }, - "available_attestations": 3805, - "included_attestations": 1143, - "prior_skip_slots": 1 - }, - { - .. - } -] -``` - -Caveats: - -- `start_epoch` must not be `0`. -- For maximum efficiency the `start_epoch` should satisfy `(start_epoch * slots_per_epoch) % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_epoch` needs to be loaded from the database, and - loading a state on a boundary is most efficient. - ## `/lighthouse/logs` This is a Server Side Event subscription endpoint. This allows a user to read diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 3c039b16b3..5ff7a7e0f0 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -1,7 +1,5 @@ //! This module contains endpoints that are non-standard and only available on Lighthouse servers. -mod attestation_performance; -mod block_packing_efficiency; mod custody; pub mod sync_state; @@ -15,12 +13,6 @@ use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; -pub use attestation_performance::{ - AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, -}; -pub use block_packing_efficiency::{ - BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, -}; pub use custody::CustodyInfo; // Define "legacy" implementations of `Option` which use four bytes for encoding the union @@ -310,52 +302,4 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &req).await } - - /* - Analysis endpoints. - */ - - /// `GET` lighthouse/analysis/block_packing?start_epoch,end_epoch - pub async fn get_lighthouse_analysis_block_packing( - &self, - start_epoch: Epoch, - end_epoch: Epoch, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") - .push("analysis") - .push("block_packing_efficiency"); - - path.query_pairs_mut() - .append_pair("start_epoch", &start_epoch.to_string()) - .append_pair("end_epoch", &end_epoch.to_string()); - - self.get(path).await - } - - /// `GET` lighthouse/analysis/attestation_performance/{index}?start_epoch,end_epoch - pub async fn get_lighthouse_analysis_attestation_performance( - &self, - start_epoch: Epoch, - end_epoch: Epoch, - target: String, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") - .push("analysis") - .push("attestation_performance") - .push(&target); - - path.query_pairs_mut() - .append_pair("start_epoch", &start_epoch.to_string()) - .append_pair("end_epoch", &end_epoch.to_string()); - - self.get(path).await - } } diff --git a/common/eth2/src/lighthouse/attestation_performance.rs b/common/eth2/src/lighthouse/attestation_performance.rs deleted file mode 100644 index 5ce1d90a38..0000000000 --- a/common/eth2/src/lighthouse/attestation_performance.rs +++ /dev/null @@ -1,39 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use types::Epoch; - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformanceStatistics { - pub active: bool, - pub head: bool, - pub target: bool, - pub source: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub delay: Option, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformance { - pub index: u64, - pub epochs: HashMap, -} - -impl AttestationPerformance { - pub fn initialize(indices: Vec) -> Vec { - let mut vec = Vec::with_capacity(indices.len()); - for index in indices { - vec.push(Self { - index, - ..Default::default() - }) - } - vec - } -} - -/// Query parameters for the `/lighthouse/analysis/attestation_performance` endpoint. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationPerformanceQuery { - pub start_epoch: Epoch, - pub end_epoch: Epoch, -} diff --git a/common/eth2/src/lighthouse/block_packing_efficiency.rs b/common/eth2/src/lighthouse/block_packing_efficiency.rs deleted file mode 100644 index 0ad6f46031..0000000000 --- a/common/eth2/src/lighthouse/block_packing_efficiency.rs +++ /dev/null @@ -1,34 +0,0 @@ -use serde::{Deserialize, Serialize}; -use types::{Epoch, Hash256, Slot}; - -type CommitteePosition = usize; -type Committee = u64; -type ValidatorIndex = u64; - -#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] -pub struct UniqueAttestation { - pub slot: Slot, - pub committee_index: Committee, - pub committee_position: CommitteePosition, -} -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct ProposerInfo { - pub validator_index: ValidatorIndex, - pub graffiti: String, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockPackingEfficiency { - pub slot: Slot, - pub block_hash: Hash256, - pub proposer_info: ProposerInfo, - pub available_attestations: usize, - pub included_attestations: usize, - pub prior_skip_slots: u64, -} - -#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockPackingEfficiencyQuery { - pub start_epoch: Epoch, - pub end_epoch: Epoch, -} diff --git a/consensus/state_processing/src/block_replayer.rs b/consensus/state_processing/src/block_replayer.rs index a10d6179fe..f5f06d1cb9 100644 --- a/consensus/state_processing/src/block_replayer.rs +++ b/consensus/state_processing/src/block_replayer.rs @@ -313,6 +313,7 @@ where // indicates that the parent is full (and it hasn't already been applied). state_root = if block.fork_name_unchecked().gloas_enabled() && self.state.slot() == self.state.latest_block_header().slot + && self.state.payload_status() == StatePayloadStatus::Pending { let latest_bid_block_hash = self .state diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index 35200692c3..de202e5812 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -463,6 +463,9 @@ pub async fn reconnect_to_execution_layer( } /// Ensure all validators have attested correctly. +/// +/// Checks attestation rewards for head, target, and source. +/// A positive reward indicates a correct vote. pub async fn check_attestation_correctness( network: LocalNetwork, start_epoch: u64, @@ -476,54 +479,49 @@ pub async fn check_attestation_correctness( let remote_node = &network.remote_nodes()?[node_index]; - let results = remote_node - .get_lighthouse_analysis_attestation_performance( - Epoch::new(start_epoch), - Epoch::new(upto_epoch - 2), - "global".to_string(), - ) - .await - .map_err(|e| format!("Unable to get attestation performance: {e}"))?; - - let mut active_successes: f64 = 0.0; let mut head_successes: f64 = 0.0; let mut target_successes: f64 = 0.0; let mut source_successes: f64 = 0.0; - let mut total: f64 = 0.0; - for result in results { - for epochs in result.epochs.values() { + let end_epoch = upto_epoch + .checked_sub(2) + .ok_or_else(|| "upto_epoch must be >= 2 to have attestation rewards".to_string())?; + for epoch in start_epoch..=end_epoch { + let response = remote_node + .post_beacon_rewards_attestations(Epoch::new(epoch), &[]) + .await + .map_err(|e| format!("Unable to get attestation rewards for epoch {epoch}: {e}"))?; + + for reward in &response.data.total_rewards { total += 1.0; - if epochs.active { - active_successes += 1.0; - } - if epochs.head { + // A positive reward means the validator made a correct vote. + if reward.head > 0 { head_successes += 1.0; } - if epochs.target { + if reward.target > 0 { target_successes += 1.0; } - if epochs.source { + if reward.source > 0 { source_successes += 1.0; } } } - let active_percent = active_successes / total * 100.0; + + if total == 0.0 { + return Err("No attestation rewards data found".to_string()); + } + let head_percent = head_successes / total * 100.0; let target_percent = target_successes / total * 100.0; let source_percent = source_successes / total * 100.0; eprintln!("Total Attestations: {}", total); - eprintln!("Active: {}: {}%", active_successes, active_percent); eprintln!("Head: {}: {}%", head_successes, head_percent); eprintln!("Target: {}: {}%", target_successes, target_percent); eprintln!("Source: {}: {}%", source_successes, source_percent); - if active_percent < acceptable_attestation_performance { - return Err("Active percent was below required level".to_string()); - } if head_percent < acceptable_attestation_performance { return Err("Head percent was below required level".to_string()); }