diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index bfda52558e..938fbc4104 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,3 +1,4 @@ +use crate::block_verification_types::{AsBlock, RangeSyncBlock}; use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; use crate::{BeaconChain, BeaconChainTypes, WhenSlotSkipped, metrics}; use fixed_bytes::FixedBytesExtended; @@ -8,12 +9,13 @@ use state_processing::{ }; use std::borrow::Cow; use std::iter; +use std::sync::Arc; use std::time::Duration; use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::{debug, debug_span, instrument}; -use types::{Hash256, Slot}; +use types::{Hash256, SignedExecutionPayloadEnvelope, Slot}; /// Use a longer timeout on the pubkey cache. /// @@ -315,4 +317,209 @@ impl BeaconChain { Ok(num_relevant) } + + /// Store a batch of historical GLOaS blocks in the database. + /// + /// Similar to `import_historical_block_batch` but handles `RangeSyncBlock::Gloas` variants, + /// storing both the beacon block and the execution payload envelope. + /// + /// The `blocks` should be given in slot-ascending order. Block root verification, + /// signature verification, and anchor updates follow the same logic as the pre-GLOaS path. + #[instrument(skip_all)] + pub fn import_historical_gloas_block_batch( + &self, + mut blocks: Vec>, + ) -> Result { + let anchor_info = self.store.get_anchor_info(); + + // Take all blocks with slots less than or equal to the oldest block slot. + let num_relevant = blocks.partition_point(|block| { + block.slot() <= anchor_info.oldest_block_slot + }); + + let total_blocks = blocks.len(); + blocks.truncate(num_relevant); + let blocks_to_import = blocks; + + if blocks_to_import.len() != total_blocks { + debug!( + oldest_block_slot = %anchor_info.oldest_block_slot, + total_blocks, + ignored = total_blocks.saturating_sub(blocks_to_import.len()), + "Ignoring some historic GLOaS blocks" + ); + } + + if blocks_to_import.is_empty() { + return Ok(0); + } + + let mut expected_block_root = anchor_info.oldest_block_parent; + let mut last_block_root = expected_block_root; + let mut prev_block_slot = anchor_info.oldest_block_slot; + + let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); + let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); + let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); + let mut envelopes_to_store: Vec<(Hash256, Arc>)> = + Vec::new(); + + for range_block in blocks_to_import.into_iter().rev() { + let block_root = range_block.block_root(); + let block = range_block.block_cloned(); + + // Extract envelope if this is a GLOaS block with one. + if let RangeSyncBlock::Gloas { + envelope: Some(available_envelope), + .. + } = range_block + { + let (signed_envelope, _columns) = available_envelope.deconstruct(); + envelopes_to_store.push((block_root, signed_envelope)); + } + + if block.slot() == anchor_info.oldest_block_slot { + // When reimporting, verify that this is actually the same block (same block root). + let oldest_block_root = self + .block_root_at_slot(block.slot(), WhenSlotSkipped::None) + .ok() + .flatten() + .ok_or(HistoricalBlockError::MissingOldestBlockRoot { slot: block.slot() })?; + if block_root != oldest_block_root { + return Err(HistoricalBlockError::MismatchedBlockRoot { + block_root, + expected_block_root: oldest_block_root, + }); + } + + debug!( + ?block_root, + slot = %block.slot(), + "Re-importing historic GLOaS block" + ); + last_block_root = block_root; + } else if block_root != expected_block_root { + return Err(HistoricalBlockError::MismatchedBlockRoot { + block_root, + expected_block_root, + }); + } + + // Store block in the hot database. + // GLOaS blocks always have their payload in the envelope, so we store blinded. + let blinded_block = block.clone_as_blinded(); + self.store.blinded_block_as_kv_store_ops( + &block_root, + &blinded_block, + &mut hot_batch, + ); + + // Store block roots, including at all skip slots in the freezer DB. + for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() { + cold_batch.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlockRoots, + slot.to_be_bytes().to_vec(), + block_root.as_slice().to_vec(), + )); + } + + prev_block_slot = block.slot(); + expected_block_root = block.message().parent_root(); + signed_blocks.push(block); + + // If we've reached genesis, add the genesis block root to the batch. + if expected_block_root == self.genesis_block_root { + let genesis_slot = self.spec.genesis_slot; + for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() { + cold_batch.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlockRoots, + slot.to_be_bytes().to_vec(), + self.genesis_block_root.as_slice().to_vec(), + )); + } + prev_block_slot = genesis_slot; + expected_block_root = Hash256::zero(); + break; + } + } + // Blocks were pushed in reverse order so reverse again. + signed_blocks.reverse(); + + // Verify signatures in one batch. + let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); + let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); + let pubkey_cache = self + .validator_pubkey_cache + .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?; + let block_roots = signed_blocks + .get(1..) + .ok_or(HistoricalBlockError::IndexOutOfBounds)? + .iter() + .map(|block| block.parent_root()) + .chain(iter::once(last_block_root)); + let signature_set = signed_blocks + .iter() + .zip_eq(block_roots) + .filter(|&(_block, block_root)| block_root != self.genesis_block_root) + .map(|(block, block_root)| { + block_proposal_signature_set_from_parts( + block, + Some(block_root), + block.message().proposer_index(), + &self.spec.fork_at_epoch(block.message().epoch()), + self.genesis_validators_root, + |validator_index| pubkey_cache.get(validator_index).cloned().map(Cow::Owned), + &self.spec, + ) + }) + .collect::, _>>() + .map_err(HistoricalBlockError::SignatureSet) + .map(ParallelSignatureSets::from)?; + drop(pubkey_cache); + drop(setup_timer); + + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + if !signature_set.verify() { + return Err(HistoricalBlockError::InvalidSignature); + } + drop(verify_timer); + drop(sig_timer); + + // Write envelopes to the hot DB. + for (block_root, signed_envelope) in &envelopes_to_store { + self.store + .put_payload_envelope(block_root, signed_envelope)?; + } + + // Write the block batches to disk. + { + let _span = debug_span!("backfill_write_hot_db").entered(); + self.store.hot_db.do_atomically(hot_batch)?; + } + { + let _span = debug_span!("backfill_write_cold_db").entered(); + self.store.cold_db.do_atomically(cold_batch)?; + } + + // Update the anchor. + let new_anchor = AnchorInfo { + oldest_block_slot: prev_block_slot, + oldest_block_parent: expected_block_root, + ..anchor_info + }; + let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot); + let anchor_batch = vec![ + self.store + .compare_and_set_anchor_info(anchor_info, new_anchor)?, + ]; + self.store.hot_db.do_atomically(anchor_batch)?; + + // If backfill has completed, trigger reconstruction. + if backfill_complete && self.genesis_backfill_slot == Slot::new(0) && self.config.archive { + self.store_migrator.process_reconstruction(); + } + + Ok(num_relevant) + } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6a5586c126..0443a85cab 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -799,13 +799,38 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); + + // Check if this batch contains GLOaS blocks. + let is_gloas_batch = downloaded_blocks + .first() + .map(|b| matches!(b, RangeSyncBlock::Gloas { .. })) + .unwrap_or(false); + + if is_gloas_batch { + // GLOaS blocks: store blocks and envelopes directly. + // KZG verification for columns was already done during coupling. + match self.chain.import_historical_gloas_block_batch(downloaded_blocks) { + Ok(imported_blocks) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, + ); + return (imported_blocks, Ok(())); + } + Err(e) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL, + ); + return self.handle_historical_block_error(e); + } + } + } + + // Pre-GLOaS path: convert to AvailableBlocks and verify KZG. let available_blocks = downloaded_blocks .into_iter() .map(|block| block.into_available_block()) .collect::>(); - // TODO(gloas) when implementing backfill sync for gloas - // we need a batch verify kzg function in the new da checker match self .chain .data_availability_checker @@ -859,75 +884,83 @@ impl NetworkBeaconProcessor { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL, ); - let peer_action = match &e { - HistoricalBlockError::MismatchedBlockRoot { - block_root, - expected_block_root, - } => { - debug!( - error = "mismatched_block_root", - ?block_root, - expected_root = ?expected_block_root, - "Backfill batch processing error" - ); - // The peer is faulty if they send blocks with bad roots. - Some(PeerAction::LowToleranceError) - } - HistoricalBlockError::InvalidSignature - | HistoricalBlockError::SignatureSet(_) => { - warn!( - error = ?e, - "Backfill batch processing error" - ); - // The peer is faulty if they bad signatures. - Some(PeerAction::LowToleranceError) - } - HistoricalBlockError::MissingOldestBlockRoot { slot } => { - warn!( - %slot, - error = "missing_oldest_block_root", - "Backfill batch processing error" - ); - // This is an internal error, do not penalize the peer. - None - } - - HistoricalBlockError::ValidatorPubkeyCacheTimeout => { - warn!( - error = "pubkey_cache_timeout", - "Backfill batch processing error" - ); - // This is an internal error, do not penalize the peer. - None - } - HistoricalBlockError::IndexOutOfBounds => { - error!( - error = ?e, - "Backfill batch OOB error" - ); - // This should never occur, don't penalize the peer. - None - } - HistoricalBlockError::StoreError(e) => { - warn!(error = ?e, "Backfill batch processing error"); - // This is an internal error, don't penalize the peer. - None - } // - // Do not use a fallback match, handle all errors explicitly - }; - let err_str: &'static str = e.into(); - ( - 0, - Err(ChainSegmentFailed { - message: format!("{:?}", err_str), - // This is an internal error, don't penalize the peer. - peer_action, - }), - ) + self.handle_historical_block_error(e) } } } + /// Maps a `HistoricalBlockError` to the appropriate peer action and error tuple. + fn handle_historical_block_error( + &self, + e: HistoricalBlockError, + ) -> (usize, Result<(), ChainSegmentFailed>) { + let peer_action = match &e { + HistoricalBlockError::MismatchedBlockRoot { + block_root, + expected_block_root, + } => { + debug!( + error = "mismatched_block_root", + ?block_root, + expected_root = ?expected_block_root, + "Backfill batch processing error" + ); + // The peer is faulty if they send blocks with bad roots. + Some(PeerAction::LowToleranceError) + } + HistoricalBlockError::InvalidSignature + | HistoricalBlockError::SignatureSet(_) => { + warn!( + error = ?e, + "Backfill batch processing error" + ); + // The peer is faulty if they bad signatures. + Some(PeerAction::LowToleranceError) + } + HistoricalBlockError::MissingOldestBlockRoot { slot } => { + warn!( + %slot, + error = "missing_oldest_block_root", + "Backfill batch processing error" + ); + // This is an internal error, do not penalize the peer. + None + } + + HistoricalBlockError::ValidatorPubkeyCacheTimeout => { + warn!( + error = "pubkey_cache_timeout", + "Backfill batch processing error" + ); + // This is an internal error, do not penalize the peer. + None + } + HistoricalBlockError::IndexOutOfBounds => { + error!( + error = ?e, + "Backfill batch OOB error" + ); + // This should never occur, don't penalize the peer. + None + } + HistoricalBlockError::StoreError(e) => { + warn!(error = ?e, "Backfill batch processing error"); + // This is an internal error, don't penalize the peer. + None + } // + // Do not use a fallback match, handle all errors explicitly + }; + let err_str: &'static str = e.into(); + ( + 0, + Err(ChainSegmentFailed { + message: format!("{:?}", err_str), + // This is an internal error, don't penalize the peer. + peer_action, + }), + ) + } + /// Helper function to handle a `BlockError` from `process_chain_segment` fn handle_failed_chain_segment(&self, error: BlockError) -> Result<(), ChainSegmentFailed> { match error { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index c690d5e584..0f80138d24 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -215,11 +215,6 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, ) -> Result { - // Skip backfill sync for GLOaS — not yet implemented for this fork. - if self.beacon_chain.spec.gloas_fork_epoch.is_some_and(|e| e != Epoch::max_value()) { - return Ok(SyncStart::NotSyncing); - } - match self.state() { BackFillState::Syncing => {} // already syncing ignore. BackFillState::Paused => { diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index b78e6652b3..ba69fc265e 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -584,10 +584,10 @@ where b.execution_status .block_hash() .or(match head_payload_status { - PayloadStatus::Full => b.execution_payload_block_hash, - PayloadStatus::Pending | PayloadStatus::Empty => { - b.execution_payload_parent_hash + PayloadStatus::Full | PayloadStatus::Pending => { + b.execution_payload_block_hash } + PayloadStatus::Empty => b.execution_payload_parent_hash, }) }); let justified_root = self.justified_checkpoint().root;