From db3192e00144f338d833396cd2b98bb6a6d33d81 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 10 Jun 2026 16:00:57 +0530 Subject: [PATCH] Gloas range sync (#9362) N/A Implement range sync in gloas. Basically requests blocks and payloads post gloas from the same peer, couples them and sends it for processing. Does not change sync much at all other than adding the machinery for payloads by range requests. Main changes are: `RangeSyncBlock` which used to be a struct is an enum to account for the Gloas case. This allows a clear separation between gloas and pre-gloas code. `AvailableBlockData` now has a `BlockInEnvelope` variant. This is to clearly indicate the post gloas case. I feel this is simpler to follow compared to `NoData` variant. Tries to extract post gloas logic into its own functions so that there is minimal logic change in mainnet range sync behaviour. This is meant as a stable base on which we can iterate further to make range sync cleaner and for unblocking range sync support on devnet. Some ideas for later is removing the retry mechanism in favour of delegating column fetching to lookup sync which can be done post #9155 and batch signature verifying envelopes. Co-Authored-By: Pawan Dhananjay Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> Co-Authored-By: Eitan Seri-Levi --- beacon_node/beacon_chain/src/beacon_chain.rs | 36 +- .../beacon_chain/src/block_verification.rs | 21 +- .../src/block_verification_types.rs | 163 +++++-- .../src/data_availability_checker.rs | 24 +- .../payload_envelope_verification/import.rs | 58 ++- .../src/payload_envelope_verification/mod.rs | 28 +- beacon_node/beacon_chain/src/test_utils.rs | 65 ++- .../tests/attestation_production.rs | 7 +- .../beacon_chain/tests/block_verification.rs | 292 +++++++++---- beacon_node/beacon_chain/tests/store_tests.rs | 3 +- beacon_node/http_api/src/block_id.rs | 8 +- .../lighthouse_network/src/rpc/protocol.rs | 9 +- .../src/service/api_types.rs | 14 + .../network_beacon_processor/sync_methods.rs | 46 +- beacon_node/network/src/router.rs | 29 +- .../network/src/sync/backfill_sync/mod.rs | 3 + beacon_node/network/src/sync/batch.rs | 1 + .../src/sync/block_sidecar_coupling.rs | 397 +++++++++++++++++- beacon_node/network/src/sync/manager.rs | 30 +- .../network/src/sync/network_context.rs | 152 ++++++- .../src/sync/network_context/requests.rs | 2 + .../requests/payload_envelopes_by_range.rs | 48 +++ .../network/src/sync/range_sync/chain.rs | 3 + beacon_node/network/src/sync/tests/lookups.rs | 104 +++-- 24 files changed, 1311 insertions(+), 232 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8b50f4b18f..569f0eea50 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3108,6 +3108,15 @@ impl BeaconChain { let mut blocks = filtered_chain_segment.split_off(last_index); std::mem::swap(&mut blocks, &mut filtered_chain_segment); + // Extract envelopes before passing blocks to signature verification. + let envelopes: Vec<_> = blocks + .iter() + .map(|(_, block)| match block { + RangeSyncBlock::Gloas { envelope, .. } => envelope.clone(), + RangeSyncBlock::Base(_) => None, + }) + .collect(); + let chain = self.clone(); let signature_verification_future = self.spawn_blocking_handle( move || signature_verify_chain_segment(blocks, &chain), @@ -3132,11 +3141,15 @@ impl BeaconChain { }; // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + for (signature_verified_block, maybe_envelope) in + signature_verified_blocks.into_iter().zip(envelopes) + { let block_slot = signature_verified_block.slot(); + let block_root = signature_verified_block.block_root(); + let block = signature_verified_block.block_cloned(); match self .process_block( - signature_verified_block.block_root(), + block_root, signature_verified_block, notify_execution_layer, BlockImportSource::RangeSync, @@ -3166,11 +3179,8 @@ impl BeaconChain { } } Err(BlockError::DuplicateFullyImported(block_root)) => { - debug!( - ?block_root, - "Ignoring already known blocks while processing chain segment" - ); - continue; + // Block was already imported, envelope might need re-import + imported_blocks.push((block_root, block_slot)); } Err(error) => { return ChainSegmentResult::Failed { @@ -3179,6 +3189,18 @@ impl BeaconChain { }; } } + + // Process the envelope after the block has been imported. + if let Some(envelope) = maybe_envelope + && let Err(e) = self + .process_range_sync_envelope(envelope, block_root, block) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(e)), + }; + } } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 7b9b7e8218..6b1ac3b033 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -52,6 +52,7 @@ use crate::beacon_snapshot::PreProcessingSnapshot; use crate::block_verification_types::{AsBlock, BlockImportData, LookupBlock, RangeSyncBlock}; use crate::data_availability_checker::{ AvailabilityCheckError, AvailableBlock, AvailableBlockData, MaybeAvailableBlock, + verify_columns_against_block, }; use crate::data_column_verification::GossipDataColumnError; use crate::execution_payload::{ @@ -652,14 +653,16 @@ pub fn signature_verify_chain_segment( )?; let mut available_blocks = Vec::with_capacity(chain_segment.len()); + let mut envelopes = Vec::with_capacity(chain_segment.len()); let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); for (block_root, block) in chain_segment { let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let (available_block, envelope) = block.into_available_block()?; available_blocks.push(available_block.clone()); + envelopes.push(envelope); signature_verified_blocks.push(SignatureVerifiedBlock { block: MaybeAvailableBlock::Available(available_block), block_root, @@ -667,12 +670,17 @@ pub fn signature_verify_chain_segment( consensus_context, }); } - // TODO(gloas) When implementing range and backfill sync for gloas - // we need a batch verify kzg function in the new da checker as well. + chain .data_availability_checker .batch_verify_kzg_for_available_blocks(&available_blocks)?; + for (available_block, maybe_envelope) in available_blocks.iter().zip(envelopes.iter()) { + if let Some(envelope) = maybe_envelope { + verify_columns_against_block(&chain.kzg, available_block.block(), &envelope.columns)?; + } + } + // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); @@ -1341,10 +1349,13 @@ impl IntoExecutionPendingBlock for RangeSyncBlock Result, BlockSlashInfo> { // Perform an early check to prevent wasting time on irrelevant blocks. + let header = self.signed_block_header(); let block_root = check_block_relevancy(self.as_block(), block_root, chain) - .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; + .map_err(|e| BlockSlashInfo::SignatureNotChecked(header.clone(), e))?; - let available_block = self.into_available_block(); + let (available_block, _envelope) = self.into_available_block().map_err(|e| { + BlockSlashInfo::SignatureNotChecked(header.clone(), BlockError::AvailabilityCheck(e)) + })?; chain .data_availability_checker .verify_kzg_for_available_block(&available_block) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d7..18e95f58f3 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,12 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; +use crate::payload_envelope_verification::gossip_verified_envelope::verify_envelope_consistency; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -40,15 +42,27 @@ impl LookupBlock { } } -/// A fully available block that has been constructed by range sync. -/// The block contains all the data required to import into fork choice. -/// This includes any and all blobs/columns required, including zero if -/// none are required. This can happen if the block is pre-deneb or if -/// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +/// A block that has been constructed by range sync, ready for import. +/// Pre-Gloas: wraps an `AvailableBlock` with all data. +/// Gloas: carries the block and an optional envelope which contains the sidecar data. +/// +/// Note: In the gloas case, we only ensure that the block is consistent with the envelope +/// if the envelope is `Some` when constructing a `RangeSyncBlock` type. +/// If `envelope` is None, then there is no guarantee that the canonical chain also contains +/// an empty payload. The only way to ensure that is to process the next block. +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { @@ -59,31 +73,48 @@ impl Debug for RangeSyncBlock { impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + Self::Base(block) => block.block_root(), + Self::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + Self::Base(block) => block.block(), + Self::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + Self::Base(block) => block.block_cloned(), + Self::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + Self::Base(block) => block.data(), + Self::Gloas { .. } => &AvailableBlockData::NoData, + } + } + + /// Returns the data columns associated with this block. For Gloas blocks the columns are + /// carried by the payload envelope rather than `block_data`, so this unwraps that case. + pub fn data_columns(&self) -> Option> { + match self { + Self::Base(block) => block.data().data_columns(), + Self::Gloas { envelope, .. } => envelope + .as_ref() + .map(|envelope| envelope.columns.clone()) + .filter(|columns| !columns.is_empty()), + } } } impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. - /// - /// # Errors - /// - /// Returns `AvailabilityCheckError` if: - /// - `InvalidAvailableBlockData`: Block data is provided but not required. - /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete. - /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete. + /// Constructs a `RangeSyncBlock` from a block and availability data (pre-Gloas). pub fn new( block: Arc>, block_data: AvailableBlockData, @@ -93,33 +124,91 @@ impl RangeSyncBlock { where T: BeaconChainTypes, { + if block.fork_name_unchecked().gloas_enabled() { + return Err(AvailabilityCheckError::InvalidVariant); + } let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + /// Constructs a Gloas `RangeSyncBlock` with block and optional `AvailableEnvelope` + /// which wraps the payload envelope with its data columns. + /// + /// This function only checks for consistency between the block and the envelope + /// if envelope.is_some() == true . + /// In the `None` case, we cannot guarantee that the payload is empty until we + /// process the block that builds on top of this block. + /// + /// Expects `block.canonical_root() == envelope.beacon_block_root` as they are coupled. + pub fn new_gloas( + block: Arc>, + envelope: Option>, + ) -> Result { + if let Some(envelope) = envelope.as_ref() { + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|e| format!("missing signed_execution_payload_bid: {e:?}"))? + .message; + // Skip the finalized-slot check; range sync imports historical (finalized) blocks. + let latest_finalized_slot = Slot::new(0); + verify_envelope_consistency( + envelope.message(), + &block, + execution_bid, + latest_finalized_slot, + ) + .map_err(|e| format!("Inconsistent envelope: {e:?}"))?; + } + + Ok(Self::Gloas { block, envelope }) } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + Self::Base(block) => block.deconstruct(), + Self::Gloas { block, .. } => { + (block.canonical_root(), block, AvailableBlockData::NoData) + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + Self::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + Self::Gloas { .. } => 0, } } - pub fn into_available_block(self) -> AvailableBlock { - self.block + /// Converts into an `AvailableBlock` for import, returning any associated envelope + /// separately. Callers processing Gloas blocks must handle the envelope themselves. + #[allow(clippy::type_complexity)] + pub fn into_available_block( + self, + ) -> Result<(AvailableBlock, Option>), AvailabilityCheckError> { + match self { + Self::Base(block) => Ok((block, None)), + Self::Gloas { block, envelope } => { + let available = + AvailableBlock::new_gloas(block).map_err(AvailabilityCheckError::Unexpected)?; + Ok((available, envelope)) + } + } } } @@ -405,13 +494,13 @@ impl AsBlock for RangeSyncBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9829db0f1d..a0b117f072 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -669,7 +669,7 @@ impl DataAvailabilityChecker { /// Verify a batch of data columns belonging to a single block, picking the right commitment /// source for the block's fork (Fulu: inline on column; Gloas: from the embedded payload bid). -fn verify_columns_against_block( +pub fn verify_columns_against_block( kzg: &Kzg, block: &SignedBeaconBlock, columns: &[Arc>], @@ -792,7 +792,12 @@ async fn availability_cache_maintenance_service( #[derive(Debug, Clone)] // TODO(#8633) move this to `block_verification_types.rs` pub enum AvailableBlockData { - /// Block is pre-Deneb or has zero blobs + /// Block has no inline DA object for block import. + /// + /// This covers: + /// - pre-Deneb blocks, + /// - blocks with zero blobs, and + /// - Gloas blocks, where DA is checked on the payload envelope instead. NoData, /// Block is post-Deneb, pre-PeerDAS and has more than zero blobs Blobs(BlobSidecarList), @@ -953,6 +958,19 @@ impl AvailableBlock { }) } + pub fn new_gloas(block: Arc>) -> Result { + if block.fork_name_unchecked().gloas_enabled() { + Ok(Self { + block_root: block.canonical_root(), + block, + blob_data: AvailableBlockData::NoData, + blobs_available_timestamp: None, + }) + } else { + Err("Block is not gloas".to_owned()) + } + } + pub fn block(&self) -> &SignedBeaconBlock { &self.block } @@ -1294,7 +1312,7 @@ mod test { let available_blocks = blocks_with_columns .into_iter() - .map(|block| block.into_available_block()) + .map(|block| block.into_available_block().unwrap().0) .collect::>(); // WHEN verifying all blocks together (totalling 256 data columns) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index a10372b933..00806f0e17 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -4,9 +4,10 @@ use std::time::Duration; use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; +use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; use store::StoreOp; use tracing::{debug, error, info, info_span, instrument, warn}; -use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; +use types::{BlockImportSource, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, @@ -19,6 +20,7 @@ use crate::{ metrics, payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, validator_monitor::get_slot_delay_ms, }; @@ -372,4 +374,58 @@ impl BeaconChain { )); } } + + /// Process an envelope received during range sync. The associated block must already + /// be imported into fork choice. This performs signature verification, state processing, + /// EL verification and import. + #[instrument(skip_all, level = "debug")] + pub async fn process_range_sync_envelope( + self: &Arc, + available_envelope: AvailableEnvelope, + block_root: Hash256, + block: Arc>, + ) -> Result<(), EnvelopeError> { + let signed_envelope = available_envelope.envelope().clone(); + + // Load the state snapshot for envelope processing + let state_root = block.state_root(); + let snapshot = load_snapshot_from_state_root::(block_root, state_root, &self.store)?; + + // Verify envelope signature and state processing + verify_execution_payload_envelope( + &snapshot.pre_state, + &signed_envelope, + VerifySignatures::True, + snapshot.state_root, + &self.spec, + )?; + + // Send to EL for verification + let payload_notifier = PayloadNotifier::new( + self.clone(), + signed_envelope.clone(), + block, + NotifyExecutionLayer::Yes, + )?; + + let payload_verification_status = payload_notifier.notify_new_payload().await?; + + // Import directly — we already have all components (envelope + columns). + let chain = self.clone(); + let _ = self + .spawn_blocking_handle( + move || { + chain.import_execution_payload_envelope( + available_envelope, + block_root, + payload_verification_status, + ) + }, + "range_sync_envelope_import", + ) + .await + .map_err(|e| EnvelopeError::BeaconChainError(Box::new(e)))?; + + Ok(()) + } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index a1cbac35b3..a0d34949c6 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -40,7 +40,7 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AvailableEnvelope { envelope: Arc>, pub columns: DataColumnSidecarList, @@ -54,6 +54,10 @@ impl AvailableEnvelope { Self { envelope, columns } } + pub fn envelope(&self) -> &Arc> { + &self.envelope + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } @@ -180,6 +184,28 @@ impl std::fmt::Display for EnvelopeError { } } +impl EnvelopeError { + pub fn penalize_peer(&self) -> bool { + match self { + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::UnknownValidator { .. } + | EnvelopeError::IncorrectBlockProposer { .. } + | EnvelopeError::EnvelopeProcessingError(_) => true, + EnvelopeError::ExecutionPayloadError(e) => e.penalize_peer(), + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } + | EnvelopeError::BlockRootNotInForkChoice(_) + | EnvelopeError::InternalError(_) => false, + } + } +} + impl From for EnvelopeError { fn from(e: BeaconChainError) -> Self { EnvelopeError::BeaconChainError(Box::new(e)) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index db2a9a902d..62c7fb3a45 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -4,6 +4,7 @@ use crate::data_availability_checker::DataAvailabilityChecker; use crate::graffiti_calculator::GraffitiSettings; use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas}; use crate::observed_operations::ObservationOutcome; +use crate::payload_envelope_verification::AvailableEnvelope; pub use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::{BeaconBlockResponseWrapper, CustodyContext, get_block_root}; use crate::{ @@ -2929,18 +2930,29 @@ where block: Arc>, ) -> RangeSyncBlock { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); + let is_gloas = block.fork_name_unchecked().gloas_enabled(); // For Gloas, kzg commitments live in the bid (`signed_execution_payload_bid`), so the // body's `blob_kzg_commitments()` accessor returns Err. `num_expected_blobs` already // handles both shapes. let has_blobs = block.num_expected_blobs() > 0; if !has_blobs { - return RangeSyncBlock::new( - block, - AvailableBlockData::NoData, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap(); + return if is_gloas { + let envelope = self + .chain + .get_payload_envelope(&block_root) + .unwrap() + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, vec![])); + RangeSyncBlock::new_gloas(block, envelope).unwrap() + } else { + RangeSyncBlock::new( + block, + AvailableBlockData::NoData, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() + }; } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2952,14 +2964,24 @@ where .unwrap() .unwrap(); let custody_columns = columns.into_iter().collect::>(); - let block_data = AvailableBlockData::new_with_data_columns(custody_columns); - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap() + if is_gloas { + let envelope = self + .chain + .get_payload_envelope(&block_root) + .unwrap() + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, custody_columns)); + RangeSyncBlock::new_gloas(block, envelope).unwrap() + } else { + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RangeSyncBlock::new( + block, + block_data, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() + } } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); let block_data = if let Some(blobs) = blobs { @@ -2984,6 +3006,19 @@ where block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, ) -> Result, BlockError> { + if block.fork_name_unchecked().gloas_enabled() { + let columns = blob_items + .map(|_| generate_data_column_sidecars_from_block(&block, &self.spec)) + .unwrap_or_default(); + let envelope = self + .chain + .get_payload_envelope(&block.canonical_root()) + .map_err(|e| BlockError::BeaconChainError(Box::new(e)))? + .map(Arc::new) + .map(|envelope| AvailableEnvelope::new(envelope, columns)); + return RangeSyncBlock::new_gloas(block, envelope).map_err(BlockError::InternalError); + } + Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); let sampling_columns = self.chain.sampling_columns_for_epoch(epoch); diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 862c2a9fe8..9d32b37134 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -235,7 +235,7 @@ async fn produces_attestations() { let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); - let available_block = range_sync_block.into_available_block(); + let (available_block, _envelope) = range_sync_block.into_available_block().unwrap(); // For Gloas non-same-slot attestations, the early attester cache returns None. let is_same_slot_attestation = slot == block_slot; @@ -300,12 +300,13 @@ async fn early_attester_cache_old_request() { .get_block(&head.beacon_block_root) .unwrap(); - let available_block = harness + let (available_block, _envelope) = harness .build_range_sync_block_from_store_blobs( Some(head.beacon_block_root), head.beacon_block.clone(), ) - .into_available_block(); + .into_available_block() + .unwrap(); harness .chain diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index d7bba13eb0..94d4b3b9da 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -3,13 +3,14 @@ use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, LookupBlock, RangeSyncBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, WhenSlotSkipped, custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, - MakeAttestationOptions, fork_name_from_env, test_spec, + MakeAttestationOptions, test_spec, }, }; use beacon_chain::{ @@ -158,19 +159,37 @@ where .zip(chain_segment_sidecars.iter()) .map(|(snapshot, data_sidecars)| { let block = snapshot.beacon_block.clone(); - build_range_sync_block(block, data_sidecars, chain.clone()) + build_range_sync_block( + block, + snapshot.execution_envelope.clone(), + data_sidecars, + chain.clone(), + ) }) .collect() } fn build_range_sync_block( block: Arc>, + execution_envelope: Option>>, data_sidecars: &Option>, chain: Arc>, ) -> RangeSyncBlock where T: BeaconChainTypes, { + if block.fork_name_unchecked().gloas_enabled() { + let columns = match data_sidecars { + Some(DataSidecars::DataColumns(columns)) => columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + Some(DataSidecars::Blobs(_)) | None => vec![], + }; + let envelope = execution_envelope.map(|envelope| AvailableEnvelope::new(envelope, columns)); + return RangeSyncBlock::new_gloas(block, envelope).unwrap(); + } + match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); @@ -286,6 +305,14 @@ fn update_proposal_signatures( } } +fn update_envelope_block_root(snapshot: &mut BeaconSnapshot) { + if let Some(envelope) = snapshot.execution_envelope.as_ref() { + let mut envelope = envelope.as_ref().clone(); + envelope.message.beacon_block_root = snapshot.beacon_block.canonical_root(); + snapshot.execution_envelope = Some(Arc::new(envelope)); + } +} + fn update_parent_roots(snapshots: &mut [BeaconSnapshot], blobs: &mut [Option>]) { for i in 0..snapshots.len() { let root = snapshots[i].beacon_block.canonical_root(); @@ -304,6 +331,7 @@ fn update_parent_roots(snapshots: &mut [BeaconSnapshot], blobs: &mut [Option< } } child.beacon_block = new_child; + update_envelope_block_root(child); } } } @@ -369,10 +397,6 @@ fn update_data_column_signed_header( #[tokio::test] async fn chain_segment_full_segment() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; store_envelopes_for_chain_segment(chain_segment, &harness); @@ -413,10 +437,6 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let blocks: Vec> = @@ -495,13 +515,18 @@ async fn chain_segment_non_linear_parent_roots() { let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -535,13 +560,18 @@ async fn chain_segment_non_linear_slots() { .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -565,13 +595,18 @@ async fn chain_segment_non_linear_slots() { .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); - blocks[3] = RangeSyncBlock::new( - Arc::new(SignedBeaconBlock::from_block(block, signature)), - blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), - ) - .unwrap(); + let mutated_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + blocks[3] = if mutated_block.fork_name_unchecked().gloas_enabled() { + RangeSyncBlock::new_gloas(mutated_block, None).unwrap() + } else { + RangeSyncBlock::new( + mutated_block, + blocks[3].block_data().clone(), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap() + }; assert!( matches!( @@ -599,7 +634,12 @@ async fn assert_invalid_signature( .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); @@ -638,7 +678,12 @@ async fn assert_invalid_signature( .zip(chain_segment_blobs.iter()) .filter(|(snapshot, _)| snapshot.beacon_block.slot() > finalized_slot) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been @@ -656,6 +701,7 @@ async fn assert_invalid_signature( snapshots[block_index].beacon_block.canonical_root(), build_range_sync_block( snapshots[block_index].beacon_block.clone(), + snapshots[block_index].execution_envelope.clone(), &chain_segment_blobs[block_index], harness.chain.clone(), ), @@ -697,10 +743,6 @@ async fn get_invalid_sigs_harness( } #[tokio::test] async fn invalid_signature_gossip_block() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; for &block_index in BLOCK_INDICES { // Ensure the block will be rejected if imported on its own (without gossip checking). @@ -721,7 +763,12 @@ async fn invalid_signature_gossip_block() { .take(block_index) .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); harness @@ -757,10 +804,6 @@ async fn invalid_signature_gossip_block() { #[tokio::test] async fn invalid_signature_block_proposal() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, chain_segment_blobs) = get_chain_segment().await; for &block_index in BLOCK_INDICES { let harness = get_invalid_sigs_harness(chain_segment).await; @@ -778,7 +821,12 @@ async fn invalid_signature_block_proposal() { .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. @@ -800,10 +848,6 @@ async fn invalid_signature_block_proposal() { #[tokio::test] async fn invalid_signature_randao_reveal() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -817,6 +861,7 @@ async fn invalid_signature_randao_reveal() { *block.body_mut().randao_reveal_mut() = junk_signature(); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -833,10 +878,6 @@ async fn invalid_signature_randao_reveal() { #[tokio::test] async fn invalid_signature_proposer_slashing() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -864,6 +905,7 @@ async fn invalid_signature_proposer_slashing() { .expect("should update proposer slashing"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -880,10 +922,6 @@ async fn invalid_signature_proposer_slashing() { #[tokio::test] async fn invalid_signature_attester_slashing() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -990,6 +1028,7 @@ async fn invalid_signature_attester_slashing() { } snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -1006,10 +1045,6 @@ async fn invalid_signature_attester_slashing() { #[tokio::test] async fn invalid_signature_attestation() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); let mut checked_attestation = false; @@ -1060,6 +1095,7 @@ async fn invalid_signature_attestation() { if block.body().attestations_len() > 0 { snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -1112,13 +1148,19 @@ async fn invalid_signature_deposit() { .expect("should update deposit"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - build_range_sync_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + build_range_sync_block( + snapshot.beacon_block.clone(), + snapshot.execution_envelope.clone(), + blobs, + harness.chain.clone(), + ) }) .collect(); assert!( @@ -1137,10 +1179,6 @@ async fn invalid_signature_deposit() { #[tokio::test] async fn invalid_signature_exit() { - // TODO(gloas): re-enable for Gloas once range sync imports payload envelopes. - if fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { - return; - } let (chain_segment, ref_blobs) = get_chain_segment().await; let mut chain_segment_blobs = ref_blobs.clone(); for &block_index in BLOCK_INDICES { @@ -1165,6 +1203,7 @@ async fn invalid_signature_exit() { .expect("should update deposit"); snapshots[block_index].beacon_block = Arc::new(SignedBeaconBlock::from_block(block, signature)); + update_envelope_block_root(&mut snapshots[block_index]); update_parent_roots(&mut snapshots, &mut chain_segment_blobs); update_proposal_signatures(&mut snapshots, &harness); assert_invalid_signature( @@ -2158,13 +2197,19 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. let notify_execution_layer = NotifyExecutionLayer::Yes; - let range_sync_block = RangeSyncBlock::new( - block.clone(), - AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone(), - ) - .unwrap(); + let range_sync_block = if block.fork_name_unchecked().gloas_enabled() { + // Fine to pass no envelope as we are testing duplicate block import + // which is not related. + RangeSyncBlock::new_gloas(block.clone(), None).unwrap() + } else { + RangeSyncBlock::new( + block.clone(), + AvailableBlockData::NoData, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap() + }; let verified_block1 = range_sync_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -2239,6 +2284,111 @@ async fn import_execution_pending_block( } } +async fn make_gloas_range_sync_block_inputs() +-> Option<(Arc>, SignedExecutionPayloadEnvelope)> { + let spec = test_spec::(); + if !spec.fork_name_at_slot::(Slot::new(1)).gloas_enabled() { + return None; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Supernode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + let state = harness.get_current_state(); + let slot = harness.get_current_slot(); + let ((block, _), envelope, _) = harness.make_block_with_envelope(state, slot).await; + Some((block, envelope.expect("gloas block should have envelope"))) +} + +#[tokio::test] +async fn range_sync_block_new_gloas_accepts_matching_envelope() { + let Some((block, envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + result.is_ok(), + "new_gloas should accept matching block/envelope, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_allows_missing_envelope() { + let Some((block, _)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + let result = RangeSyncBlock::new_gloas(block, None); + + assert!( + result.is_ok(), + "new_gloas should allow None envelope, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_slot_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.payload.slot_number += 1; + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("SlotMismatch")), + "new_gloas should reject mismatched slot, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_builder_index_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.builder_index += 1; + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("BuilderIndexMismatch")), + "new_gloas should reject mismatched builder index, got: {:?}", + result + ); +} + +#[tokio::test] +async fn range_sync_block_new_gloas_rejects_block_hash_mismatch() { + let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + return; + }; + + envelope.message.payload.block_hash = ExecutionBlockHash::repeat_byte(0x22); + let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); + + assert!( + matches!(result, Err(ref err) if err.contains("BlockHashMismatch")), + "new_gloas should reject mismatched block hash, got: {:?}", + result + ); +} + // Test that RpcBlock::new() rejects blocks when blob count doesn't match expected. #[tokio::test] async fn range_sync_block_construction_fails_with_wrong_blob_count() { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b70961c499..4d392ef524 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3304,7 +3304,8 @@ async fn weak_subjectivity_sync_test( let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(full_block)); - let fully_available_block = range_sync_block.into_available_block(); + let (fully_available_block, _envelope) = + range_sync_block.into_available_block().unwrap(); harness .chain .data_availability_checker diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 8843541c11..dce4713245 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -533,7 +533,8 @@ mod tests { use super::*; use beacon_chain::{ PayloadVerificationStatus, - block_verification_types::{AvailableBlockData, RangeSyncBlock}, + block_verification_types::AvailableBlockData, + data_availability_checker::AvailableBlock, test_utils::{ BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, generate_data_column_sidecars_from_block, @@ -611,14 +612,13 @@ mod tests { "precondition: {fork_name:?} test block must produce data columns" ); - let available_block = RangeSyncBlock::new( + let available_block = AvailableBlock::new( block.clone(), AvailableBlockData::new_with_data_columns(data_columns), &chain.data_availability_checker, chain.spec.clone(), ) - .unwrap() - .into_available_block(); + .unwrap(); let current_slot = harness.get_current_slot(); let cached_head = chain.canonical_head.cached_head(); diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index ed9dc5666a..b444608468 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -726,20 +726,17 @@ pub fn rpc_data_column_limits( spec: &ChainSpec, ) -> RpcLimits { let fork_name = spec.fork_name_at_epoch(current_digest_epoch); + let max_blobs = spec.max_blobs_per_block(current_digest_epoch) as usize; if fork_name.gloas_enabled() { RpcLimits::new( DataColumnSidecarGloas::::min_size(), - DataColumnSidecarFulu::::max_size( - spec.max_blobs_per_block(current_digest_epoch) as usize - ), + DataColumnSidecarFulu::::max_size(max_blobs), ) } else { RpcLimits::new( DataColumnSidecarFulu::::min_size(), - DataColumnSidecarFulu::::max_size( - spec.max_blobs_per_block(current_digest_epoch) as usize - ), + DataColumnSidecarFulu::::max_size(max_blobs), ) } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 1d0d181cb3..4a8c6c55eb 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -57,6 +59,12 @@ pub struct BlobsByRangeRequestId { pub parent_request_id: ComponentsByRangeRequestId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + pub id: Id, + pub parent_request_id: ComponentsByRangeRequestId, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct DataColumnsByRangeRequestId { /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` @@ -259,6 +267,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 528a261bb8..e2226af094 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -682,10 +682,26 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); - let available_blocks = downloaded_blocks + let available_blocks = match downloaded_blocks .into_iter() - .map(|block| block.into_available_block()) - .collect::>(); + .map(|block| { + block + .into_available_block() + .map(|(available, _envelope)| available) + }) + .collect::, _>>() + { + Ok(blocks) => blocks, + Err(e) => { + return ( + 0, + Err(ChainSegmentFailed { + peer_action: Some(PeerAction::LowToleranceError), + message: format!("Block failed availability construction: {:?}", e), + }), + ); + } + }; // TODO(gloas) when implementing backfill sync for gloas // we need a batch verify kzg function in the new da checker @@ -893,6 +909,17 @@ impl NetworkBeaconProcessor { peer_action: None, }) } + ref err @ BlockError::EnvelopeError(ref envelope_error) => { + debug!(error = ?err, "Invalid execution payload envelope"); + Err(ChainSegmentFailed { + message: format!("Invalid execution payload envelope: {err:?}"), + peer_action: if envelope_error.penalize_peer() { + Some(PeerAction::LowToleranceError) + } else { + None + }, + }) + } ref err @ BlockError::ExecutionPayloadError(ref epe) => { if !epe.penalize_peer() { // These errors indicate an issue with the EL and not the `ChainSegment`. @@ -1030,9 +1057,16 @@ impl From> for BlockProcessingR None } } - BlockError::EnvelopeError(_) => { - // TODO(gloas): penalize correctly in range sync PR - None + BlockError::EnvelopeError(epe) => { + if epe.penalize_peer() { + Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + (&e).into(), + )) + } else { + None + } } // Remaining invalid blocks: penalize the block peer. Listed explicitly so a // new `BlockError` variant forces a compile error here. diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 277ece0aa8..315ec9387d 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -352,10 +352,8 @@ impl Router { Response::PayloadEnvelopesByRoot(envelope) => { self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); } - // TODO(EIP-7732): implement outgoing payload envelopes by range responses - // once sync manager requests them. - Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by range not supported yet"); + Response::PayloadEnvelopesByRange(envelope) => { + self.on_payload_envelopes_by_range_response(peer_id, app_request_id, envelope); } // Lighthouse currently only serves BlocksByHead and does not issue it as a client, // so receiving a response is unexpected. Drop it without crashing. @@ -802,6 +800,29 @@ impl Router { }); } + /// Handle a `PayloadEnvelopesByRange` response from the peer. + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(id @ SyncRequestId::PayloadEnvelopesByRange { .. }) => id, + other => { + crit!(request = ?other, %peer_id, "PayloadEnvelopesByRange response on incorrect request"); + return; + } + }; + + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 36816fb5d6..c8bb17243e 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -351,6 +351,9 @@ impl BackFillSync { CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } + CouplingError::EnvelopePeerFailure(msg) => { + debug!(?batch_id, msg, "Envelope peer failure"); + } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf503..8d40ec8b7f 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -34,6 +34,7 @@ pub type BatchId = Epoch; pub enum ByRangeRequestType { BlocksAndColumns, BlocksAndBlobs, + BlocksAndEnvelopesAndColumns, Blocks, Columns(HashSet), } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 999b3dd30e..b64ae4a4c5 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,3 +1,4 @@ +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ BeaconChainTypes, block_verification_types::{AvailableBlockData, RangeSyncBlock}, @@ -9,14 +10,15 @@ use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; use std::{collections::HashMap, sync::Arc}; -use tracing::{Span, debug}; +use tracing::{Span, debug, warn}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -37,6 +39,13 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// Payload envelopes for Gloas blocks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Span to track the range request and all children range requests. pub(crate) request_span: Span, } @@ -71,6 +80,7 @@ pub enum CouplingError { exceeded_retries: bool, }, BlobPeerFailure(String), + EnvelopePeerFailure(String), } impl RangeBlockComponentsRequest { @@ -88,6 +98,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -110,6 +121,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + payloads_request: payloads_req_id.map(ByRangeRequest::Active), request_span, } } @@ -191,6 +203,17 @@ impl RangeBlockComponentsRequest { } } + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none were expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,11 @@ impl RangeBlockComponentsRequest { return None; }; + // Check if payload envelopes are still pending + if let Some(ByRangeRequest::Active(_)) = &self.payloads_request { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,6 +282,12 @@ impl RangeBlockComponentsRequest { } } + let payload_envelopes = self.payloads_request.as_ref().and_then(|request| { + request + .to_finished() + .map(|payload_envelopes| payload_envelopes.to_vec()) + }); + let resp = Self::responses_with_custody_columns( blocks.to_vec(), data_columns, @@ -262,6 +296,7 @@ impl RangeBlockComponentsRequest { *attempt, da_checker, spec, + payload_envelopes, ); if let Err(CouplingError::DataColumnPeerFailure { @@ -352,6 +387,7 @@ impl RangeBlockComponentsRequest { Ok(responses) } + #[allow(clippy::too_many_arguments)] fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, @@ -360,10 +396,19 @@ impl RangeBlockComponentsRequest { attempt: usize, da_checker: Arc>, spec: Arc, + payload_envelopes: Option>>>, ) -> Result>, CouplingError> where T: BeaconChainTypes, { + // Index envelopes by beacon_block_root for correct coupling. + let mut envelopes_by_block_root = payload_envelopes.map(|envelopes| { + envelopes + .into_iter() + .map(|e| (e.beacon_block_root(), e)) + .collect::>() + }); + // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -393,7 +438,7 @@ impl RangeBlockComponentsRequest { let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; for block in blocks { let block_root = get_block_root(&block); - range_sync_blocks.push(if block.num_expected_blobs() > 0 { + let custody_columns = if block.num_expected_blobs() > 0 { let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) else { let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); @@ -401,7 +446,6 @@ impl RangeBlockComponentsRequest { error: format!("No columns for block {block_root:?} with data"), faulty_peers: responsible_peers, exceeded_retries, - }); }; @@ -415,16 +459,21 @@ impl RangeBlockComponentsRequest { custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } else { let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {}", + index + ))); }; naughty_peers.push((*index, *responsible_peer)); } } if !naughty_peers.is_empty() { return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), faulty_peers: naughty_peers, - exceeded_retries + exceeded_retries, }); } @@ -439,15 +488,31 @@ impl RangeBlockComponentsRequest { ); } - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); - - RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) - .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>() } else { - // Block has no data, expects zero columns + vec![] + }; + let range_sync_block = if let Some(envelopes_by_block_root) = + envelopes_by_block_root.as_mut() + { + let envelope = envelopes_by_block_root.remove(&block_root); + let available_envelope = + envelope.map(|env| AvailableEnvelope::new(env, custody_columns)); + + RangeSyncBlock::new_gloas(block, available_envelope) + .map_err(CouplingError::EnvelopePeerFailure)? + } else if custody_columns.is_empty() { RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? - }); + } else { + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + }; + range_sync_blocks.push(range_sync_block); } // Assert that there are no columns left for other blocks @@ -458,6 +523,13 @@ impl RangeBlockComponentsRequest { debug!(?remaining_roots, "Not all columns consumed for block"); } + // Recoverable error, log and continue + if let Some(envelopes_by_block_root) = envelopes_by_block_root + && !envelopes_by_block_root.is_empty() + { + warn!("Peer returned extra envelopes not matching any block"); + } + Ok(range_sync_blocks) } } @@ -489,21 +561,28 @@ mod tests { use crate::sync::network_context::MAX_COLUMN_RETRIES; use super::RangeBlockComponentsRequest; + use beacon_chain::block_verification_types::RangeSyncBlock; use beacon_chain::custody_context::NodeCustodyType; + use beacon_chain::data_availability_checker::DataAvailabilityChecker; use beacon_chain::test_utils::{ - NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, - test_da_checker, test_spec, + EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs, + generate_rand_block_and_data_columns, test_da_checker, test_spec, }; + use bls::Signature; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, + DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, + PayloadEnvelopesByRangeRequestId, RangeRequestId, }, }; use std::{collections::HashMap, sync::Arc}; use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; + use types::{ + ChainSpec, DataColumnSidecarList, Epoch, ExecutionPayloadEnvelope, ForkName, + MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, + }; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -538,6 +617,15 @@ mod tests { } } + fn payloads_id( + parent_request_id: ComponentsByRangeRequestId, + ) -> PayloadEnvelopesByRangeRequestId { + PayloadEnvelopesByRangeRequestId { + id: 1, + parent_request_id, + } + } + fn columns_id( id: Id, parent_request_id: DataColumnsByRangeRequester, @@ -555,8 +643,166 @@ mod tests { info.responses(da_checker, spec).is_some() } + fn gloas_spec() -> ChainSpec { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + spec + } + + fn matching_envelope(block: &SignedBeaconBlock) -> Arc> { + let bid = &block + .message() + .body() + .signed_execution_payload_bid() + .expect("Gloas block should have payload bid") + .message; + let mut envelope = SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope::empty(), + signature: Signature::empty(), + }; + envelope.message.beacon_block_root = block.canonical_root(); + envelope.message.parent_beacon_block_root = block.parent_root(); + envelope.message.builder_index = bid.builder_index; + envelope.message.payload.slot_number = block.slot(); + envelope.message.payload.parent_hash = bid.parent_block_hash; + envelope.message.payload.block_hash = bid.block_hash; + Arc::new(envelope) + } + + #[allow(clippy::type_complexity)] + fn make_gloas_blocks_and_columns( + count: usize, + spec: &ChainSpec, + ) -> Vec<( + Arc>, + DataColumnSidecarList, + Arc>, + )> { + let mut u = types::test_utils::test_unstructured(); + (0..count) + .map(|_| { + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut u, + spec, + ) + .unwrap(); + let envelope = matching_envelope(&block); + (Arc::new(block), data_columns, envelope) + }) + .collect() + } + + #[allow(clippy::type_complexity)] + fn add_all_columns( + info: &mut RangeBlockComponentsRequest, + blocks: &[( + Arc>, + DataColumnSidecarList, + Arc>, + )], + columns_req_id: &[(DataColumnsByRangeRequestId, Vec)], + expected_custody_columns: &[u64], + ) { + for (i, &column_index) in expected_custody_columns.iter().enumerate() { + let (req, _columns) = columns_req_id.get(i).unwrap(); + info.add_custody_columns( + *req, + blocks + .iter() + .flat_map(|(_, columns, _)| { + columns + .iter() + .filter(|column| *column.index() == column_index) + .cloned() + }) + .collect(), + ) + .unwrap(); + } + } + + #[allow(clippy::type_complexity)] + struct GloasSetup { + info: RangeBlockComponentsRequest, + da_checker: Arc>>, + spec: Arc, + blocks: Vec<( + Arc>, + DataColumnSidecarList, + Arc>, + )>, + payloads_req_id: PayloadEnvelopesByRangeRequestId, + expected_custody_columns: Vec, + } + + /// Builds a Gloas coupling request with `count` blocks and all custody columns added, + /// ready for the per-test payload-envelope step. + fn setup_gloas_coupling(count: usize) -> GloasSetup { + let spec = Arc::new(gloas_spec()); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_custody_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); + let blocks = make_gloas_blocks_and_columns(count, &spec); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let payloads_req_id = payloads_id(components_id); + let columns_req_id = expected_custody_columns + .iter() + .enumerate() + .map(|(i, column)| { + ( + columns_id( + i as Id, + DataColumnsByRangeRequester::ComponentsByRange(components_id), + ), + vec![*column], + ) + }) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), expected_custody_columns.clone())), + Some(payloads_req_id), + Span::none(), + ); + + info.add_blocks( + blocks_req_id, + blocks.iter().map(|(block, _, _)| block.clone()).collect(), + ) + .unwrap(); + add_all_columns( + &mut info, + &blocks, + &columns_req_id, + &expected_custody_columns, + ); + + GloasSetup { + info, + da_checker, + spec, + blocks, + payloads_req_id, + expected_custody_columns, + } + } + #[test] fn no_blobs_into_responses() { + // This exercises the pre-Gloas blobs/no-data coupling path. Gloas coupling is covered + // by the dedicated `setup_gloas_coupling` tests below. + if skip_under_gloas() { + return; + } let spec = Arc::new(test_spec::()); let mut u = types::test_utils::test_unstructured(); @@ -575,7 +821,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -606,6 +852,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -672,6 +919,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -751,6 +999,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -804,6 +1053,115 @@ mod tests { info.responses(da_checker, spec).unwrap().unwrap(); } + #[test] + fn gloas_payload_envelopes_must_complete_before_responses() { + let GloasSetup { + mut info, + da_checker, + spec, + .. + } = setup_gloas_coupling(2); + + // No payload envelopes added yet, so the request must not be complete. + assert!(info.responses(da_checker, spec).is_none()); + } + + #[test] + fn gloas_payload_envelopes_are_coupled_by_block_root() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + expected_custody_columns, + } = setup_gloas_coupling(2); + + // Supply envelopes in reverse order to prove coupling is by block root, not position. + info.add_payload_envelopes( + payloads_req_id, + blocks + .iter() + .rev() + .map(|(_, _, envelope)| envelope.clone()) + .collect(), + ) + .unwrap(); + + let responses = info.responses(da_checker, spec).unwrap().unwrap(); + assert_eq!(responses.len(), blocks.len()); + for response in responses { + match response { + RangeSyncBlock::Gloas { + block, + envelope: Some(envelope), + } => { + assert_eq!( + envelope.envelope().beacon_block_root(), + block.canonical_root() + ); + assert_eq!(envelope.columns.len(), expected_custody_columns.len()); + } + other => panic!("expected Gloas block with envelope, got {other:?}"), + } + } + } + + #[test] + fn gloas_payload_envelopes_allow_missing_envelopes() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + .. + } = setup_gloas_coupling(2); + + // Supply an envelope for only one of the two blocks. + info.add_payload_envelopes(payloads_req_id, vec![blocks[0].2.clone()]) + .unwrap(); + + let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let count_with = |with_envelope: bool| { + responses + .iter() + .filter(|response| { + matches!(response, RangeSyncBlock::Gloas { envelope, .. } if envelope.is_some() == with_envelope) + }) + .count() + }; + assert_eq!(count_with(true), 1); + assert_eq!(count_with(false), 1); + } + + #[test] + fn gloas_payload_envelope_mismatch_fails_coupling() { + let GloasSetup { + mut info, + da_checker, + spec, + blocks, + payloads_req_id, + .. + } = setup_gloas_coupling(1); + + let mut bad_envelope = (*blocks[0].2).clone(); + bad_envelope.message.payload.slot_number += 1; + info.add_payload_envelopes(payloads_req_id, vec![Arc::new(bad_envelope)]) + .unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!( + matches!( + result, + Err(super::CouplingError::EnvelopePeerFailure(ref error)) + if error.contains("SlotMismatch") + ), + "expected envelope slot mismatch, got {result:?}" + ); + } + #[test] fn missing_custody_columns_from_faulty_peers() { if skip_under_gloas() { @@ -848,6 +1206,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -949,6 +1308,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1068,6 +1428,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7a90163852..8e7b8cd05a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -57,7 +57,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -504,6 +505,8 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -1160,6 +1163,13 @@ impl SyncManager { peer_id, RpcEvent::from_chunk(envelope, seen_timestamp), ), + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ); + } _ => { crit!(%peer_id, "bad request id for payload envelope"); } @@ -1214,6 +1224,24 @@ impl SyncManager { } } + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_data_columns_by_root_response( &mut self, req_id: DataColumnsByRootRequestId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c9a48c9d5e..8e8abd4fa6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -24,14 +24,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -39,7 +42,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, - PayloadEnvelopesByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -217,6 +220,11 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -254,6 +262,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -302,6 +314,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -334,6 +347,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -369,12 +383,17 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -431,6 +450,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -453,6 +473,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -585,24 +606,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -666,6 +689,26 @@ impl SyncNetworkContext { }) .transpose()?; + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let info = RangeBlockComponentsRequest::new( blocks_req_id, @@ -676,6 +719,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -778,6 +822,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1269,6 +1324,43 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1349,7 +1441,10 @@ impl SyncNetworkContext { "To deal with alignment with deneb boundaries, batches need to be of just one epoch" ); - if self + if self.chain.spec.fork_name_at_epoch(epoch).gloas_enabled() { + // TODO(gloas): Not precise and we can be post-gloas and not require columns + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1485,6 +1580,19 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + /// Common handler for consistent scoring of RpcResponseError fn on_rpc_response_result( &mut self, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 72dd2c22d0..cc74785098 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -15,6 +15,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; pub use payload_envelopes_by_root::{ PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, }; @@ -28,6 +29,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 0000000000..13e6454a23 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,48 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + if envelope.slot().as_u64() < self.request.start_slot + || envelope.slot().as_u64() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(envelope.slot())); + } + + if self + .items + .iter() + .any(|existing| existing.slot() == envelope.slot()) + { + return Err(LookupVerifyError::DuplicatedData(envelope.slot(), 0)); + } + + self.items.push(envelope); + + Ok(self.items.len() >= self.request.count as usize) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d533d8ed0d..6292388339 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -952,6 +952,9 @@ impl SyncingChain { CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } + CouplingError::EnvelopePeerFailure(msg) => { + debug!(?batch_id, msg, "Envelope peer failure"); + } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index fb0956cf50..13eeaee9aa 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -12,6 +12,7 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, @@ -613,7 +614,6 @@ impl TestRig { .unwrap_or_else(|| { panic!("Test consumer requested unknown block: {id:?}") }) - .block_data() .data_columns() .unwrap_or_else(|| panic!("Block id {id:?} has no columns")); id.columns @@ -763,7 +763,7 @@ impl TestRig { .return_wrong_range_column_indices_n_times -= 1; let wrong_columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -787,7 +787,7 @@ impl TestRig { let wrong_columns = self .network_blocks_by_slot .get(&Slot::new(wrong_slot)) - .and_then(|block| block.block_data().data_columns()) + .and_then(|block| block.data_columns()) .into_iter() .flat_map(|columns| { columns @@ -804,7 +804,7 @@ impl TestRig { self.complete_strategy.return_partial_range_columns_n_times -= 1; let columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -820,7 +820,7 @@ impl TestRig { let columns = (req.start_slot..req.start_slot + req.count) .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) - .filter_map(|block| block.block_data().data_columns()) + .filter_map(|block| block.data_columns()) .flat_map(|columns| { columns .into_iter() @@ -830,6 +830,25 @@ impl TestRig { self.send_rpc_columns_response(req_id, peer_id, &columns); } + (RequestType::PayloadEnvelopesByRange(req), AppRequestId::Sync(req_id)) => { + if self.complete_strategy.skip_by_range_routes { + return; + } + + let envelopes = (req.start_slot..req.start_slot + req.count) + .filter_map(|slot| self.network_blocks_by_slot.get(&Slot::new(slot))) + .filter_map(|block| { + let block_root = block.canonical_root(); + // Respect a withheld payload envelope. + if self.complete_strategy.hold_envelope_for_block == Some(block_root) { + return None; + } + self.network_envelopes_by_root.get(&block_root).cloned() + }) + .collect::>(); + self.send_rpc_envelopes_response(req_id, peer_id, &envelopes); + } + (RequestType::Status(_req), AppRequestId::Router) => { // Ignore Status requests for now } @@ -958,6 +977,34 @@ impl TestRig { }); } + fn send_rpc_envelopes_response( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelopes: &[Arc>], + ) { + let slots = envelopes.iter().map(|e| e.slot()).collect::>(); + self.log(&format!( + "Completing request {sync_request_id:?} to {peer_id} with envelopes {slots:?}" + )); + + for envelope in envelopes { + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: Some(envelope.clone()), + seen_timestamp: D, + }); + } + // Stream termination + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: None, + seen_timestamp: D, + }); + } + #[allow(dead_code)] fn is_after_gloas(&self) -> bool { self.fork_name.gloas_enabled() @@ -1177,7 +1224,7 @@ impl TestRig { let range_sync_block = self.get_last_block().clone(); let mut block = (*range_sync_block.block_cloned()).clone(); let blobs = range_sync_block.block_data().blobs(); - let columns = range_sync_block.block_data().data_columns(); + let columns = range_sync_block.data_columns(); *block.signature_mut() = self.valid_signature(); self.re_insert_block(Arc::new(block), blobs, columns); } @@ -1192,10 +1239,7 @@ impl TestRig { let range_sync_block = self.get_last_block().clone(); let block = range_sync_block.block_cloned(); let blobs = range_sync_block.block_data().blobs(); - let mut columns = range_sync_block - .block_data() - .data_columns() - .expect("no columns"); + let mut columns = range_sync_block.data_columns().expect("no columns"); let first = columns.first_mut().expect("empty columns"); Arc::make_mut(first) .signed_block_header_mut() @@ -1217,10 +1261,7 @@ impl TestRig { .clone(); let block = range_sync_block.block_cloned(); let blobs = range_sync_block.block_data().blobs(); - let mut columns = range_sync_block - .block_data() - .data_columns() - .expect("no columns"); + let mut columns = range_sync_block.data_columns().expect("no columns"); let first = columns.first_mut().expect("empty columns"); let column = Arc::make_mut(first); let proof = column.kzg_proofs_mut().first_mut().expect("no kzg proofs"); @@ -1256,20 +1297,30 @@ impl TestRig { ) { let block_root = block.canonical_root(); let block_slot = block.slot(); - let block_data = if let Some(columns) = columns { - AvailableBlockData::new_with_data_columns(columns) - } else if let Some(blobs) = blobs { - AvailableBlockData::new_with_blobs(blobs) + let range_sync_block = if block.fork_name_unchecked().gloas_enabled() { + // Gloas carries data columns in the payload envelope, not in `block_data`. + let envelope = self + .network_envelopes_by_root + .get(&block_root) + .cloned() + .map(|envelope| AvailableEnvelope::new(envelope, columns.unwrap_or_default())); + RangeSyncBlock::new_gloas(block, envelope).unwrap() } else { - AvailableBlockData::NoData + let block_data = if let Some(columns) = columns { + AvailableBlockData::new_with_data_columns(columns) + } else if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + RangeSyncBlock::new( + block, + block_data, + &self.harness.chain.data_availability_checker, + self.harness.chain.spec.clone(), + ) + .unwrap() }; - let range_sync_block = RangeSyncBlock::new( - block, - block_data, - &self.harness.chain.data_availability_checker, - self.harness.chain.spec.clone(), - ) - .unwrap(); self.network_blocks_by_slot .insert(block_slot, range_sync_block.clone()); self.network_blocks_by_root @@ -1367,7 +1418,6 @@ impl TestRig { let peer_id = self.new_connected_supernode_peer(); let columns = self .get_last_block() - .block_data() .data_columns() .expect("No data columns"); let column = columns.first().expect("empty columns");