From e8674412205cd1e4eef68e78a4e02f52a90e1639 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 26 May 2026 18:09:18 -0700 Subject: [PATCH] Implement range sync --- beacon_node/beacon_chain/src/beacon_chain.rs | 64 +++++--- .../beacon_chain/src/block_verification.rs | 43 ++++- .../src/block_verification_types.rs | 120 +++++++++----- .../src/data_availability_checker.rs | 27 +++- .../overflow_lru_cache.rs | 2 +- .../beacon_chain/src/early_attester_cache.rs | 2 +- .../beacon_chain/src/historical_blocks.rs | 2 +- .../payload_envelope_verification/import.rs | 79 ++++++++- .../src/payload_envelope_verification/mod.rs | 6 +- .../tests/attestation_production.rs | 13 +- beacon_node/beacon_chain/tests/store_tests.rs | 7 +- .../src/service/api_types.rs | 14 ++ .../gossip_methods.rs | 3 +- .../network_beacon_processor/sync_methods.rs | 13 +- beacon_node/network/src/router.rs | 34 +++- beacon_node/network/src/sync/batch.rs | 1 + .../src/sync/block_sidecar_coupling.rs | 106 +++++++++++- beacon_node/network/src/sync/manager.rs | 30 +++- .../network/src/sync/network_context.rs | 153 +++++++++++++++--- .../src/sync/network_context/requests.rs | 2 + .../requests/payload_envelopes_by_range.rs | 48 ++++++ 21 files changed, 657 insertions(+), 112 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b3d258a2fb..854e231e41 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3146,11 +3146,12 @@ impl BeaconChain { }; // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + for (signature_verified_block, maybe_envelope) in signature_verified_blocks { let block_slot = signature_verified_block.slot(); + let block_root = signature_verified_block.block_root(); match self .process_block( - signature_verified_block.block_root(), + block_root, signature_verified_block, notify_execution_layer, BlockImportSource::RangeSync, @@ -3158,32 +3159,37 @@ impl BeaconChain { ) .await { - Ok(status) => { - match status { - AvailabilityProcessingStatus::Imported(block_root) => { - // The block was imported successfully. - imported_blocks.push((block_root, block_slot)); - } - AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { - warn!( - ?block_root, - %slot, - "Blobs missing in response to range request" - ); + Ok(status) => match status { + AvailabilityProcessingStatus::Imported(block_root) => { + imported_blocks.push((block_root, block_slot)); + } + AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { + warn!( + ?block_root, + %slot, + "Blobs missing in response to range request" + ); + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::AvailabilityCheck( + AvailabilityCheckError::MissingBlobs, + ), + }; + } + }, + Err(BlockError::DuplicateFullyImported(block_root)) => { + // For Gloas blocks that are already imported, we still + // need to process the envelope. + if let Some(envelope) = maybe_envelope { + if let Err(e) = + self.process_range_sync_envelope(envelope, block_root).await + { return ChainSegmentResult::Failed { imported_blocks, - error: BlockError::AvailabilityCheck( - AvailabilityCheckError::MissingBlobs, - ), + error: BlockError::EnvelopeError(Box::new(e)), }; } } - } - Err(BlockError::DuplicateFullyImported(block_root)) => { - debug!( - ?block_root, - "Ignoring already known blocks while processing chain segment" - ); continue; } Err(error) => { @@ -3193,6 +3199,16 @@ impl BeaconChain { }; } } + + // Process the envelope after the block has been imported. + if let Some(envelope) = maybe_envelope { + if let Err(e) = self.process_range_sync_envelope(envelope, block_root).await { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(e)), + }; + } + } } } @@ -7603,7 +7619,7 @@ impl BeaconChain { block_data: AvailableBlockData, ) -> Option> { match block_data { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(blobs) => { debug!( %block_root, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 24f971f736..5c84de1132 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::{AvailableEnvelope, EnvelopeError}; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -324,6 +325,12 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// The payload envelope failed verification during range sync. + /// + /// ## Peer scoring + /// + /// The envelope is invalid and the peer is faulty. + EnvelopeError(Box), } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -590,7 +597,13 @@ pub(crate) fn process_block_slash_info( mut chain_segment: Vec<(Hash256, RangeSyncBlock)>, chain: &BeaconChain, -) -> Result>, BlockError> { +) -> Result< + Vec<( + SignatureVerifiedBlock, + Option>>, + )>, + BlockError, +> { if chain_segment.is_empty() { return Ok(vec![]); } @@ -614,12 +627,20 @@ pub fn signature_verify_chain_segment( let mut available_blocks = Vec::with_capacity(chain_segment.len()); let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); + let mut envelopes: Vec>>> = + Vec::with_capacity(chain_segment.len()); for (block_root, block) in chain_segment { let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let envelope = match &block { + RangeSyncBlock::Gloas { envelope, .. } => envelope.clone(), + RangeSyncBlock::Base(_) => None, + }; + + let available_block = + block.into_available_block(&chain.data_availability_checker, chain.spec.clone())?; available_blocks.push(available_block.clone()); signature_verified_blocks.push(SignatureVerifiedBlock { block: MaybeAvailableBlock::Available(available_block), @@ -627,6 +648,7 @@ pub fn signature_verify_chain_segment( parent: None, consensus_context, }); + envelopes.push(envelope); } // TODO(gloas) When implementing range and backfill sync for gloas // we need a batch verify kzg function in the new da checker as well. @@ -652,7 +674,10 @@ pub fn signature_verify_chain_segment( signature_verified_block.parent = Some(parent); } - Ok(signature_verified_blocks) + Ok(signature_verified_blocks + .into_iter() + .zip(envelopes) + .collect()) } /// A wrapper around a `SignedBeaconBlock` that indicates it has been approved for re-gossiping on @@ -1309,10 +1334,18 @@ impl IntoExecutionPendingBlock for RangeSyncBlock Result, BlockSlashInfo> { // Perform an early check to prevent wasting time on irrelevant blocks. + let header = self.signed_block_header(); let block_root = check_block_relevancy(self.as_block(), block_root, chain) - .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; + .map_err(|e| BlockSlashInfo::SignatureNotChecked(header.clone(), e))?; - let available_block = self.into_available_block(); + let available_block = self + .into_available_block(&chain.data_availability_checker, chain.spec.clone()) + .map_err(|e| { + BlockSlashInfo::SignatureNotChecked( + header.clone(), + BlockError::AvailabilityCheck(e), + ) + })?; chain .data_availability_checker .verify_kzg_for_available_block(&available_block) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d7..d770b6acf3 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -40,15 +41,22 @@ impl LookupBlock { } } -/// A fully available block that has been constructed by range sync. -/// The block contains all the data required to import into fork choice. -/// This includes any and all blobs/columns required, including zero if -/// none are required. This can happen if the block is pre-deneb or if -/// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +/// A block that has been constructed by range sync, ready for import. +/// Pre-Gloas: wraps an `AvailableBlock` with all data. +/// Gloas: carries the block and an optional envelope (data arrives separately). +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { @@ -59,31 +67,36 @@ impl Debug for RangeSyncBlock { impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + Self::Base(block) => block.block_root(), + Self::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + Self::Base(block) => block.block(), + Self::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + Self::Base(block) => block.block_cloned(), + Self::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + Self::Base(block) => block.data(), + Self::Gloas { .. } => &AvailableBlockData::NoData, + } } } impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. - /// - /// # Errors - /// - /// Returns `AvailabilityCheckError` if: - /// - `InvalidAvailableBlockData`: Block data is provided but not required. - /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete. - /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete. + /// Constructs a `RangeSyncBlock` from a block and availability data (pre-Gloas). pub fn new( block: Arc>, block_data: AvailableBlockData, @@ -94,32 +107,65 @@ impl RangeSyncBlock { T: BeaconChainTypes, { let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + /// Constructs a Gloas `RangeSyncBlock` with block and optional envelope. + pub fn new_gloas( + block: Arc>, + envelope: Option>>, + ) -> Self { + Self::Gloas { block, envelope } } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + Self::Base(block) => block.deconstruct(), + Self::Gloas { block, .. } => { + (block.canonical_root(), block, AvailableBlockData::NoData) + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData + | AvailableBlockData::DataInEnvelope + | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + Self::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData + | AvailableBlockData::DataInEnvelope + | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + Self::Gloas { .. } => 0, } } - pub fn into_available_block(self) -> AvailableBlock { - self.block + pub fn into_available_block( + self, + da_checker: &DataAvailabilityChecker, + spec: Arc, + ) -> Result, AvailabilityCheckError> + where + T: BeaconChainTypes, + { + match self { + Self::Base(block) => Ok(block), + Self::Gloas { block, .. } => { + AvailableBlock::new(block, AvailableBlockData::DataInEnvelope, da_checker, spec) + } + } } } @@ -405,13 +451,13 @@ impl AsBlock for RangeSyncBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index cfd8ee7d34..43310682da 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -487,7 +487,7 @@ impl DataAvailabilityChecker { available_block: &AvailableBlock, ) -> Result<(), AvailabilityCheckError> { match available_block.data() { - AvailableBlockData::NoData => Ok(()), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => Ok(()), AvailableBlockData::Blobs(blobs) => verify_kzg_for_blob_list(blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs), AvailableBlockData::DataColumns(columns) => { @@ -507,7 +507,7 @@ impl DataAvailabilityChecker { for available_block in available_blocks { match available_block.data() { - AvailableBlockData::NoData => {} + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => {} AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs.iter().cloned()), AvailableBlockData::DataColumns(columns) => { // Each block has its own commitments. For Gloas they live in the bid; for @@ -539,6 +539,12 @@ impl DataAvailabilityChecker { self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } + pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool { + self.spec + .gloas_fork_epoch + .is_some_and(|gloas_epoch| epoch >= gloas_epoch) + } + /// See `Self::blobs_required_for_epoch` fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) @@ -818,6 +824,9 @@ pub enum AvailableBlockData { Blobs(BlobSidecarList), /// Block is post-PeerDAS and has more than zero blobs DataColumns(DataColumnSidecarList), + /// Gloas: block data (payload + columns) arrives via the execution payload envelope, + /// not the block itself. + DataInEnvelope, } impl AvailableBlockData { @@ -839,7 +848,7 @@ impl AvailableBlockData { pub fn blobs(&self) -> Option> { match self { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(blobs) => Some(blobs.clone()), AvailableBlockData::DataColumns(_) => None, } @@ -855,7 +864,7 @@ impl AvailableBlockData { pub fn data_columns(&self) -> Option> { match self { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(_) => None, AvailableBlockData::DataColumns(data_columns) => Some(data_columns.clone()), } @@ -969,6 +978,7 @@ impl AvailableBlock { return Err(AvailabilityCheckError::MissingCustodyColumns); } } + AvailableBlockData::DataInEnvelope => {} } Ok(Self { @@ -1001,7 +1011,7 @@ impl AvailableBlock { pub fn has_blobs(&self) -> bool { match self.blob_data { - AvailableBlockData::NoData => false, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => false, AvailableBlockData::Blobs(..) => true, AvailableBlockData::DataColumns(_) => false, } @@ -1029,6 +1039,7 @@ impl AvailableBlock { AvailableBlockData::DataColumns(data_columns) => { AvailableBlockData::DataColumns(data_columns.clone()) } + AvailableBlockData::DataInEnvelope => AvailableBlockData::DataInEnvelope, }, blobs_available_timestamp: self.blobs_available_timestamp, spec: self.spec.clone(), @@ -1322,7 +1333,11 @@ mod test { let available_blocks = blocks_with_columns .into_iter() - .map(|block| block.into_available_block()) + .map(|block| { + block + .into_available_block(&da_checker, spec.clone()) + .unwrap() + }) .collect::>(); // WHEN verifying all blocks together (totalling 256 data columns) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 8a80f835ab..80718fa6ba 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -275,7 +275,7 @@ impl PendingComponents { // Block is available, construct `AvailableExecutedBlock` let blobs_available_timestamp = match blob_data { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(_) => self .verified_blobs .iter() diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index e3a83f9374..5097928731 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -138,7 +138,7 @@ impl EarlyAttesterCache { }; let (blobs, data_columns) = match block.data() { - AvailableBlockData::NoData => (None, None), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => (None, None), AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None), AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())), }; diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index bfda52558e..ebb1bc4b76 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -157,7 +157,7 @@ impl BeaconChain { } match &block_data { - AvailableBlockData::NoData => (), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => (), AvailableBlockData::Blobs(_) => new_oldest_blob_slot = Some(block.slot()), AvailableBlockData::DataColumns(_) => { new_oldest_data_column_slot = Some(block.slot()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 73ddb43273..9dc74fe13b 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -4,6 +4,7 @@ use std::time::Duration; use eth2::types::{EventKind, SseExecutionPayload, SseExecutionPayloadAvailable}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; +use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; use store::StoreOp; use tracing::{debug, error, info, info_span, instrument, warn}; use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; @@ -14,11 +15,12 @@ use super::{ }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, - NotifyExecutionLayer, + NotifyExecutionLayer, PayloadVerificationOutcome, block_verification_types::AvailableBlockData, metrics, payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + load_snapshot_from_state_root, }, validator_monitor::get_slot_delay_ms, }; @@ -388,4 +390,79 @@ impl BeaconChain { )); } } + + /// Process an envelope received during range sync. The associated block must already + /// be imported into fork choice. This performs signature verification, state processing, + /// EL verification and import. + #[instrument(skip_all, level = "debug")] + pub async fn process_range_sync_envelope( + self: &Arc, + available_envelope: Box>, + block_root: Hash256, + ) -> Result<(), EnvelopeError> { + let signed_envelope = available_envelope.envelope().clone(); + let columns = available_envelope.columns.clone(); + + // Load the block from the store to use for EL verification + let block = self + .get_blinded_block(&block_root) + .map_err(|e| EnvelopeError::BeaconChainError(Arc::new(e)))? + .ok_or(EnvelopeError::BlockRootUnknown { block_root })?; + + let block = self + .store + .make_full_block(&block_root, block) + .map_err(|e| EnvelopeError::BeaconChainError(Arc::new(e.into())))?; + let block = Arc::new(block); + + // Load the state snapshot for envelope processing + let state_root = block.state_root(); + let snapshot = load_snapshot_from_state_root::(block_root, state_root, &self.store)?; + + // Verify envelope signature and state processing + verify_execution_payload_envelope( + &snapshot.pre_state, + &signed_envelope, + VerifySignatures::True, + snapshot.state_root, + &self.spec, + )?; + + // Send to EL for verification + let payload_notifier = super::payload_notifier::PayloadNotifier::new( + self.clone(), + signed_envelope.clone(), + block, + NotifyExecutionLayer::Yes, + )?; + + let payload_verification_status = payload_notifier + .notify_new_payload() + .await + .map_err(EnvelopeError::ImportError)?; + + if payload_verification_status.is_optimistic() { + return Err(EnvelopeError::ImportError( + BlockError::OptimisticSyncNotSupported { block_root }, + )); + } + + let payload_verification_outcome = PayloadVerificationOutcome { + payload_verification_status, + }; + + // Import the envelope using existing availability/import path + let executed = AvailabilityPendingExecutedEnvelope::new( + signed_envelope, + block_root, + payload_verification_outcome, + ); + + self.check_envelope_availability_and_import(executed) + .await + .map_err(EnvelopeError::ImportError)?; + + drop(columns); + Ok(()) + } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index a1e4e34eb6..50f07d54ac 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -40,7 +40,7 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AvailableEnvelope { envelope: Arc>, pub columns: DataColumnSidecarList, @@ -54,6 +54,10 @@ impl AvailableEnvelope { Self { envelope, columns } } + pub fn envelope(&self) -> &Arc> { + &self.envelope + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 1b87fc041a..fc9800d7e4 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -234,7 +234,12 @@ async fn produces_attestations() { let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); - let available_block = range_sync_block.into_available_block(); + let available_block = range_sync_block + .into_available_block( + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap(); // For Gloas non-same-slot attestations, the early attester cache returns None. let is_same_slot_attestation = slot == block_slot; @@ -304,7 +309,11 @@ async fn early_attester_cache_old_request() { Some(head.beacon_block_root), head.beacon_block.clone(), ) - .into_available_block(); + .into_available_block( + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap(); harness .chain diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0ac77dcfaa..67a1f9d8a2 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3296,7 +3296,12 @@ async fn weak_subjectivity_sync_test( let range_sync_block = harness .build_range_sync_block_from_store_blobs(Some(block_root), Arc::new(full_block)); - let fully_available_block = range_sync_block.into_available_block(); + let fully_available_block = range_sync_block + .into_available_block( + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap(); harness .chain .data_availability_checker diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 2429b813e9..c1c0002ba2 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -33,6 +33,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -59,6 +61,12 @@ pub struct BlobsByRangeRequestId { pub parent_request_id: ComponentsByRangeRequestId, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + pub id: Id, + pub parent_request_id: ComponentsByRangeRequestId, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct DataColumnsByRangeRequestId { /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` @@ -261,6 +269,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 14cda1b483..ad908d45e9 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1878,7 +1878,8 @@ impl NetworkBeaconProcessor { Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) | Err(e @ BlockError::EnvelopeBlockRootUnknown(_)) - | Err(e @ BlockError::OptimisticSyncNotSupported { .. }) => { + | Err(e @ BlockError::OptimisticSyncNotSupported { .. }) + | Err(e @ BlockError::EnvelopeError(_)) => { error!(error = %e, "Internal block gossip validation error"); return None; } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index e3ba6fb3c4..6e130596a2 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -783,10 +783,17 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); - let available_blocks = downloaded_blocks + let available_blocks: Vec<_> = downloaded_blocks .into_iter() - .map(|block| block.into_available_block()) - .collect::>(); + .filter_map(|block| { + block + .into_available_block( + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .ok() + }) + .collect(); // TODO(gloas) when implementing backfill sync for gloas // we need a batch verify kzg function in the new da checker diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 35939c6f39..e924c6a954 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -352,10 +352,8 @@ impl Router { Response::PayloadEnvelopesByRoot(envelope) => { self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); } - // TODO(EIP-7732): implement outgoing payload envelopes by range responses - // once sync manager requests them. - Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by range not supported yet"); + Response::PayloadEnvelopesByRange(envelope) => { + self.on_payload_envelopes_by_range_response(peer_id, app_request_id, envelope); } // Lighthouse currently only serves BlocksByHead and does not issue it as a client, // so receiving a response is unexpected. Drop it without crashing. @@ -848,6 +846,34 @@ impl Router { }); } + /// Handle a `PayloadEnvelopesByRange` response from the peer. + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + let sync_request_id = match app_request_id { + AppRequestId::Sync(id @ SyncRequestId::PayloadEnvelopesByRange { .. }) => id, + other => { + crit!(request = ?other, %peer_id, "PayloadEnvelopesByRange response on incorrect request"); + return; + } + }; + + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf503..8d40ec8b7f 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -34,6 +34,7 @@ pub type BatchId = Epoch; pub enum ByRangeRequestType { BlocksAndColumns, BlocksAndBlobs, + BlocksAndEnvelopesAndColumns, Blocks, Columns(HashSet), } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index bb43396473..004cdbaef4 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,3 +1,4 @@ +use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ BeaconChainTypes, block_verification_types::{AvailableBlockData, RangeSyncBlock}, @@ -9,6 +10,7 @@ use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -37,6 +39,13 @@ pub struct RangeBlockComponentsRequest { blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, + /// Payload envelopes for Gloas blocks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Span to track the range request and all children range requests. pub(crate) request_span: Span, } @@ -88,6 +97,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -110,6 +120,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, + payloads_request: payloads_req_id.map(ByRangeRequest::Active), request_span, } } @@ -191,6 +202,17 @@ impl RangeBlockComponentsRequest { } } + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none were expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +230,35 @@ impl RangeBlockComponentsRequest { return None; }; + // Check if payload envelopes are still pending + if let Some(ByRangeRequest::Active(_)) = &self.payloads_request { + return None; + } + + // If we have payload envelopes, use the Gloas coupling path + if let Some(ByRangeRequest::Complete(envelopes)) = &self.payloads_request { + let data_columns = match &mut self.block_data_request { + RangeBlockDataRequest::DataColumns { requests, .. } => { + let mut cols = vec![]; + for req in requests.values() { + let Some(data) = req.to_finished() else { + return None; + }; + cols.extend(data.clone()); + } + cols + } + _ => vec![], + }; + return Some(Self::responses_gloas( + blocks.to_vec(), + envelopes.clone(), + data_columns, + da_checker, + spec, + )); + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -460,6 +511,51 @@ impl RangeBlockComponentsRequest { Ok(range_sync_blocks) } + + fn responses_gloas( + blocks: Vec>>, + envelopes: Vec>>, + data_columns: DataColumnSidecarList, + _da_checker: Arc>, + _spec: Arc, + ) -> Result>, CouplingError> + where + T: BeaconChainTypes, + { + // Index envelopes by slot + let mut envelopes_by_slot: HashMap<_, _> = envelopes + .into_iter() + .map(|e| (e.message.slot(), e)) + .collect(); + + // Group data columns by block_root + let mut data_columns_by_block = HashMap::>>>::new(); + for column in data_columns { + let block_root = column.block_root(); + data_columns_by_block + .entry(block_root) + .or_default() + .push(column); + } + + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + for block in blocks { + let slot = block.slot(); + let block_root = get_block_root(&block); + + let envelope = envelopes_by_slot.remove(&slot); + let columns = data_columns_by_block + .remove(&block_root) + .unwrap_or_default(); + + let available_envelope = + envelope.map(|env| Box::new(AvailableEnvelope::new(env, columns))); + + range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope)); + } + + Ok(range_sync_blocks) + } } impl ByRangeRequest { @@ -566,7 +662,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -597,6 +693,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -657,6 +754,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -733,6 +831,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -827,6 +926,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -925,6 +1025,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1041,6 +1142,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 14a38f0e72..447d525f87 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -59,7 +59,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -528,6 +529,8 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -1249,6 +1252,13 @@ impl SyncManager { peer_id, RpcEvent::from_chunk(envelope, seen_timestamp), ), + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ); + } _ => { crit!(%peer_id, "bad request id for payload envelope"); } @@ -1299,6 +1309,24 @@ impl SyncManager { } } + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 9d5ac40c0a..b493ea981f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -25,14 +25,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -40,7 +43,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, - PayloadEnvelopesByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -220,6 +223,11 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -257,6 +265,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -306,6 +318,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -335,6 +348,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -374,6 +388,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(payload_envelopes_by_root_ids) @@ -381,6 +399,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -438,6 +457,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -461,6 +481,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -593,24 +614,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -674,6 +697,26 @@ impl SyncNetworkContext { }) .transpose()?; + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let info = RangeBlockComponentsRequest::new( blocks_req_id, @@ -684,6 +727,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -786,6 +830,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1391,6 +1446,43 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1472,6 +1564,12 @@ impl SyncNetworkContext { ); if self + .chain + .data_availability_checker + .envelopes_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1636,6 +1734,19 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + /// Common handler for consistent scoring of RpcResponseError fn on_rpc_response_result( &mut self, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 8c091eca80..872b3293da 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; pub use payload_envelopes_by_root::{ PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, }; @@ -30,6 +31,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 0000000000..13e6454a23 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,48 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + if envelope.slot().as_u64() < self.request.start_slot + || envelope.slot().as_u64() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(envelope.slot())); + } + + if self + .items + .iter() + .any(|existing| existing.slot() == envelope.slot()) + { + return Err(LookupVerifyError::DuplicatedData(envelope.slot(), 0)); + } + + self.items.push(envelope); + + Ok(self.items.len() >= self.request.count as usize) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +}