diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8b50f4b18f..569f0eea50 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3108,6 +3108,15 @@ impl BeaconChain { let mut blocks = filtered_chain_segment.split_off(last_index); std::mem::swap(&mut blocks, &mut filtered_chain_segment); + // Extract envelopes before passing blocks to signature verification. + let envelopes: Vec<_> = blocks + .iter() + .map(|(_, block)| match block { + RangeSyncBlock::Gloas { envelope, .. } => envelope.clone(), + RangeSyncBlock::Base(_) => None, + }) + .collect(); + let chain = self.clone(); let signature_verification_future = self.spawn_blocking_handle( move || signature_verify_chain_segment(blocks, &chain), @@ -3132,11 +3141,15 @@ impl BeaconChain { }; // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + for (signature_verified_block, maybe_envelope) in + signature_verified_blocks.into_iter().zip(envelopes) + { let block_slot = signature_verified_block.slot(); + let block_root = signature_verified_block.block_root(); + let block = signature_verified_block.block_cloned(); match self .process_block( - signature_verified_block.block_root(), + block_root, signature_verified_block, notify_execution_layer, BlockImportSource::RangeSync, @@ -3166,11 +3179,8 @@ impl BeaconChain { } } Err(BlockError::DuplicateFullyImported(block_root)) => { - debug!( - ?block_root, - "Ignoring already known blocks while processing chain segment" - ); - continue; + // Block was already imported, envelope might need re-import + imported_blocks.push((block_root, block_slot)); } Err(error) => { return ChainSegmentResult::Failed { @@ -3179,6 +3189,18 @@ impl BeaconChain { }; } } + + // Process the envelope after the block has been imported. + if let Some(envelope) = maybe_envelope + && let Err(e) = self + .process_range_sync_envelope(envelope, block_root, block) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(e)), + }; + } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 7b9b7e8218..6b1ac3b033 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -52,6 +52,7 @@ use crate::beacon_snapshot::PreProcessingSnapshot; use crate::block_verification_types::{AsBlock, BlockImportData, LookupBlock, RangeSyncBlock}; use crate::data_availability_checker::{ AvailabilityCheckError, AvailableBlock, AvailableBlockData, MaybeAvailableBlock, + verify_columns_against_block, }; use crate::data_column_verification::GossipDataColumnError; use crate::execution_payload::{ @@ -652,14 +653,16 @@ pub fn signature_verify_chain_segment( )?; let mut available_blocks = Vec::with_capacity(chain_segment.len()); + let mut envelopes = Vec::with_capacity(chain_segment.len()); let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); for (block_root, block) in chain_segment { let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let (available_block, envelope) = block.into_available_block()?; available_blocks.push(available_block.clone()); + envelopes.push(envelope); signature_verified_blocks.push(SignatureVerifiedBlock { block: MaybeAvailableBlock::Available(available_block), block_root, @@ -667,12 +670,17 @@ pub fn signature_verify_chain_segment( consensus_context, }); } - // TODO(gloas) When implementing range and backfill sync for gloas - // we need a batch verify kzg function in the new da checker as well. + chain .data_availability_checker .batch_verify_kzg_for_available_blocks(&available_blocks)?; + for (available_block, maybe_envelope) in available_blocks.iter().zip(envelopes.iter()) { + if let Some(envelope) = maybe_envelope { + verify_columns_against_block(&chain.kzg, available_block.block(), &envelope.columns)?; + } + } + // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); @@ -1341,10 +1349,13 @@ impl IntoExecutionPendingBlock for RangeSyncBlock Result, BlockSlashInfo> { // Perform an early check to prevent wasting time on irrelevant blocks. + let header = self.signed_block_header(); let block_root = check_block_relevancy(self.as_block(), block_root, chain) - .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; + .map_err(|e| BlockSlashInfo::SignatureNotChecked(header.clone(), e))?; - let available_block = self.into_available_block(); + let (available_block, _envelope) = self.into_available_block().map_err(|e| { + BlockSlashInfo::SignatureNotChecked(header.clone(), BlockError::AvailabilityCheck(e)) + })?; chain .data_availability_checker .verify_kzg_for_available_block(&available_block) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d7..18e95f58f3 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,12 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; +use crate::payload_envelope_verification::gossip_verified_envelope::verify_envelope_consistency; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -40,15 +42,27 @@ impl LookupBlock { } } -/// A fully available block that has been constructed by range sync. -/// The block contains all the data required to import into fork choice. -/// This includes any and all blobs/columns required, including zero if -/// none are required. This can happen if the block is pre-deneb or if -/// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +/// A block that has been constructed by range sync, ready for import. +/// Pre-Gloas: wraps an `AvailableBlock` with all data. +/// Gloas: carries the block and an optional envelope which contains the sidecar data. +/// +/// Note: In the gloas case, we only ensure that the block is consistent with the envelope +/// if the envelope is `Some` when constructing a `RangeSyncBlock` type. +/// If `envelope` is None, then there is no guarantee that the canonical chain also contains +/// an empty payload. The only way to ensure that is to process the next block. +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { @@ -59,31 +73,48 @@ impl Debug for RangeSyncBlock { impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + Self::Base(block) => block.block_root(), + Self::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + Self::Base(block) => block.block(), + Self::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + Self::Base(block) => block.block_cloned(), + Self::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + Self::Base(block) => block.data(), + Self::Gloas { .. } => &AvailableBlockData::NoData, + } + } + + /// Returns the data columns associated with this block. For Gloas blocks the columns are + /// carried by the payload envelope rather than `block_data`, so this unwraps that case. + pub fn data_columns(&self) -> Option> { + match self { + Self::Base(block) => block.data().data_columns(), + Self::Gloas { envelope, .. } => envelope + .as_ref() + .map(|envelope| envelope.columns.clone()) + .filter(|columns| !columns.is_empty()), + } } } impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. - /// - /// # Errors - /// - /// Returns `AvailabilityCheckError` if: - /// - `InvalidAvailableBlockData`: Block data is provided but not required. - /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete. - /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete. + /// Constructs a `RangeSyncBlock` from a block and availability data (pre-Gloas). pub fn new( block: Arc>, block_data: AvailableBlockData, @@ -93,33 +124,91 @@ impl RangeSyncBlock { where T: BeaconChainTypes, { + if block.fork_name_unchecked().gloas_enabled() { + return Err(AvailabilityCheckError::InvalidVariant); + } let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + /// Constructs a Gloas `RangeSyncBlock` with block and optional `AvailableEnvelope` + /// which wraps the payload envelope with its data columns. + /// + /// This function only checks for consistency between the block and the envelope + /// if envelope.is_some() == true . + /// In the `None` case, we cannot guarantee that the payload is empty until we + /// process the block that builds on top of this block. + /// + /// Expects `block.canonical_root() == envelope.beacon_block_root` as they are coupled. + pub fn new_gloas( + block: Arc>, + envelope: Option>, + ) -> Result { + if let Some(envelope) = envelope.as_ref() { + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|e| format!("missing signed_execution_payload_bid: {e:?}"))? + .message; + // Skip the finalized-slot check; range sync imports historical (finalized) blocks. + let latest_finalized_slot = Slot::new(0); + verify_envelope_consistency( + envelope.message(), + &block, + execution_bid, + latest_finalized_slot, + ) + .map_err(|e| format!("Inconsistent envelope: {e:?}"))?; + } + + Ok(Self::Gloas { block, envelope }) } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + Self::Base(block) => block.deconstruct(), + Self::Gloas { block, .. } => { + (block.canonical_root(), block, AvailableBlockData::NoData) + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + Self::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + Self::Gloas { .. } => 0, } } - pub fn into_available_block(self) -> AvailableBlock { - self.block + /// Converts into an `AvailableBlock` for import, returning any associated envelope + /// separately. Callers processing Gloas blocks must handle the envelope themselves. + #[allow(clippy::type_complexity)] + pub fn into_available_block( + self, + ) -> Result<(AvailableBlock, Option>), AvailabilityCheckError> { + match self { + Self::Base(block) => Ok((block, None)), + Self::Gloas { block, envelope } => { + let available = + AvailableBlock::new_gloas(block).map_err(AvailabilityCheckError::Unexpected)?; + Ok((available, envelope)) + } + } } } @@ -405,13 +494,13 @@ impl AsBlock for RangeSyncBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9829db0f1d..a0b117f072 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -669,7 +669,7 @@ impl DataAvailabilityChecker { /// Verify a batch of data columns belonging to a single block, picking the right commitment /// source for the block's fork (Fulu: inline on column; Gloas: from the embedded payload bid). -fn verify_columns_against_block( +pub fn verify_columns_against_block( kzg: &Kzg, block: &SignedBeaconBlock, columns: &[Arc>], @@ -792,7 +792,12 @@ async fn availability_cache_maintenance_service( #[derive(Debug, Clone)] // TODO(#8633) move this to `block_verification_types.rs` pub enum AvailableBlockData { - /// Block is pre-Deneb or has zero blobs + /// Block has no inline DA object for block import. + /// + /// This covers: + /// - pre-Deneb blocks, + /// - blocks with zero blobs, and + /// - Gloas blocks, where DA is checked on the payload envelope instead. NoData, /// Block is post-Deneb, pre-PeerDAS and has more than zero blobs Blobs(BlobSidecarList), @@ -953,6 +958,19 @@ impl AvailableBlock { }) } + pub fn new_gloas(block: Arc>) -> Result { + if block.fork_name_unchecked().gloas_enabled() { + Ok(Self { + block_root: block.canonical_root(), + block, + blob_data: AvailableBlockData::NoData, + blobs_available_timestamp: None, + }) + } else { + Err("Block is not gloas".to_owned()) + } + } + pub fn block(&self) -> &SignedBeaconBlock { &self.block } @@ -1294,7 +1312,7 @@ mod test { let available_blocks = blocks_with_columns .into_iter() - .map(|block| block.into_available_block()) + .map(|block| block.into_available_block().unwrap().0) .collect::>(); // WHEN verifying all blocks together (totalling 256 data columns) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index a10372b933..00806f0e17 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -4,9 +4,10 @@ use std::time::Duration; use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; +use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; use store::StoreOp; use tracing::{debug, error, info, info_span, instrument, warn}; -use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; +use types::{BlockImportSource, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, @@ -19,6 +20,7 @@ use crate::{ metrics, payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, validator_monitor::get_slot_delay_ms, }; @@ -372,4 +374,58 @@ impl BeaconChain { )); } } + + /// Process an envelope received during range sync. The associated block must already + /// be imported into fork choice. This performs signature verification, state processing, + /// EL verification and import. + #[instrument(skip_all, level = "debug")] + pub async fn process_range_sync_envelope( + self: &Arc, + available_envelope: AvailableEnvelope, + block_root: Hash256, + block: Arc>, + ) -> Result<(), EnvelopeError> { + let signed_envelope = available_envelope.envelope().clone(); + + // Load the state snapshot for envelope processing + let state_root = block.state_root(); + let snapshot = load_snapshot_from_state_root::(block_root, state_root, &self.store)?; + + // Verify envelope signature and state processing + verify_execution_payload_envelope( + &snapshot.pre_state, + &signed_envelope, + VerifySignatures::True, + snapshot.state_root, + &self.spec, + )?; + + // Send to EL for verification + let payload_notifier = PayloadNotifier::new( + self.clone(), + signed_envelope.clone(), + block, + NotifyExecutionLayer::Yes, + )?; + + let payload_verification_status = payload_notifier.notify_new_payload().await?; + + // Import directly — we already have all components (envelope + columns). + let chain = self.clone(); + let _ = self + .spawn_blocking_handle( + move || { + chain.import_execution_payload_envelope( + available_envelope, + block_root, + payload_verification_status, + ) + }, + "range_sync_envelope_import", + ) + .await + .map_err(|e| EnvelopeError::BeaconChainError(Box::new(e)))?; + + Ok(()) + } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index a1cbac35b3..a0d34949c6 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -40,7 +40,7 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AvailableEnvelope { envelope: Arc>, pub columns: DataColumnSidecarList, @@ -54,6 +54,10 @@ impl AvailableEnvelope { Self { envelope, columns } } + pub fn envelope(&self) -> &Arc> { + &self.envelope + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } @@ -180,6 +184,28 @@ impl std::fmt::Display for EnvelopeError { } } +impl EnvelopeError { + pub fn penalize_peer(&self) -> bool { + match self { + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::UnknownValidator { .. } + | EnvelopeError::IncorrectBlockProposer { .. } + | EnvelopeError::EnvelopeProcessingError(_) => true, + EnvelopeError::ExecutionPayloadError(e) => e.penalize_peer(), + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } + | EnvelopeError::BlockRootNotInForkChoice(_) + | EnvelopeError::InternalError(_) => false, + } + } +} + impl From for EnvelopeError { fn from(e: BeaconChainError) -> Self { EnvelopeError::BeaconChainError(Box::new(e)) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index db2a9a902d..62c7fb3a45 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -4,6 +4,7 @@ use crate::data_availability_checker::DataAvailabilityChecker; use crate::graffiti_calculator::GraffitiSettings; use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas}; use crate::observed_operations::ObservationOutcome; +use crate::payload_envelope_verification::AvailableEnvelope; pub use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::{BeaconBlockResponseWrapper, CustodyContext, get_block_root}; use crate::{ @@ -2929,18 +2930,29 @@ where block: Arc>, ) -> RangeSyncBlock { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); + let is_gloas = block.fork_name_unchecked().gloas_enabled(); // For Gloas, kzg commitments live in the bid (`signed_execution_payload_bid`), so the // body's `blob_kzg_commitments()` accessor returns Err. `num_expected_blobs` already // handles both shapes. let has_blobs = block.num_expected_blobs() > 0; if !has_blobs { - return RangeSyncBlock::new( - block, - AvailableBlockData::NoData, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap(); + return if is_gloas { + let envelope = self + .chain + .get_payload_envelope(&block_root) + .unwrap() + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, vec![])); + RangeSyncBlock::new_gloas(block, envelope).unwrap() + } else { + RangeSyncBlock::new( + block, + AvailableBlockData::NoData, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() + }; } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2952,14 +2964,24 @@ where .unwrap() .unwrap(); let custody_columns = columns.into_iter().collect::>(); - let block_data = AvailableBlockData::new_with_data_columns(custody_columns); - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap() + if is_gloas { + let envelope = self + .chain + .get_payload_envelope(&block_root) + .unwrap() + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, custody_columns)); + RangeSyncBlock::new_gloas(block, envelope).unwrap() + } else { + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RangeSyncBlock::new( + block, + block_data, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() + } } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); let block_data = if let Some(blobs) = blobs { @@ -2984,6 +3006,19 @@ where block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, ) -> Result, BlockError> { + if block.fork_name_unchecked().gloas_enabled() { + let columns = blob_items + .map(|_| generate_data_column_sidecars_from_block(&block, &self.spec)) + .unwrap_or_default(); + let envelope = self + .chain + .get_payload_envelope(&block.canonical_root()) + .map_err(|e| BlockError::BeaconChainError(Box::new(e)))? + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, columns)); + return RangeSyncBlock::new_gloas(block, envelope).map_err(BlockError::InternalError); + } + Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); let sampling_columns = self.chain.sampling_columns_for_epoch(epoch); diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 862c2a9fe8..9d32b37134 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -235,7 +235,7 @@ async fn produces_attestations() { let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); - let available_block = range_sync_block.into_available_block(); + let (available_block, _envelope) = range_sync_block.into_available_block().unwrap(); // For Gloas non-same-slot attestations, the early attester cache returns None. let is_same_slot_attestation = slot == block_slot; @@ -300,12 +300,13 @@ async fn early_attester_cache_old_request() { .get_block(&head.beacon_block_root) .unwrap(); - let available_block = harness + let (available_block, _envelope) = harness .build_range_sync_block_from_store_blobs( Some(head.beacon_block_root), head.beacon_block.clone(), ) - .into_available_block(); + .into_available_block() + .unwrap(); harness .chain diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index d7bba13eb0..94d4b3b9da 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -3,13 +3,14 @@ use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, LookupBlock, RangeSyncBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, WhenSlotSkipped, custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, - MakeAttestationOptions, fork_name_from_env, test_spec, + MakeAttestationOptions, test_spec, }, }; use beacon_chain::{ @@ -158,19 +159,37 @@ where .zip(chain_segment_sidecars.iter()) .map(|(snapshot, data_sidecars)| { let block = snapshot.beacon_block.clone(); - build_range_sync_block(block, data_sidecars, chain.clone()) + build_range_sync_block( + block, + snapshot.execution_envelope.clone(), + data_sidecars, + chain.clone(), + ) }) .collect() } fn build_range_sync_block( block: Arc>, + execution_envelope: Option>>, data_sidecars: &Option>, chain: Arc>, ) -> RangeSyncBlock where T: BeaconChainTypes, { + if block.fork_name_unchecked().gloas_enabled() { + let columns = match data_sidecars { + Some(DataSidecars::DataColumns(columns)) => columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + Some(DataSidecars::Blobs(_)) | None => vec![], + }; + let envelope = execution_envelope.map(|envelope| AvailableEnvelope::new(envelope, columns)); + return RangeSyncBlock::new_gloas(block, envelope).unwrap(); + } + match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); @@ -286,6 +305,14 @@ fn update_proposal_signatures( } } +fn update_envelope_block_root(snapshot: &mut BeaconSnapshot) { + if let Some(envelope) = snapshot.execution_envelope.as_ref() { + let mut envelope = envelope.as_ref().clone(); + envelope.message.beacon_block_root = snapshot.beacon_block.canonical_root(); + snapshot.execution_envelope = Some(Arc::new(envelope)); + } +} + fn update_parent_roots(snapshots: &mut [BeaconSnapshot], blobs: &mut [Option>]) { for i in 0..snapshots.len() { let root = snapshots[i].beacon_block.canonical_root(); @@ -304,6 +331,7 @@ fn update_parent_roots(snapshots: &mut [BeaconSnapshot], blobs: &mut [Option< } } child.beacon_block = new_child; + update_envelope_block_root(child); } } } @@ -369,10 +397,6 @@ fn update_data_column_signed_header( #[tokio::test] async fn chain_segment_full_segment() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; store_envelopes_for_chain_segment(chain_segment, &harness); @@ -413,10 +437,6 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let blocks: Vec> = @@ -495,13 +515,18 @@ async fn chain_segment_non_linear_parent_roots() { let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -535,13 +560,18 @@ async fn chain_segment_non_linear_slots() { .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -565,13 +595,18 @@ async fn chain_segment_non_linear_slots() { .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -599,7 +634,12 @@ async fn assert_invalid_signature( .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); @@ -638,7 +678,12 @@ async fn assert_invalid_signature( .zip(chain_segment_blobs.iter()) .filter(|(snapshot, _)| snapshot.beacon_block.slot() > finalized_slot) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been @@ -656,6 +701,7 @@ async fn assert_invalid_signature( snapshots[block_index].beacon_block.canonical_root(), build_range_sync_block( snapshots[block_index].beacon_block.clone(), + snapshots[block_index].execution_envelope.clone(), &chain_segment_blobs[block_index], harness.chain.clone(), ), @@ -697,10 +743,6 @@ async fn get_invalid_sigs_harness( } #[tokio::test] async fn invalid_signature_gossip_block() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; for &block_index in BLOCK_INDICES { // Ensure the block will be rejected if imported on its own (without gossip checking). @@ -721,7 +763,12 @@ async fn invalid_signature_gossip_block() { .take(block_index) .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); harness @@ -757,10 +804,6 @@ async fn invalid_signature_gossip_block() { #[tokio::test] async fn invalid_signature_block_proposal() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; for &block_index in BLOCK_INDICES { let harness = get_invalid_sigs_harness(chain_segment).await; @@ -778,7 +821,12 @@ async fn invalid_signature_block_proposal() { .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. @@ -800,10 +848,6 @@ async fn invalid_signature_block_proposal() { #[tokio::test] async fn invalid_signature_randao_reveal() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -817,6 +861,7 @@ async fn invalid_signature_randao_reveal() { *block.body_mut().randao_reveal_mut() = junk_signature(); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -833,10 +878,6 @@ async fn invalid_signature_randao_reveal() { #[tokio::test] async fn invalid_signature_proposer_slashing() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -864,6 +905,7 @@ async fn invalid_signature_proposer_slashing() { .expect("should update proposer slashing"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -880,10 +922,6 @@ async fn invalid_signature_proposer_slashing() { #[tokio::test] async fn invalid_signature_attester_slashing() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -990,6 +1028,7 @@ async fn invalid_signature_attester_slashing() { } snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -1006,10 +1045,6 @@ async fn invalid_signature_attester_slashing() { #[tokio::test] async fn invalid_signature_attestation() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); let mut checked_attestation = false; @@ -1060,6 +1095,7 @@ async fn invalid_signature_attestation() { if block.body().attestations_len() > 0 { snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -1112,13 +1148,19 @@ async fn invalid_signature_deposit() { .expect("should update deposit"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); assert!( @@ -1137,10 +1179,6 @@ async fn invalid_signature_deposit() { #[tokio::test] async fn invalid_signature_exit() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -1165,6 +1203,7 @@ async fn invalid_signature_exit() { .expect("should update deposit"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -2158,13 +2197,19 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. let notify_execution_layer = NotifyExecutionLayer::Yes; - let range_sync_block = RangeSyncBlock::new( - block.clone(), - AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let range_sync_block = if block.fork_name_unchecked().gloas_enabled() { + // Fine to pass no envelope as we are testing duplicate block import + // which is not related. + RangeSyncBlock::new_gloas(block.clone(), None).unwrap() + } else { + RangeSyncBlock::new( + block.clone(), + AvailableBlockData::NoData, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; let verified_block1 = range_sync_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -2239,6 +2284,111 @@ async fn import_execution_pending_block( } } +async fn make_gloas_range_sync_block_inputs() +-> Option<(Arc>, SignedExecutionPayloadEnvelope)> { + let spec = test_spec::(); + if !spec.fork_name_at_slot::(Slot::new(1)).gloas_enabled() { + return None; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Supernode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + let state = harness.get_current_state(); + let slot = harness.get_current_slot(); + let ((block, _), envelope, _) = harness.make_block_with_envelope(state, slot).await; + Some((block, envelope.expect("gloas block should have envelope"))) +} + +#[tokio::test] +async fn range_sync_block_new_gloas_accepts_matching_envelope() { + let Some((block, envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + result.is_ok(), + "new_gloas should accept matching block/envelope, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_allows_missing_envelope() { + let Some((block, _)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + let result = RangeSyncBlock::new_gloas(block, None); + + assert!( + result.is_ok(), + "new_gloas should allow None envelope, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_slot_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.payload.slot_number += 1; + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("SlotMismatch")), + "new_gloas should reject mismatched slot, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_builder_index_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.builder_index += 1; + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("BuilderIndexMismatch")), + "new_gloas should reject mismatched builder index, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_block_hash_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.payload.block_hash = ExecutionBlockHash::repeat_byte(0x22); + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("BlockHashMismatch")), + "new_gloas should reject mismatched block hash, got: {:?}", + result + ); +} + // Test that RpcBlock::new() rejects blocks when blob count doesn't match expected. #[tokio::test] async fn range_sync_block_construction_fails_with_wrong_blob_count() { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b70961c499..4d392ef524 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3304,7 +3304,8 @@ async fn weak_subjectivity_sync_test( let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(full_block)); - let fully_available_block = range_sync_block.into_available_block(); + let (fully_available_block, _envelope) = + range_sync_block.into_available_block().unwrap(); harness .chain .data_availability_checker diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 8843541c11..dce4713245 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -533,7 +533,8 @@ mod tests { use super::*; use beacon_chain::{ PayloadVerificationStatus, - block_verification_types::{AvailableBlockData, RangeSyncBlock}, + block_verification_types::AvailableBlockData, + data_availability_checker::AvailableBlock, test_utils::{ BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, generate_data_column_sidecars_from_block, @@ -611,14 +612,13 @@ mod tests { "precondition: {fork_name:?} test block must produce data columns" ); - let available_block = RangeSyncBlock::new( + let available_block = AvailableBlock::new( block.clone(), AvailableBlockData::new_with_data_columns(data_columns), &chain.data_availability_checker, chain.spec.clone(), ) - .unwrap() - .into_available_block(); + .unwrap(); let current_slot = harness.get_current_slot(); let cached_head = chain.canonical_head.cached_head(); diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index ed9dc5666a..b444608468 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -726,20 +726,17 @@ pub fn rpc_data_column_limits( spec: &ChainSpec, ) -> RpcLimits { let fork_name = spec.fork_name_at_epoch(current_digest_epoch); + let max_blobs = spec.max_blobs_per_block(current_digest_epoch) as usize; if fork_name.gloas_enabled() { RpcLimits::new( DataColumnSidecarGloas::::min_size(), - DataColumnSidecarFulu::::max_size( - spec.max_blobs_per_block(current_digest_epoch) as usize - ), + DataColumnSidecarFulu::::max_size(max_blobs), ) } else { RpcLimits::new( DataColumnSidecarFulu::::min_size(), - DataColumnSidecarFulu::::max_size( - spec.max_blobs_per_block(current_digest_epoch) as usize - ), + DataColumnSidecarFulu::::max_size(max_blobs), ) } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 1d0d181cb3..4a8c6c55eb 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -57,6 +59,12 @@ pub struct BlobsByRangeRequestId { pub parent_request_id: ComponentsByRangeRequestId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + pub id: Id, + pub parent_request_id: ComponentsByRangeRequestId, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct DataColumnsByRangeRequestId { /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` @@ -259,6 +267,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 528a261bb8..e2226af094 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -682,10 +682,26 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); - let available_blocks = downloaded_blocks + let available_blocks = match downloaded_blocks .into_iter() - .map(|block| block.into_available_block()) - .collect::>(); + .map(|block| { + block + .into_available_block() + .map(|(available, _envelope)| available) + }) + .collect::, _>>() + { + Ok(blocks) => blocks, + Err(e) => { + return ( + 0, + Err(ChainSegmentFailed { + peer_action: Some(PeerAction::LowToleranceError), + message: format!("Block failed availability construction: {:?}", e), + }), + ); + } + }; // TODO(gloas) when implementing backfill sync for gloas // we need a batch verify kzg function in the new da checker @@ -893,6 +909,17 @@ impl NetworkBeaconProcessor { peer_action: None, }) } + ref err @ BlockError::EnvelopeError(ref envelope_error) => { + debug!(error = ?err, "Invalid execution payload envelope"); + Err(ChainSegmentFailed { + message: format!("Invalid execution payload envelope: {err:?}"), + peer_action: if envelope_error.penalize_peer() { + Some(PeerAction::LowToleranceError) + } else { + None + }, + }) + } ref err @ BlockError::ExecutionPayloadError(ref epe) => { if !epe.penalize_peer() { // These errors indicate an issue with the EL and not the `ChainSegment`. @@ -1030,9 +1057,16 @@ impl From> for BlockProcessingR None } } - BlockError::EnvelopeError(_) => { - // TODO(gloas): penalize correctly in range sync PR - None + BlockError::EnvelopeError(epe) => { + if epe.penalize_peer() { + Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + (&e).into(), + )) + } else { + None + } } // Remaining invalid blocks: penalize the block peer. Listed explicitly so a // new `BlockError` variant forces a compile error here. diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 277ece0aa8..315ec9387d 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -352,10 +352,8 @@ impl Router { Response::PayloadEnvelopesByRoot(envelope) => { self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); } - // TODO(EIP-7732): implement outgoing payload envelopes by range responses - // once sync manager requests them. - Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by range not supported yet"); + Response::PayloadEnvelopesByRange(envelope) => { + self.on_payload_envelopes_by_range_response(peer_id, app_request_id, envelope); } // Lighthouse currently only serves BlocksByHead and does not issue it as a client, // so receiving a response is unexpected. Drop it without crashing. @@ -802,6 +800,29 @@ impl Router { }); } + /// Handle a `PayloadEnvelopesByRange` response from the peer. + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(id @ SyncRequestId::PayloadEnvelopesByRange { .. }) => id, + other => { + crit!(request = ?other, %peer_id, "PayloadEnvelopesByRange response on incorrect request"); + return; + } + }; + + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 36816fb5d6..c8bb17243e 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -351,6 +351,9 @@ impl BackFillSync { CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } + CouplingError::EnvelopePeerFailure(msg) => { + debug!(?batch_id, msg, "Envelope peer failure"); + } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf503..8d40ec8b7f 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -34,6 +34,7 @@ pub type BatchId = Epoch; pub enum ByRangeRequestType { BlocksAndColumns, BlocksAndBlobs, + BlocksAndEnvelopesAndColumns, Blocks, Columns(HashSet), } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 999b3dd30e..b64ae4a4c5 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,3 +1,4 @@ +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ BeaconChainTypes, block_verification_types::{AvailableBlockData, RangeSyncBlock}, @@ -9,14 +10,15 @@ use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; use std::{collections::HashMap, sync::Arc}; -use tracing::{Span, debug}; +use tracing::{Span, debug, warn}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -37,6 +39,13 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// Payload envelopes for Gloas blocks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Span to track the range request and all children range requests. pub(crate) request_span: Span, } @@ -71,6 +80,7 @@ pub enum CouplingError { exceeded_retries: bool, }, BlobPeerFailure(String), + EnvelopePeerFailure(String), } impl RangeBlockComponentsRequest { @@ -88,6 +98,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -110,6 +121,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + payloads_request: payloads_req_id.map(ByRangeRequest::Active), request_span, } } @@ -191,6 +203,17 @@ impl RangeBlockComponentsRequest { } } + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none were expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,11 @@ impl RangeBlockComponentsRequest { return None; }; + // Check if payload envelopes are still pending + if let Some(ByRangeRequest::Active(_)) = &self.payloads_request { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,6 +282,12 @@ impl RangeBlockComponentsRequest { } } + let payload_envelopes = self.payloads_request.as_ref().and_then(|request| { + request + .to_finished() + .map(|payload_envelopes| payload_envelopes.to_vec()) + }); + let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, @@ -262,6 +296,7 @@ impl RangeBlockComponentsRequest { *attempt, da_checker, spec, + payload_envelopes, ); if let Err(CouplingError::DataColumnPeerFailure { @@ -352,6 +387,7 @@ impl RangeBlockComponentsRequest { Ok(responses) } + #[allow(clippy::too_many_arguments)] fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, @@ -360,10 +396,19 @@ impl RangeBlockComponentsRequest { attempt: usize, da_checker: Arc>, spec: Arc, + payload_envelopes: Option>>>, ) -> Result>, CouplingError> where T: BeaconChainTypes, { + // Index envelopes by beacon_block_root for correct coupling. + let mut envelopes_by_block_root = payload_envelopes.map(|envelopes| { + envelopes + .into_iter() + .map(|e| (e.beacon_block_root(), e)) + .collect::>() + }); + // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -393,7 +438,7 @@ impl RangeBlockComponentsRequest { let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; for block in blocks { let block_root = get_block_root(&block); - range_sync_blocks.push(if block.num_expected_blobs() > 0 { + let custody_columns = if block.num_expected_blobs() > 0 { let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) else { let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); @@ -401,7 +446,6 @@ impl RangeBlockComponentsRequest { error: format!("No columns for block {block_root:?} with data"), faulty_peers: responsible_peers, exceeded_retries, - }); }; @@ -415,16 +459,21 @@ impl RangeBlockComponentsRequest { custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } else { let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {}", + index + ))); }; naughty_peers.push((*index, *responsible_peer)); } } if !naughty_peers.is_empty() { return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), faulty_peers: naughty_peers, - exceeded_retries + exceeded_retries, }); } @@ -439,15 +488,31 @@ impl RangeBlockComponentsRequest { ); } - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); - - RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) - .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>() } else { - // Block has no data, expects zero columns + vec![] + }; + let range_sync_block = if let Some(envelopes_by_block_root) = + envelopes_by_block_root.as_mut() + { + let envelope = envelopes_by_block_root.remove(&block_root); + let available_envelope = + envelope.map(|env| AvailableEnvelope::new(env, custody_columns)); + + RangeSyncBlock::new_gloas(block, available_envelope) + .map_err(CouplingError::EnvelopePeerFailure)? + } else if custody_columns.is_empty() { RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? - }); + } else { + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + }; + range_sync_blocks.push(range_sync_block); } // Assert that there are no columns left for other blocks @@ -458,6 +523,13 @@ impl RangeBlockComponentsRequest { debug!(?remaining_roots, "Not all columns consumed for block"); } + // Recoverable error, log and continue + if let Some(envelopes_by_block_root) = envelopes_by_block_root + && !envelopes_by_block_root.is_empty() + { + warn!("Peer returned extra envelopes not matching any block"); + } + Ok(range_sync_blocks) } } @@ -489,21 +561,28 @@ mod tests { use crate::sync::network_context::MAX_COLUMN_RETRIES; use super::RangeBlockComponentsRequest; + use beacon_chain::block_verification_types::RangeSyncBlock; use beacon_chain::custody_context::NodeCustodyType; + use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::test_utils::{ - NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, - test_da_checker, test_spec, + EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs, + generate_rand_block_and_data_columns, test_da_checker, test_spec, }; + use bls::Signature; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, + PayloadEnvelopesByRangeRequestId, RangeRequestId, }, }; use std::{collections::HashMap, sync::Arc}; use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; + use types::{ + ChainSpec, DataColumnSidecarList, Epoch, ExecutionPayloadEnvelope, ForkName, + MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, + }; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -538,6 +617,15 @@ mod tests { } } + fn payloads_id( + parent_request_id: ComponentsByRangeRequestId, + ) -> PayloadEnvelopesByRangeRequestId { + PayloadEnvelopesByRangeRequestId { + id: 1, + parent_request_id, + } + } + fn columns_id( id: Id, parent_request_id: DataColumnsByRangeRequester, @@ -555,8 +643,166 @@ mod tests { info.responses(da_checker, spec).is_some() } + fn gloas_spec() -> ChainSpec { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + spec + } + + fn matching_envelope(block: &SignedBeaconBlock) -> Arc> { + let bid = &block + .message() + .body() + .signed_execution_payload_bid() + .expect("Gloas block should have payload bid") + .message; + let mut envelope = SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope::empty(), + signature: Signature::empty(), + }; + envelope.message.beacon_block_root = block.canonical_root(); + envelope.message.parent_beacon_block_root = block.parent_root(); + envelope.message.builder_index = bid.builder_index; + envelope.message.payload.slot_number = block.slot(); + envelope.message.payload.parent_hash = bid.parent_block_hash; + envelope.message.payload.block_hash = bid.block_hash; + Arc::new(envelope) + } + + #[allow(clippy::type_complexity)] + fn make_gloas_blocks_and_columns( + count: usize, + spec: &ChainSpec, + ) -> Vec<( + Arc>, + DataColumnSidecarList, + Arc>, + )> { + let mut u = types::test_utils::test_unstructured(); + (0..count) + .map(|_| { + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut u, + spec, + ) + .unwrap(); + let envelope = matching_envelope(&block); + (Arc::new(block), data_columns, envelope) + }) + .collect() + } + + #[allow(clippy::type_complexity)] + fn add_all_columns( + info: &mut RangeBlockComponentsRequest, + blocks: &[( + Arc>, + DataColumnSidecarList, + Arc>, + )], + columns_req_id: &[(DataColumnsByRangeRequestId, Vec)], + expected_custody_columns: &[u64], + ) { + for (i, &column_index) in expected_custody_columns.iter().enumerate() { + let (req, _columns) = columns_req_id.get(i).unwrap(); + info.add_custody_columns( + *req, + blocks + .iter() + .flat_map(|(_, columns, _)| { + columns + .iter() + .filter(|column| *column.index() == column_index) + .cloned() + }) + .collect(), + ) + .unwrap(); + } + } + + #[allow(clippy::type_complexity)] + struct GloasSetup { + info: RangeBlockComponentsRequest, + da_checker: Arc>>, + spec: Arc, + blocks: Vec<( + Arc>, + DataColumnSidecarList, + Arc>, + )>, + payloads_req_id: PayloadEnvelopesByRangeRequestId, + expected_custody_columns: Vec, + } + + /// Builds a Gloas coupling request with `count` blocks and all custody columns added, + /// ready for the per-test payload-envelope step. + fn setup_gloas_coupling(count: usize) -> GloasSetup { + let spec = Arc::new(gloas_spec()); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_custody_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); + let blocks = make_gloas_blocks_and_columns(count, &spec); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let payloads_req_id = payloads_id(components_id); + let columns_req_id = expected_custody_columns + .iter() + .enumerate() + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), expected_custody_columns.clone())), + Some(payloads_req_id), + Span::none(), + ); + + info.add_blocks( + blocks_req_id, + blocks.iter().map(|(block, _, _)| block.clone()).collect(), + ) + .unwrap(); + add_all_columns( + &mut info, + &blocks, + &columns_req_id, + &expected_custody_columns, + ); + + GloasSetup { + info, + da_checker, + spec, + blocks, + payloads_req_id, + expected_custody_columns, + } + } + #[test] fn no_blobs_into_responses() { + // This exercises the pre-Gloas blobs/no-data coupling path. Gloas coupling is covered + // by the dedicated `setup_gloas_coupling` tests below. + if skip_under_gloas() { + return; + } let spec = Arc::new(test_spec::()); let mut u = types::test_utils::test_unstructured(); @@ -575,7 +821,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -606,6 +852,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -672,6 +919,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -751,6 +999,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -804,6 +1053,115 @@ mod tests { info.responses(da_checker, spec).unwrap().unwrap(); } + #[test] + fn gloas_payload_envelopes_must_complete_before_responses() { + let GloasSetup { + mut info, + da_checker, + spec, + .. + } = setup_gloas_coupling(2); + + // No payload envelopes added yet, so the request must not be complete. + assert!(info.responses(da_checker, spec).is_none()); + } + + #[test] + fn gloas_payload_envelopes_are_coupled_by_block_root() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + expected_custody_columns, + } = setup_gloas_coupling(2); + + // Supply envelopes in reverse order to prove coupling is by block root, not position. + info.add_payload_envelopes( + payloads_req_id, + blocks + .iter() + .rev() + .map(|(_, _, envelope)| envelope.clone()) + .collect(), + ) + .unwrap(); + + let responses = info.responses(da_checker, spec).unwrap().unwrap(); + assert_eq!(responses.len(), blocks.len()); + for response in responses { + match response { + RangeSyncBlock::Gloas { + block, + envelope: Some(envelope), + } => { + assert_eq!( + envelope.envelope().beacon_block_root(), + block.canonical_root() + ); + assert_eq!(envelope.columns.len(), expected_custody_columns.len()); + } + other => panic!("expected Gloas block with envelope, got {other:?}"), + } + } + } + + #[test] + fn gloas_payload_envelopes_allow_missing_envelopes() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + .. + } = setup_gloas_coupling(2); + + // Supply an envelope for only one of the two blocks. + info.add_payload_envelopes(payloads_req_id, vec![blocks[0].2.clone()]) + .unwrap(); + + let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let count_with = |with_envelope: bool| { + responses + .iter() + .filter(|response| { + matches!(response, RangeSyncBlock::Gloas { envelope, .. } if envelope.is_some() == with_envelope) + }) + .count() + }; + assert_eq!(count_with(true), 1); + assert_eq!(count_with(false), 1); + } + + #[test] + fn gloas_payload_envelope_mismatch_fails_coupling() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + .. + } = setup_gloas_coupling(1); + + let mut bad_envelope = (*blocks[0].2).clone(); + bad_envelope.message.payload.slot_number += 1; + info.add_payload_envelopes(payloads_req_id, vec![Arc::new(bad_envelope)]) + .unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!( + matches!( + result, + Err(super::CouplingError::EnvelopePeerFailure(ref error)) + if error.contains("SlotMismatch") + ), + "expected envelope slot mismatch, got {result:?}" + ); + } + #[test] fn missing_custody_columns_from_faulty_peers() { if skip_under_gloas() { @@ -848,6 +1206,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -949,6 +1308,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1068,6 +1428,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7a90163852..8e7b8cd05a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -57,7 +57,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -504,6 +505,8 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -1160,6 +1163,13 @@ impl SyncManager { peer_id, RpcEvent::from_chunk(envelope, seen_timestamp), ), + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ); + } _ => { crit!(%peer_id, "bad request id for payload envelope"); } @@ -1214,6 +1224,24 @@ impl SyncManager { } } + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_data_columns_by_root_response( &mut self, req_id: DataColumnsByRootRequestId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c9a48c9d5e..8e8abd4fa6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -24,14 +24,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -39,7 +42,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, - PayloadEnvelopesByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -217,6 +220,11 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -254,6 +262,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -302,6 +314,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -334,6 +347,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -369,12 +383,17 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -431,6 +450,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -453,6 +473,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -585,24 +606,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -666,6 +689,26 @@ impl SyncNetworkContext { }) .transpose()?; + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let info = RangeBlockComponentsRequest::new( blocks_req_id, @@ -676,6 +719,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -778,6 +822,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1269,6 +1324,43 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1349,7 +1441,10 @@ impl SyncNetworkContext { "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); - if self + if self.chain.spec.fork_name_at_epoch(epoch).gloas_enabled() { + // TODO(gloas): Not precise and we can be post-gloas and not require columns + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1485,6 +1580,19 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + /// Common handler for consistent scoring of RpcResponseError fn on_rpc_response_result( &mut self, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 72dd2c22d0..cc74785098 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -15,6 +15,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; pub use payload_envelopes_by_root::{ PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, }; @@ -28,6 +29,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 0000000000..13e6454a23 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,48 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + if envelope.slot().as_u64() < self.request.start_slot + || envelope.slot().as_u64() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(envelope.slot())); + } + + if self + .items + .iter() + .any(|existing| existing.slot() == envelope.slot()) + { + return Err(LookupVerifyError::DuplicatedData(envelope.slot(), 0)); + } + + self.items.push(envelope); + + Ok(self.items.len() >= self.request.count as usize) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d533d8ed0d..6292388339 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -952,6 +952,9 @@ impl SyncingChain { CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } + CouplingError::EnvelopePeerFailure(msg) => { + debug!(?batch_id, msg, "Envelope peer failure"); + } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index fb0956cf50..13eeaee9aa 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -12,6 +12,7 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, @@ -613,7 +614,6 @@ impl TestRig { .unwrap_or_else(|| { panic!("Test consumer requested unknown block: {id:?}") }) - .block_data() .data_columns() .unwrap_or_else(|| panic!("Block id {id:?} has no columns")); id.columns @@ -763,7 +763,7 @@ impl TestRig { .return_wrong_range_column_indices_n_times -= 1; let wrong_columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -787,7 +787,7 @@ impl TestRig { let wrong_columns = self .network_blocks_by_slot .get(&Slot::new(wrong_slot)) - .and_then(|block| block.block_data().data_columns()) + .and_then(|block| block.data_columns()) .into_iter() .flat_map(|columns| { columns @@ -804,7 +804,7 @@ impl TestRig { self.complete_strategy.return_partial_range_columns_n_times -= 1; let columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -820,7 +820,7 @@ impl TestRig { let columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -830,6 +830,25 @@ impl TestRig { self.send_rpc_columns_response(req_id, peer_id, &columns); } + (RequestType::PayloadEnvelopesByRange(req), AppRequestId::Sync(req_id)) => { + if self.complete_strategy.skip_by_range_routes { + return; + } + + let envelopes = (req.start_slot..req.start_slot + req.count) + .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) + .filter_map(|block| { + let block_root = block.canonical_root(); + // Respect a withheld payload envelope. + if self.complete_strategy.hold_envelope_for_block == Some(block_root) { + return None; + } + self.network_envelopes_by_root.get(&block_root).cloned() + }) + .collect::>(); + self.send_rpc_envelopes_response(req_id, peer_id, &envelopes); + } + (RequestType::Status(_req), AppRequestId::Router) => { // Ignore Status requests for now } @@ -958,6 +977,34 @@ impl TestRig { }); } + fn send_rpc_envelopes_response( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelopes: &[Arc>], + ) { + let slots = envelopes.iter().map(|e| e.slot()).collect::>(); + self.log(&format!( + "Completing request {sync_request_id:?} to {peer_id} with envelopes {slots:?}" + )); + + for envelope in envelopes { + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: Some(envelope.clone()), + seen_timestamp: D, + }); + } + // Stream termination + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: None, + seen_timestamp: D, + }); + } + #[allow(dead_code)] fn is_after_gloas(&self) -> bool { self.fork_name.gloas_enabled() @@ -1177,7 +1224,7 @@ impl TestRig { let range_sync_block = self.get_last_block().clone(); let mut block = (*range_sync_block.block_cloned()).clone(); let blobs = range_sync_block.block_data().blobs(); - let columns = range_sync_block.block_data().data_columns(); + let columns = range_sync_block.data_columns(); *block.signature_mut() = self.valid_signature(); self.re_insert_block(Arc::new(block), blobs, columns); } @@ -1192,10 +1239,7 @@ impl TestRig { let range_sync_block = self.get_last_block().clone(); let block = range_sync_block.block_cloned(); let blobs = range_sync_block.block_data().blobs(); - let mut columns = range_sync_block - .block_data() - .data_columns() - .expect("no columns"); + let mut columns = range_sync_block.data_columns().expect("no columns"); let first = columns.first_mut().expect("empty columns"); Arc::make_mut(first) .signed_block_header_mut() @@ -1217,10 +1261,7 @@ impl TestRig { .clone(); let block = range_sync_block.block_cloned(); let blobs = range_sync_block.block_data().blobs(); - let mut columns = range_sync_block - .block_data() - .data_columns() - .expect("no columns"); + let mut columns = range_sync_block.data_columns().expect("no columns"); let first = columns.first_mut().expect("empty columns"); let column = Arc::make_mut(first); let proof = column.kzg_proofs_mut().first_mut().expect("no kzg proofs"); @@ -1256,20 +1297,30 @@ impl TestRig { ) { let block_root = block.canonical_root(); let block_slot = block.slot(); - let block_data = if let Some(columns) = columns { - AvailableBlockData::new_with_data_columns(columns) - } else if let Some(blobs) = blobs { - AvailableBlockData::new_with_blobs(blobs) + let range_sync_block = if block.fork_name_unchecked().gloas_enabled() { + // Gloas carries data columns in the payload envelope, not in `block_data`. + let envelope = self + .network_envelopes_by_root + .get(&block_root) + .cloned() + .map(|envelope| AvailableEnvelope::new(envelope, columns.unwrap_or_default())); + RangeSyncBlock::new_gloas(block, envelope).unwrap() } else { - AvailableBlockData::NoData + let block_data = if let Some(columns) = columns { + AvailableBlockData::new_with_data_columns(columns) + } else if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + RangeSyncBlock::new( + block, + block_data, + &self.harness.chain.data_availability_checker, + self.harness.chain.spec.clone(), + ) + .unwrap() }; - let range_sync_block = RangeSyncBlock::new( - block, - block_data, - &self.harness.chain.data_availability_checker, - self.harness.chain.spec.clone(), - ) - .unwrap(); self.network_blocks_by_slot .insert(block_slot, range_sync_block.clone()); self.network_blocks_by_root @@ -1367,7 +1418,6 @@ impl TestRig { let peer_id = self.new_connected_supernode_peer(); let columns = self .get_last_block() - .block_data() .data_columns() .expect("No data columns"); let column = columns.first().expect("empty columns");