diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5253c85bbc..da904726d1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3185,7 +3185,7 @@ impl BeaconChain { }; // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + for (signature_verified_block, _envelope) in signature_verified_blocks { let block_slot = signature_verified_block.slot(); match self .process_block( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 24f971f736..597b557f69 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::{AvailableEnvelope, EnvelopeError}; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -324,6 +325,20 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// The child block is known but its parent execution payload envelope has not been received yet. + /// + /// ## Peer scoring + /// + /// It's unclear if this block is valid, but it cannot be fully verified without the parent's + /// execution payload envelope. + ParentEnvelopeUnknown { parent_root: Hash256 }, + /// An error occurred while processing the execution payload envelope during range sync. + EnvelopeError(Box), + + PayloadEnvelopeError { + e: Box, + penalize_peer: bool, + }, } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -490,6 +505,33 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + let penalize_peer = match &e { + // REJECT per spec: peer sent invalid envelope data + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::IncorrectBlockProposer { .. } => true, + // IGNORE per spec: not the peer's fault + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::UnknownValidator { .. } => false, + // Internal errors: not the peer's fault + EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::EnvelopeProcessingError(_) + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::ImportError(_) => false, + }; + BlockError::PayloadEnvelopeError { + e: Box::new(e), + penalize_peer, + } + } +} + /// Stores information about verifying a payload against an execution engine. #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { @@ -587,10 +629,17 @@ pub(crate) fn process_block_slash_info( mut chain_segment: Vec<(Hash256, RangeSyncBlock)>, chain: &BeaconChain, -) -> Result>, BlockError> { +) -> Result< + Vec<( + SignatureVerifiedBlock, + Option>>, + )>, + BlockError, +> { if chain_segment.is_empty() { return Ok(vec![]); } @@ -619,14 +668,29 @@ pub fn signature_verify_chain_segment( let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let (available_block, envelope) = match block { + RangeSyncBlock::Base(ab) => (ab, None), + RangeSyncBlock::Gloas { block, envelope } => { + let ab = AvailableBlock::new( + block, + AvailableBlockData::NoData, + &chain.data_availability_checker, + chain.spec.clone(), + ) + .map_err(BlockError::AvailabilityCheck)?; + (ab, envelope) + } + }; available_blocks.push(available_block.clone()); - signature_verified_blocks.push(SignatureVerifiedBlock { - block: MaybeAvailableBlock::Available(available_block), - block_root, - parent: None, - consensus_context, - }); + signature_verified_blocks.push(( + SignatureVerifiedBlock { + block: MaybeAvailableBlock::Available(available_block), + block_root, + parent: None, + consensus_context, + }, + envelope, + )); } // TODO(gloas) When implementing range and backfill sync for gloas // we need a batch verify kzg function in the new da checker as well. @@ -637,7 +701,7 @@ pub fn signature_verify_chain_segment( // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - for svb in &mut signature_verified_blocks { + for (svb, _) in &mut signature_verified_blocks { signature_verifier .include_all_signatures(svb.block.as_block(), &mut svb.consensus_context)?; } @@ -648,7 +712,7 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { + if let Some((signature_verified_block, _)) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } @@ -1196,7 +1260,7 @@ impl SignatureVerifiedBlock { let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify()); match result { Ok(_) => { - // gloas blocks are always available. + // Gloas blocks are always available — data arrives via the envelope. let maybe_available = if chain .spec .fork_name_at_slot::(block.slot()) @@ -1951,6 +2015,21 @@ fn load_parent>( BlockError::from(BeaconChainError::MissingBeaconBlock(block.parent_root())) })?; + // For post-Gloas parent blocks, the execution payload arrives via the envelope. + // If the parent's execution payload envelope hasn't arrived yet, + // return an unknown parent error so the block gets sent to the + // reprocess queue. + if chain + .spec + .fork_name_at_slot::(parent_block.slot()) + .gloas_enabled() + { + let _envelope = chain + .store + .get_payload_envelope(&root)? + .ok_or(BlockError::ParentEnvelopeUnknown { parent_root: root })?; + } + // Load the parent block's state from the database, returning an error if it is not found. // It is an error because if we know the parent block we should also know the parent state. // Retrieve any state that is advanced through to at most `block.slot()`: this is diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d7..f97e695b29 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -45,38 +46,61 @@ impl LookupBlock { /// This includes any and all blobs/columns required, including zero if /// none are required. This can happen if the block is pre-deneb or if /// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root()) + write!(f, "RangeSyncBlock({:?})", self.block_root()) } } impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + RangeSyncBlock::Base(block) => block.block_root(), + RangeSyncBlock::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + RangeSyncBlock::Base(block) => block.block(), + RangeSyncBlock::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + RangeSyncBlock::Base(block) => block.block_cloned(), + RangeSyncBlock::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + RangeSyncBlock::Base(block) => block.data(), + RangeSyncBlock::Gloas { .. } => { + unreachable!("block_data called on Gloas variant — use envelope data instead") + } + } } } impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. + /// Constructs a `RangeSyncBlock` from a block and availability data. /// /// # Errors /// @@ -94,32 +118,53 @@ impl RangeSyncBlock { T: BeaconChainTypes, { let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + pub fn new_gloas( + block: Arc>, + envelope: Option>>, + ) -> Self { + Self::Gloas { block, envelope } } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + RangeSyncBlock::Base(block) => block.deconstruct(), + RangeSyncBlock::Gloas { .. } => { + unreachable!("deconstruct called on Gloas variant") + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + RangeSyncBlock::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + RangeSyncBlock::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + RangeSyncBlock::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + RangeSyncBlock::Gloas { .. } => 0, } } pub fn into_available_block(self) -> AvailableBlock { - self.block + match self { + RangeSyncBlock::Base(block) => block, + RangeSyncBlock::Gloas { .. } => { + unreachable!("into_available_block called on Gloas variant") + } + } } } @@ -387,31 +432,31 @@ impl AsBlock for AvailableBlock { impl AsBlock for RangeSyncBlock { fn slot(&self) -> Slot { - self.as_block().slot() + RangeSyncBlock::as_block(self).slot() } fn epoch(&self) -> Epoch { - self.as_block().epoch() + RangeSyncBlock::as_block(self).epoch() } fn parent_root(&self) -> Hash256 { - self.as_block().parent_root() + RangeSyncBlock::as_block(self).parent_root() } fn state_root(&self) -> Hash256 { - self.as_block().state_root() + RangeSyncBlock::as_block(self).state_root() } fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.as_block().signed_block_header() + RangeSyncBlock::as_block(self).signed_block_header() } fn message(&self) -> BeaconBlockRef<'_, E> { - self.as_block().message() + RangeSyncBlock::as_block(self).message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 637e8acdc8..00d11c551d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -21,7 +21,7 @@ use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError, + DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarError, PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize, }; @@ -539,6 +539,11 @@ impl DataAvailabilityChecker { self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } + /// Determines if execution payload envelopes are required for an epoch (Gloas and later). + pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool { + self.spec.fork_name_at_epoch(epoch) >= ForkName::Gloas + } + /// See `Self::blobs_required_for_epoch` fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index b4f53b9df8..26dc872dd4 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -19,6 +19,7 @@ //! ``` use state_processing::envelope_processing::EnvelopeProcessingError; +use std::marker::PhantomData; use std::sync::Arc; use store::Error as DBError; use strum::AsRefStr; @@ -40,7 +41,15 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -#[derive(Debug)] +// TODO(gloas): could remove this type completely, or remove the generic +#[derive(Debug, PartialEq)] +pub struct EnvelopeImportData { + pub block_root: Hash256, + _phantom: PhantomData, +} + +#[derive(Debug, Clone)] +#[allow(dead_code)] pub struct AvailableEnvelope { envelope: Arc>, pub columns: DataColumnSidecarList, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ea87e9bc71..d2f7010854 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -417,6 +417,9 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcPayloadEnvelope { + process_fn: AsyncFn, + }, RpcCustodyColumn(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { @@ -483,6 +486,7 @@ pub enum WorkType { GossipLightClientOptimisticUpdate, RpcBlock, RpcBlobs, + RpcPayloadEnvelope, RpcCustodyColumn, ColumnReconstruction, IgnoredRpcBlock, @@ -545,6 +549,7 @@ impl Work { Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences, Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, + Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, @@ -1183,7 +1188,9 @@ impl BeaconProcessor { Work::GossipLightClientOptimisticUpdate { .. } => work_queues .lc_gossip_optimistic_update_queue .push(work, work_id), - Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { + Work::RpcBlock { .. } + | Work::IgnoredRpcBlock { .. } + | Work::RpcPayloadEnvelope { .. } => { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), @@ -1318,7 +1325,9 @@ impl BeaconProcessor { WorkType::GossipLightClientOptimisticUpdate => { work_queues.lc_gossip_optimistic_update_queue.len() } - WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => { + work_queues.rpc_block_queue.len() + } WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } @@ -1513,6 +1522,7 @@ impl BeaconProcessor { beacon_block_root: _, } | Work::RpcBlobs { process_fn } + | Work::RpcPayloadEnvelope { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a443857..0b1d84b706 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,10 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Request searching for an execution payload envelope given a block root. + SinglePayloadEnvelope { id: SingleLookupReqId }, + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -76,6 +80,14 @@ pub enum DataColumnsByRangeRequester { CustodyBackfillSync(CustodyBackFillBatchRequestId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + /// Id to identify this attempt at a payload_envelopes_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -252,6 +264,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index b2b5960597..5cef5dcb43 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1853,6 +1853,21 @@ impl NetworkBeaconProcessor { error!(error = %e, "Internal block gossip validation error"); return None; } + Err(BlockError::ParentEnvelopeUnknown { .. }) => { + // Gossip validation does not check envelope availability; this should not occur. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } + Err(e @ BlockError::EnvelopeError(_)) => { + debug!(error = %e, "Gossip block envelope error"); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } + Err(e @ BlockError::PayloadEnvelopeError { .. }) => { + debug!(error = %e, "Gossip block payload envelope error"); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf503..e0704e2569 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -33,6 +33,7 @@ pub type BatchId = Epoch; #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, + BlocksAndEnvelopesAndColumns, BlocksAndBlobs, Blocks, Columns(HashSet), diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 6d96967fd0..a43710435e 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -4,11 +4,13 @@ use beacon_chain::{ data_availability_checker::DataAvailabilityChecker, data_column_verification::CustodyDataColumn, get_block_root, + payload_envelope_verification::AvailableEnvelope, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, + /// Payload envelopes (Gloas+). None for pre-Gloas forks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. @@ -88,6 +97,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -109,6 +119,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), + payloads_request: payloads_req_id.map(ByRangeRequest::Active), block_data_request, request_span, } @@ -191,6 +202,18 @@ impl RangeBlockComponentsRequest { } } + /// Adds received payload envelopes to the request. + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,13 @@ impl RangeBlockComponentsRequest { return None; }; + // If payloads are expected, they must also be complete before we can produce responses. + if let Some(payloads_req) = &self.payloads_request + && payloads_req.to_finished().is_none() + { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,15 +284,29 @@ impl RangeBlockComponentsRequest { } } - let resp = Self::responses_with_custody_columns( - blocks.to_vec(), - data_columns, - column_to_peer_id, - expected_custody_columns, - *attempt, - da_checker, - spec, - ); + // Gloas path: if payloads are present, produce Gloas blocks + let resp = if let Some(payloads_req) = &self.payloads_request { + let payloads = payloads_req.to_finished().expect("checked above").to_vec(); + Self::responses_with_envelopes_and_columns( + blocks.to_vec(), + payloads, + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + spec, + ) + } else { + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + da_checker, + spec, + ) + }; if let Err(CouplingError::DataColumnPeerFailure { error: _, @@ -364,102 +408,199 @@ impl RangeBlockComponentsRequest { where T: BeaconChainTypes, { - // Group data columns by block_root and index - let mut data_columns_by_block = - HashMap::>>>::new(); - - for column in data_columns { - let block_root = column.block_root(); - let index = *column.index(); - if data_columns_by_block - .entry(block_root) - .or_default() - .insert(index, column) - .is_some() - { - // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers - // we request the data from. - // If there are duplicated indices, its likely a peer sending us the same index multiple times. - // However we can still proceed even if there are extra columns, just log an error. - debug!(?block_root, ?index, "Repeated column for block_root"); - continue; - } - } - - // Now iterate all blocks ensuring that the block roots of each block and data column match, - // plus we have columns for our custody requirements + let mut columns_by_root = Self::group_columns_by_root(data_columns); let mut range_sync_blocks = Vec::with_capacity(blocks.len()); - let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + for block in blocks { let block_root = get_block_root(&block); range_sync_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) - else { - let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError::DataColumnPeerFailure { - error: format!("No columns for block {block_root:?} with data"), - faulty_peers: responsible_peers, - exceeded_retries, - - }); - }; - - let mut custody_columns = vec![]; - let mut naughty_peers = vec![]; - for index in expects_custody_columns { - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - if let Some(data_column) = data_columns_by_index.remove(index) { - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); - } else { - let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); - }; - naughty_peers.push((*index, *responsible_peer)); - } - } - if !naughty_peers.is_empty() { - return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - faulty_peers: naughty_peers, - exceeded_retries - }); - } - - // Assert that there are no columns left - if !data_columns_by_index.is_empty() { - let remaining_indices = data_columns_by_index.keys().collect::>(); - // log the error but don't return an error, we can still progress with extra columns. - debug!( - ?block_root, - ?remaining_indices, - "Not all columns consumed for block" - ); - } - - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); - + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + let columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; + let custody_columns = columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect::>(); + let block_data = AvailableBlockData::new_with_data_columns( + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { - // Block has no data, expects zero columns RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }); } - // Assert that there are no columns left for other blocks - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); - // log the error but don't return an error, we can still progress with responses. - // this is most likely an internal error with overrequesting or a client bug. + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); debug!(?remaining_roots, "Not all columns consumed for block"); } Ok(range_sync_blocks) } + + /// Couples blocks with payload envelopes and custody columns for Gloas range sync. + fn responses_with_envelopes_and_columns( + blocks: Vec>>, + payloads: Vec>>, + data_columns: DataColumnSidecarList, + column_to_peer: HashMap, + expects_custody_columns: &[ColumnIndex], + attempt: usize, + _spec: Arc, + ) -> Result>, CouplingError> { + let mut columns_by_root = Self::group_columns_by_root(data_columns); + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + let mut payload_iter = payloads.into_iter().peekable(); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + + for block in blocks { + let mut envelope_for_block = None; + if payload_iter + .peek() + .map(|e| e.message.slot() == block.slot()) + .unwrap_or(false) + { + envelope_for_block = payload_iter.next(); + } + + let block_root = get_block_root(&block); + + let available_envelope = if block.num_expected_blobs() > 0 { + let envelope = envelope_for_block.ok_or_else(|| { + CouplingError::InternalError(format!( + "Missing payload envelope for block {block_root:?} with blobs" + )) + })?; + let custody_columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; + Some(Box::new(AvailableEnvelope::new( + envelope, + custody_columns, + None, + ))) + } else { + envelope_for_block + .map(|envelope| Box::new(AvailableEnvelope::new(envelope, vec![], None))) + }; + + range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope)); + } + + if payload_iter.next().is_some() { + let remaining = payload_iter.count() + 1; + debug!( + remaining, + "Received payload envelopes that don't pair with blocks" + ); + } + + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); + debug!( + ?remaining_roots, + "Not all columns consumed for Gloas blocks" + ); + } + + Ok(range_sync_blocks) + } + + /// Groups data columns by their block root, logging and skipping duplicates. + fn group_columns_by_root( + data_columns: DataColumnSidecarList, + ) -> HashMap>>> { + let mut by_root = + HashMap::>>>::new(); + for column in data_columns { + let block_root = column.block_root(); + let index = *column.index(); + if by_root + .entry(block_root) + .or_default() + .insert(index, column) + .is_some() + { + // `DataColumnsByRangeRequestItems` ensures no duplicated indices across peers. + // Duplicates are likely a peer sending the same index multiple times; log and skip. + debug!(?block_root, ?index, "Repeated column for block_root"); + } + } + by_root + } + + /// Extracts and validates custody columns for a single block root. + /// + /// Removes the matching entry from `columns_by_root`, checks all expected indices are + /// present, and logs any extras. Returns the raw columns; callers wrap them as needed. + fn extract_custody_columns_for_root( + block_root: Hash256, + columns_by_root: &mut HashMap>>>, + expects_custody_columns: &[ColumnIndex], + column_to_peer: &HashMap, + exceeded_retries: bool, + ) -> Result>>, CouplingError> { + let Some(mut by_index) = columns_by_root.remove(&block_root) else { + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + exceeded_retries, + }); + }; + + let mut columns = vec![]; + let mut naughty_peers = vec![]; + for index in expects_custody_columns { + if let Some(col) = by_index.remove(index) { + columns.push(col); + } else { + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {index}" + ))); + }; + naughty_peers.push((*index, *responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), + faulty_peers: naughty_peers, + exceeded_retries, + }); + } + + if !by_index.is_empty() { + let remaining_indices = by_index.keys().collect::>(); + debug!( + ?block_root, + ?remaining_indices, + "Not all columns consumed for block" + ); + } + + Ok(columns) + } } impl ByRangeRequest { @@ -494,6 +635,8 @@ mod tests { NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_da_checker, test_spec, }; + use bls::Signature; + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; use lighthouse_network::{ PeerId, service::api_types::{ @@ -504,7 +647,11 @@ mod tests { use rand::SeedableRng; use std::{collections::HashMap, sync::Arc}; use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; + use types::{ + Epoch, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas, + ExecutionRequests, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, test_utils::XorShiftRng, + }; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -566,7 +713,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -596,6 +743,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -655,6 +803,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -731,6 +880,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -823,6 +973,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -920,6 +1071,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1035,6 +1187,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1106,4 +1259,171 @@ mod tests { panic!("Expected PeerFailure error with exceeded_retries=true"); } } + + // --- Gloas tests --- + + fn make_gloas_envelope( + slot: Slot, + rng: &mut impl rand::Rng, + ) -> Arc> { + let envelope = ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + slot_number: slot, + block_hash: ExecutionBlockHash::from_root(Hash256::from(rng.random::<[u8; 32]>())), + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: Hash256::from(rng.random::<[u8; 32]>()), + parent_beacon_block_root: Hash256::repeat_byte(0), + }; + Arc::new(SignedExecutionPayloadEnvelope { + message: envelope, + signature: Signature::empty(), + }) + } + + fn envelope_id( + parent_request_id: ComponentsByRangeRequestId, + ) -> PayloadEnvelopesByRangeRequestId { + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; + PayloadEnvelopesByRangeRequestId { + id: 99, + parent_request_id, + } + } + + #[test] + fn gloas_blocks_couple_with_envelopes() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let blocks = (0..4) + .map(|_| { + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + Arc::new(raw_block) as Arc> + }) + .collect::>(); + + // Build envelopes with slots matching each block + let envelopes: Vec>> = blocks + .iter() + .map(|b| make_gloas_envelope::(b.slot(), &mut rng)) + .collect(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, blocks).unwrap(); + // Not finished — envelopes still pending + assert!(!is_finished(&mut info)); + + info.add_payload_envelopes(env_req_id, envelopes).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 4); + } + + #[test] + fn gloas_blocks_without_envelopes_succeed() { + // Blocks with no blobs don't require envelopes — they should couple fine with an empty envelope response. + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // No envelope for this block (peer didn't send one) + info.add_payload_envelopes(env_req_id, vec![]).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok(), "expected Ok, got: {:?}", result); + assert_eq!(result.unwrap().len(), 1); + } + + #[test] + fn gloas_extra_envelopes_are_ignored() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([99; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + let slot = block.slot(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // Two envelopes: one matching, one extra at a different slot + let env1 = make_gloas_envelope::(slot, &mut rng); + let env2 = make_gloas_envelope::(Slot::new(slot.as_u64() + 10), &mut rng); + info.add_payload_envelopes(env_req_id, vec![env1, env2]) + .unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 1); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 68d193ff50..bd59e1df37 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -59,7 +59,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -73,7 +74,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -512,6 +514,11 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -1335,6 +1342,36 @@ impl SyncManager { } } + fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) { + // Placeholder: by-root envelope lookup not yet implemented for range sync. + // This is called on error injection for disconnected peers. Log and ignore. + let _ = (id, peer_id, rpc_event); + debug!("on_single_envelope_response: not yet implemented"); + } + + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7cf8a67845..07a9fdcd0a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -213,6 +217,11 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -250,6 +259,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -298,6 +311,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -326,6 +340,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -361,12 +376,17 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -423,6 +443,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -445,6 +466,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -577,24 +599,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -659,6 +683,28 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + + // Send envelope request for Gloas epochs + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -668,6 +714,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -770,6 +817,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1295,6 +1353,57 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1376,6 +1485,12 @@ impl SyncNetworkContext { ); if self + .chain + .data_availability_checker + .envelopes_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1795,6 +1910,10 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_range", + self.payload_envelopes_by_range_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range", diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb45..b6361a2ed1 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; use crate::metrics; @@ -27,6 +28,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 0000000000..3d4ea8248b --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,42 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + let slot = envelope.slot(); + if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count { + return Err(LookupVerifyError::UnrequestedSlot(slot)); + } + if self.items.iter().any(|existing| existing.slot() == slot) { + return Err(LookupVerifyError::DuplicatedData(slot, 0)); + } + + self.items.push(envelope); + + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +}