diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c26f4a04c4..a0208c5bf7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -64,6 +64,7 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::pending_payload_cache::PendingPayloadCache; use crate::pending_payload_cache::{ Availability as PayloadAvailability, @@ -155,6 +156,23 @@ pub type ForkChoiceError = fork_choice::Error; /// Alias to appease clippy. type HashBlockTuple = (Hash256, RangeSyncBlock); +/// Carries everything `process_range_sync_envelope` needs to import the anchor slots envelope. +type AnchorEnvelopeImport = ( + Hash256, + Arc>, + Box>, +); + +/// The output of [`BeaconChain::filter_chain_segment`], the blocks that should be imported, plus +/// the anchor slots envelope if it appeared in this segment and isn't yet imported. +/// +/// The checkpoint server serves only the anchor block and state, never its envelope. +/// See [`BeaconChain::queue_anchor_envelope_import`] and its usage. +pub struct FilteredChainSegment { + pub blocks: Vec>, + pub anchor_envelope: Option>, +} + // These keys are all zero because they get stored in different columns, see `DBColumn` type. pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::ZERO; pub const OP_POOL_DB_KEY: Hash256 = Hash256::ZERO; @@ -222,6 +240,7 @@ impl TryInto for AvailabilityProcessingStatus { } /// The result of a chain segment processing. +#[derive(Debug)] pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. Successful { @@ -2953,10 +2972,11 @@ impl BeaconChain { pub fn filter_chain_segment( self: &Arc, chain_segment: Vec>, - ) -> Result>, Box> { + ) -> Result, Box> { // This function will never import any blocks. let imported_blocks = vec![]; let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len()); + let mut anchor_envelope = None; // Produce a list of the parent root and slot of the child of each block. // @@ -3007,7 +3027,13 @@ impl BeaconChain { // // Note that `check_block_relevancy` is incapable of returning // `DuplicateImportStatusUnknown` so we don't need to handle that case here. - Err(BlockError::DuplicateFullyImported(_)) => continue, + Err(BlockError::DuplicateFullyImported(_)) => { + // The block is already imported. If it's the anchor block, its envelope + // may still be missing from the store, queue it for import rather + // than dropping it. + self.queue_anchor_envelope_import(block_root, block, &mut anchor_envelope)?; + continue; + } // If the block is the genesis block, simply ignore this block. Err(BlockError::GenesisBlock) => continue, // If the block is is for a finalized slot, simply ignore this block. @@ -3018,12 +3044,18 @@ impl BeaconChain { // 2. In some non-canonical chain at a slot that has been finalized already. // // In the case of (1), there's no need to re-import and later blocks in this - // segement might be useful. + // segment might be useful. // // In the case of (2), skipping the block is valid since we should never import it. // However, we will potentially get a `ParentUnknown` on a later block. The sync // protocol will need to ensure this is handled gracefully. - Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue, + Err(BlockError::WouldRevertFinalizedSlot { .. }) => { + // The block is at/below the finalized slot and won't be imported. If it's the + // anchor block, its envelope may still be missing from the store, queue it for + // import rather than dropping it. + self.queue_anchor_envelope_import(block_root, block, &mut anchor_envelope)?; + continue; + } // The block has a known parent that does not descend from the finalized block. // There is no need to process this block or any children. Err(BlockError::NotFinalizedDescendant { block_parent_root }) => { @@ -3046,7 +3078,67 @@ impl BeaconChain { } } - Ok(filtered_chain_segment) + Ok(FilteredChainSegment { + blocks: filtered_chain_segment, + anchor_envelope, + }) + } + + /// If `block` is the checkpoint sync anchor block and its execution payload envelope is not yet + /// in the store, set `anchor_envelope` so that it is imported in + /// [`Self::process_range_sync_envelope`]. + /// + /// The checkpoint server serves only the anchor block and state. so after checkpoint sync the + /// anchor block sits in the store without an envelope. Range sync re-fetches the anchor + /// block (as `DuplicateFullyImported` / `WouldRevertFinalizedSlot`) and its envelope. + fn queue_anchor_envelope_import( + &self, + block_root: Hash256, + block: RangeSyncBlock, + anchor_envelope: &mut Option>, + ) -> Result<(), Box> { + let db_failure = |e: Error| { + Box::new(ChainSegmentResult::Failed { + imported_blocks: vec![], + error: BlockError::BeaconChainError(Box::new(e)), + }) + }; + + // Ensure we only ever queue the anchor envelope for import. + let anchor_slot = self.store.get_anchor_info().anchor_slot; + + // Ensure we only ever queue the anchor envelope for import. + if block.slot() > anchor_slot { + return Ok(()); + } + + // If pre-gloas or no envelope there is no need to queue for import. + let RangeSyncBlock::Gloas { + block: signed_block, + envelope: Some(available_envelope), + } = block + else { + return Ok(()); + }; + + // If the envelope is already in the db there's nothing to do. + if self + .store + .payload_envelope_exists(&block_root) + .map_err(|e| db_failure(e.into()))? + { + return Ok(()); + } + + let anchor_block_root = self + .block_root_at_slot(anchor_slot, WhenSlotSkipped::Prev) + .map_err(db_failure)?; + + if anchor_block_root == Some(block_root) { + *anchor_envelope = Some((block_root, signed_block, Box::new(available_envelope))); + } + + Ok(()) } /// Attempt to verify and import a chain of blocks to `self`. @@ -3081,8 +3173,10 @@ impl BeaconChain { move || chain.filter_chain_segment(chain_segment), "filter_chain_segment", ); - let mut filtered_chain_segment = match filtered_chain_segment_future.await { - Ok(Ok(filtered_segment)) => filtered_segment, + let (mut filtered_chain_segment, anchor_envelope) = match filtered_chain_segment_future + .await + { + Ok(Ok(filtered_segment)) => (filtered_segment.blocks, filtered_segment.anchor_envelope), Ok(Err(segment_result)) => return *segment_result, Err(error) => { return ChainSegmentResult::Failed { @@ -3092,6 +3186,20 @@ impl BeaconChain { } }; + // Import the anchor envelope before the segment's blocks because the checkpoint child + // needs it present if it builds on top of it. A bad envelope fails the batch, so range sync handles the + // retry/downscore. + if let Some((block_root, block, envelope)) = anchor_envelope + && let Err(e) = self + .process_range_sync_envelope(*envelope, block_root, block) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(e)), + }; + } + while let Some((_root, block)) = filtered_chain_segment.first() { // Determine the epoch of the first block in the remaining segment. let start_epoch = block.epoch(); diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 90cdb4fe97..df6120eaeb 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -170,14 +170,6 @@ impl BeaconChain { .map_err(BeaconChainError::TokioJoin)? .ok_or(BeaconChainError::RuntimeShutdown)??; - // TODO(gloas): optimistic sync is not supported for Gloas, maybe we could re-add it - if payload_verification_outcome - .payload_verification_status - .is_optimistic() - { - return Err(EnvelopeError::OptimisticSyncNotSupported { block_root }); - } - Ok(AvailabilityPendingExecutedEnvelope::new( signed_envelope, block_root, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index a0d34949c6..cd0769a491 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -166,8 +166,6 @@ pub enum EnvelopeError { EnvelopeProcessingError(EnvelopeProcessingError), /// Error verifying the execution payload ExecutionPayloadError(ExecutionPayloadError), - /// Optimistic sync is not supported for Gloas payload envelopes. - OptimisticSyncNotSupported { block_root: Hash256 }, /// The envelope's beacon block was not present in fork choice at import time. /// /// Unlike [`EnvelopeError::BlockRootUnknown`] (raised during gossip verification, where the @@ -199,7 +197,6 @@ impl EnvelopeError { | EnvelopeError::PriorToFinalization { .. } | EnvelopeError::BeaconChainError(_) | EnvelopeError::BeaconStateError(_) - | EnvelopeError::OptimisticSyncNotSupported { .. } | EnvelopeError::BlockRootNotInForkChoice(_) | EnvelopeError::InternalError(_) => false, } diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 94d4b3b9da..cd9768d17d 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -29,6 +29,7 @@ use state_processing::{ }; use std::marker::PhantomData; use std::sync::{Arc, LazyLock}; +use store::{AnchorInfo, StoreOp}; use tempfile::tempdir; use types::{test_utils::generate_deterministic_keypair, *}; @@ -435,6 +436,164 @@ async fn chain_segment_full_segment() { ); } +/// Checks that post-gloas `filter_chain_segment` queues the anchor slots envelope when range +/// sync re-delivers it after checkpoint sync. +#[tokio::test] +async fn filter_chain_segment_picks_up_only_the_anchor_envelope() { + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + let (chain_segment, chain_segment_sidecars) = get_chain_segment().await; + + // Need two recent Gloas blocks with envelopes: the anchor and a non-anchor. + let gloas_snapshots: Vec<&BeaconSnapshot> = chain_segment + .iter() + .rev() + .filter(|s| { + s.execution_envelope.is_some() && s.beacon_block.fork_name_unchecked().gloas_enabled() + }) + .take(2) + .collect(); + if gloas_snapshots.len() < 2 { + return; + } + let anchor_snapshot = gloas_snapshots[0]; + let other_snapshot = gloas_snapshots[1]; + + // Import the segment so the blocks are known. + store_envelopes_for_chain_segment(chain_segment, &harness); + let blocks = chain_segment_blocks(chain_segment, chain_segment_sidecars, harness.chain.clone()); + harness + .chain + .slot_clock + .set_slot(blocks.last().unwrap().slot().as_u64()); + harness + .chain + .process_chain_segment(blocks, NotifyExecutionLayer::Yes) + .await + .into_block_error() + .expect("should import chain segment"); + + let anchor_root = anchor_snapshot.beacon_block_root; + let anchor_block = Arc::new( + harness + .chain + .get_block(&anchor_root) + .await + .unwrap() + .unwrap(), + ); + let anchor_envelope = anchor_snapshot.execution_envelope.clone().unwrap(); + let anchor_slot = anchor_block.slot(); + + // Make this block the anchor. + let prev = harness.chain.store.get_anchor_info(); + harness + .chain + .store + .compare_and_set_anchor_info_with_write( + prev.clone(), + AnchorInfo { + anchor_slot, + ..prev + }, + ) + .unwrap(); + assert_eq!( + harness + .chain + .block_root_at_slot(anchor_slot, WhenSlotSkipped::Prev) + .unwrap(), + Some(anchor_root), + ); + + // Drop the anchor's envelope to mimic the post checkpoint sync state. + harness + .chain + .store + .do_atomically_with_block_and_blobs_cache(vec![StoreOp::DeletePayloadEnvelope(anchor_root)]) + .unwrap(); + assert!( + !harness + .chain + .store + .payload_envelope_exists(&anchor_root) + .unwrap() + ); + + // Anchor envelope doesn't exist in the store. Queue envelope for import. + let filtered = harness + .chain + .filter_chain_segment(vec![build_range_sync_block( + anchor_block.clone(), + Some(anchor_envelope.clone()), + &None, + harness.chain.clone(), + )]) + .unwrap(); + assert!( + filtered.blocks.is_empty(), + "anchor block should be skipped, not imported" + ); + assert_eq!( + filtered.anchor_envelope.map(|(root, ..)| root), + Some(anchor_root), + "anchor envelope should be queued" + ); + + // Anchor slot does not have an envelope, nothing to queue for import. + let filtered = harness + .chain + .filter_chain_segment(vec![build_range_sync_block( + anchor_block.clone(), + None, + &None, + harness.chain.clone(), + )]) + .unwrap(); + assert!(filtered.anchor_envelope.is_none()); + + // Non-anchor block missing its envelope, not queued for import. + let other_root = other_snapshot.beacon_block_root; + let other_block = Arc::new(harness.chain.get_block(&other_root).await.unwrap().unwrap()); + let other_envelope = other_snapshot.execution_envelope.clone().unwrap(); + harness + .chain + .store + .do_atomically_with_block_and_blobs_cache(vec![StoreOp::DeletePayloadEnvelope(other_root)]) + .unwrap(); + let filtered = harness + .chain + .filter_chain_segment(vec![build_range_sync_block( + other_block, + Some(other_envelope), + &None, + harness.chain.clone(), + )]) + .unwrap(); + assert!( + filtered.anchor_envelope.is_none(), + "non-anchor block must not be queued even with a missing envelope" + ); + + // Put anchor envelope in the store + harness + .chain + .store + .put_payload_envelope(&anchor_root, &anchor_envelope) + .unwrap(); + + // Nothing to import, the anchor envelope is already in the store. + let filtered = harness + .chain + .filter_chain_segment(vec![build_range_sync_block( + anchor_block, + Some(anchor_envelope), + &None, + harness.chain.clone(), + )]) + .unwrap(); + assert!(filtered.anchor_envelope.is_none()); +} + #[tokio::test] async fn chain_segment_varying_chunk_size() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 4d392ef524..87b6d086d2 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2967,6 +2967,7 @@ async fn reproduction_unaligned_checkpoint_sync_pruned_payload() { } } +#[allow(clippy::large_stack_frames)] async fn weak_subjectivity_sync_test( slots: Vec, checkpoint_slot: Slot, @@ -3081,20 +3082,6 @@ async fn weak_subjectivity_sync_test( .build() .expect("should build"); - // Store the WSS envelope to simulate it arriving from network sync. - // In production, the envelope would be synced from the network after checkpoint sync. - if let Some(envelope) = harness - .chain - .store - .get_payload_envelope(&wss_block.canonical_root()) - .unwrap_or(None) - { - beacon_chain - .store - .put_payload_envelope(&wss_block.canonical_root(), &envelope) - .unwrap(); - } - let beacon_chain = Arc::new(beacon_chain); let wss_block_root = wss_block.canonical_root(); @@ -3135,118 +3122,85 @@ async fn weak_subjectivity_sync_test( assert_eq!(store_wss_blobs_opt, wss_blobs_opt); } - // Store the WSS block's envelope in the new chain (required for Gloas forward sync). - // The first forward block needs the checkpoint block's envelope to determine the parent's - // Full state. - if let Some(envelope) = harness - .chain - .store - .get_payload_envelope(&wss_block_root) - .unwrap() - { - beacon_chain - .store - .put_payload_envelope(&wss_block_root, &envelope) - .unwrap(); - - // `from_anchor` doesn't mark the anchor's payload received, so do it here; otherwise the - // first forward block (a FULL child of the anchor) would be rejected with `ParentUnknown`. - beacon_chain - .canonical_head - .fork_choice_write_lock() - .on_valid_payload_envelope_received(wss_block_root) - .unwrap(); - } - - // Apply blocks forward to reach head. + // Apply the anchor and forward blocks via range sync. The anchor is included so + // `process_chain_segment` imports its envelope (not served by the checkpoint server) before + // its first child, exercising the same path production uses. let chain_dump = harness.chain.chain_dump().unwrap(); - let new_blocks = chain_dump + let forward_snapshots = chain_dump .iter() - .filter(|snapshot| snapshot.beacon_block.slot() > checkpoint_slot); + .filter(|snapshot| snapshot.beacon_block.slot() > checkpoint_slot) + .collect::>(); - for snapshot in new_blocks { - let block_root = snapshot.beacon_block_root; + let mut segment = vec![]; + if checkpoint_slot != 0 { + segment.push(harness.build_range_sync_block_from_store_blobs( + Some(wss_block_root), + Arc::new(wss_block.clone()), + )); + } + for snapshot in &forward_snapshots { let full_block = harness .chain .get_block(&snapshot.beacon_block_root) .await .unwrap() .unwrap(); + segment.push(harness.build_range_sync_block_from_store_blobs( + Some(snapshot.beacon_block_root), + Arc::new(full_block), + )); + } - let slot = full_block.slot(); - let full_block_root = full_block.canonical_root(); - let state_root = full_block.state_root(); - - info!(block_root = ?full_block_root, ?state_root, %slot, "Importing block from chain dump"); - beacon_chain.slot_clock.set_slot(slot.as_u64()); + if let Some(last) = segment.last() { beacon_chain - .process_block( - full_block_root, - harness.build_range_sync_block_from_store_blobs( - Some(block_root), - Arc::new(full_block), - ), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ) + .slot_clock + .set_slot(last.as_block().slot().as_u64()); + } + beacon_chain + .process_chain_segment(segment, NotifyExecutionLayer::Yes) + .await + .into_block_error() + .expect("forward range sync should import"); + beacon_chain.recompute_head_at_current_slot().await; + + // Each non-finalized forward block's state should load correctly. States below the split slot + // have been pruned by the finalization that occurred during import. + let split_slot = beacon_chain.store.get_split_slot(); + for snapshot in &forward_snapshots { + let full_block = harness + .chain + .get_block(&snapshot.beacon_block_root) .await + .unwrap() .unwrap(); - - // Store the envelope, its columns, and apply to fork choice. - if let Some(envelope) = &snapshot.execution_envelope { - // Persist data columns for Gloas blocks. This mirrors what production does in - // `import_available_execution_payload_envelope` and what the harness now does in - // `process_envelope` — the WSS forward-sync loop bypasses both, so do it directly. - let mut ops = vec![]; - let columns_block = beacon_chain - .store - .get_blinded_block(&block_root) - .unwrap() - .and_then(|b| beacon_chain.store.make_full_block(&block_root, b).ok()); - if let Some(full_block) = columns_block { - let columns = beacon_chain::test_utils::generate_data_column_sidecars_from_block( - &full_block, - &beacon_chain.spec, - ); - if !columns.is_empty() - && let Some(store_op) = beacon_chain.get_blobs_or_columns_store_op( - block_root, - full_block.slot(), - beacon_chain::block_verification_types::AvailableBlockData::DataColumns( - columns, - ), - ) - { - ops.push(store_op); - } - } - ops.push(store::StoreOp::PutPayloadEnvelope( - block_root, - std::sync::Arc::new(envelope.as_ref().clone()), - )); - beacon_chain - .store - .do_atomically_with_block_and_blobs_cache(ops) - .unwrap(); - - // Update fork choice so head selection accounts for Full payload status. - beacon_chain - .canonical_head - .fork_choice_write_lock() - .on_valid_payload_envelope_received(block_root) - .unwrap(); + if full_block.slot() < split_slot { + continue; } - - beacon_chain.recompute_head_at_current_slot().await; - - // Check that the new block's state can be loaded correctly. + let state_root = full_block.state_root(); let mut state = beacon_chain .store - .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) + .get_state(&state_root, Some(full_block.slot()), CACHE_STATE_IN_TESTS) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); + + // The envelope import must persist the block's data columns, not drop them. + let fork = full_block.fork_name_unchecked(); + let source_columns = harness + .chain + .store + .get_data_columns(&snapshot.beacon_block_root, fork) + .unwrap(); + let dest_columns = beacon_chain + .store + .get_data_columns(&snapshot.beacon_block_root, fork) + .unwrap(); + assert_eq!( + dest_columns, + source_columns, + "data columns mismatch after import at slot {}", + full_block.slot() + ); } if checkpoint_slot != 0 { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index b52732000e..c39edfa291 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -3715,7 +3715,6 @@ impl NetworkBeaconProcessor { | EnvelopeError::BeaconStateError(_) // The following variants are produced during envelope import, not gossip // verification, so they cannot be reached here. Ignore them to be safe. - | EnvelopeError::OptimisticSyncNotSupported { .. } | EnvelopeError::BlockRootNotInForkChoice(_) | EnvelopeError::InternalError(_) => { self.propagate_validation_result(