diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1fe013f754..d6cf15027c 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::EnvelopeError; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -321,13 +322,18 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, - /// The parent block is known but its execution payload envelope has not been received yet. + /// The child block is known but its parent execution payload envelope has not been received yet. /// /// ## Peer scoring /// /// It's unclear if this block is valid, but it cannot be fully verified without the parent's /// execution payload envelope. ParentEnvelopeUnknown { parent_root: Hash256 }, + + PayloadEnvelopeError { + e: Box, + penalize_peer: bool, + }, } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -494,6 +500,36 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + let penalize_peer = match &e { + // REJECT per spec: peer sent invalid envelope data + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::IncorrectBlockProposer { .. } => true, + // IGNORE per spec: not the peer's fault + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::UnknownValidator { .. } => false, + // Internal errors: not the peer's fault + EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::BlockProcessingError(_) + | EnvelopeError::EnvelopeProcessingError(_) + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::BlockError(_) + | EnvelopeError::InternalError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } => false, + }; + BlockError::PayloadEnvelopeError { + e: Box::new(e), + penalize_peer, + } + } +} + /// Stores information about verifying a payload against an execution engine. #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { @@ -1978,8 +2014,9 @@ fn load_parent>( } else if let Ok(parent_bid_block_hash) = parent_block.payload_bid_block_hash() && block.as_block().is_parent_block_full(parent_bid_block_hash) { - // Post-Gloas Full block case. - // TODO(gloas): loading the envelope here is not very efficient + // If the parent's execution payload envelope hasn't arrived yet, + // return an unknown parent error so the block gets sent to the + // reprocess queue. let envelope = chain .store .get_payload_envelope(&root)? diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index ea8c0d4b8a..3479d62f6a 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -90,7 +90,6 @@ pub(crate) fn post_beacon_execution_payload_envelope( .boxed() } /// Publishes a signed execution payload envelope to the network. -/// TODO(gloas): Add gossip verification (BroadcastValidation::Gossip) before import. pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, @@ -132,18 +131,21 @@ pub async fn publish_execution_payload_envelope( }; let ctx = chain.gossip_verification_context(); - let Ok(gossip_verifed_envelope) = GossipVerifiedEnvelope::new(signed_envelope, &ctx) else { - warn!(%slot, %beacon_block_root, "Execution payload envelope rejected"); - return Err(warp_utils::reject::custom_bad_request( - "execution payload envelope rejected, gossip verification".to_string(), - )); + let gossip_verified_envelope = match GossipVerifiedEnvelope::new(signed_envelope, &ctx) { + Ok(envelope) => envelope, + Err(e) => { + warn!(%slot, %beacon_block_root, error = ?e, "Execution payload envelope rejected"); + return Err(warp_utils::reject::custom_bad_request(format!( + "execution payload envelope rejected: {e:?}", + ))); + } }; // Import the envelope locally (runs state transition and notifies the EL). chain .process_execution_payload_envelope( beacon_block_root, - gossip_verifed_envelope, + gossip_verified_envelope, NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 2e04847630..fe9e1755b6 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1390,7 +1390,9 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::PayloadEnvelopeError { .. }) => { error!(error = %e, "Internal block gossip validation error"); return None; } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f6d4940121..57d3d7d220 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -109,9 +109,7 @@ impl NetworkBeaconProcessor { ); self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: BlockProcessingResult::Err(BlockError::InternalError(format!( - "Envelope verification failed: {e:?}" - ))), + result: BlockProcessingResult::Err(e.into()), }); return; } @@ -138,9 +136,7 @@ impl NetworkBeaconProcessor { ?beacon_block_root, "RPC payload envelope processing failed" ); - BlockProcessingResult::Err(BlockError::InternalError(format!( - "Envelope processing failed: {e:?}" - ))) + BlockProcessingResult::Err(e.into()) } }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 3fb2196975..3d82252a0c 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -336,7 +336,7 @@ impl Router { // TODO(EIP-7732): implement outgoing payload envelopes by range responses once // range sync requests them. Response::PayloadEnvelopesByRange(_) => { - unreachable!() + error!(%peer_id, "Unexpected PayloadEnvelopesByRange response"); } // Light client responses should not be received Response::LightClientBootstrap(_) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b4..bb8d81cc6e 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId, }; use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; @@ -12,16 +12,17 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; +use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::SingleLookupId; use super::single_block_lookup::{ComponentRequests, DownloadResult}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ResponseType { Block, Blob, CustodyColumn, + Envelope, } /// This trait unifies common single block lookup functionality across blocks and blobs. This @@ -151,6 +152,7 @@ impl RequestState for BlobRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -205,6 +207,7 @@ impl RequestState for CustodyRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -215,3 +218,52 @@ impl RequestState for CustodyRequestState { &mut self.state } } + +impl RequestState for EnvelopeRequestState { + type VerifiedResponseType = Arc>; + + fn make_request( + &self, + id: Id, + lookup_peers: Arc>>, + _: usize, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.envelope_lookup_request(id, lookup_peers, self.block_root) + .map_err(LookupRequestError::SendFailedNetwork) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. + } = download_result; + cx.send_envelope_for_processing(id, value, seen_timestamp, block_root) + .map_err(LookupRequestError::SendFailedProcessor) + } + + fn response_type() -> ResponseType { + ResponseType::Envelope + } + + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request), + _ => Err("expecting envelope request"), + } + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 7b4e3ce753..27d96de51d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,7 +22,9 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{ + AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup, +}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; @@ -39,7 +41,9 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{ + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, +}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -214,7 +218,7 @@ impl BlockLookups { self.new_current_lookup( block_root, Some(block_component), - Some(parent_root), + Some(AwaitingParent::Block(parent_root)), // On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required // to have the rest of the block components (refer to decoupled blob gossip). Create // the lookup with zero peers to house the block components. @@ -226,7 +230,37 @@ impl BlockLookups { } } - /// Seach a block whose parent root is unknown. + /// A child block's parent envelope is missing. Create a child lookup (with the block component) + /// that waits for the parent envelope, and an envelope-only lookup for the parent. + /// + /// Returns true if both lookups are created or already exist. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_child_and_parent_envelope( + &mut self, + block_root: Hash256, + block_component: BlockComponent, + parent_root: Hash256, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) -> bool { + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &[peer_id], cx); + if envelope_lookup_exists { + // Create child lookup that waits for the parent envelope. + // The child block itself has already been seen, so we pass it as a component. + self.new_current_lookup( + block_root, + Some(block_component), + Some(AwaitingParent::Envelope(parent_root)), + &[], + cx, + ) + } else { + false + } + } + + /// Search a block whose parent root is unknown. /// /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] @@ -344,6 +378,57 @@ impl BlockLookups { self.new_current_lookup(block_root_to_search, None, None, peers, cx) } + /// A block triggers the search of a parent envelope. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_parent_envelope_of_child( + &mut self, + parent_root: Hash256, + peers: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + // Check if there's already a lookup for this root (could be a block lookup or envelope + // lookup). If so, add peers and let it handle the envelope. + if let Some((&lookup_id, _lookup)) = self + .single_block_lookups + .iter_mut() + .find(|(_, lookup)| lookup.is_for_block(parent_root)) + { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + warn!(error = ?e, "Error adding peers to envelope lookup"); + } + return true; + } + + if self.single_block_lookups.len() >= MAX_LOOKUPS { + warn!(?parent_root, "Dropping envelope lookup reached max"); + return false; + } + + let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id()); + let _guard = lookup.span.clone().entered(); + + let id = lookup.id; + let lookup = match self.single_block_lookups.entry(id) { + Entry::Vacant(entry) => entry.insert(lookup), + Entry::Occupied(_) => { + warn!(id, "Lookup exists with same id"); + return false; + } + }; + + debug!( + ?peers, + ?parent_root, + id = lookup.id, + "Created envelope-only lookup" + ); + metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); + self.metrics.created_lookups += 1; + + let result = lookup.continue_requests(cx); + self.on_lookup_result(id, result, "new_envelope_lookup", cx) + } + /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. /// Returns true if the lookup is created or already exists @@ -352,7 +437,7 @@ impl BlockLookups { &mut self, block_root: Hash256, block_component: Option>, - awaiting_parent: Option, + awaiting_parent: Option, peers: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { @@ -387,13 +472,14 @@ impl BlockLookups { } // Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress - if let Some(awaiting_parent) = awaiting_parent + if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) = + awaiting_parent && !self .single_block_lookups .iter() - .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) + .any(|(_, lookup)| lookup.is_for_block(parent_root)) { - warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found"); + warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found"); return false; } @@ -427,9 +513,7 @@ impl BlockLookups { debug!( ?peers, ?block_root, - awaiting_parent = awaiting_parent - .map(|root| root.to_string()) - .unwrap_or("none".to_owned()), + ?awaiting_parent, id = lookup.id, "Created block lookup" ); @@ -561,17 +645,13 @@ impl BlockLookups { self.on_processing_result_inner::>(id, result, cx) } BlockProcessType::SinglePayloadEnvelope { id, block_root } => { - match result { - BlockProcessingResult::Ok(_) => { - self.continue_envelope_child_lookups(block_root, cx); - } - BlockProcessingResult::Err(e) => { - debug!(%id, error = ?e, "Payload envelope processing failed"); - // TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry - } - _ => {} + let result = self + .on_processing_result_inner::>(id, result, cx); + // On successful envelope import, unblock child lookups waiting for this envelope + if matches!(&result, Ok(LookupResult::Completed)) { + self.continue_envelope_child_lookups(block_root, cx); } - return; + result } }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); @@ -687,6 +767,26 @@ impl BlockLookups { // We opt to drop the lookup instead. Action::Drop(format!("{e:?}")) } + BlockError::PayloadEnvelopeError { e, penalize_peer } => { + debug!( + ?block_root, + error = ?e, + "Payload envelope processing error" + ); + if penalize_peer { + let peer_group = request_state.on_processing_failure()?; + for peer in peer_group.all() { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + "lookup_envelope_processing_failure", + ); + } + Action::Retry + } else { + Action::Drop(format!("{e:?}")) + } + } other => { debug!( ?block_root, @@ -721,6 +821,7 @@ impl BlockLookups { ResponseType::CustodyColumn => { "lookup_custody_column_processing_failure" } + ResponseType::Envelope => "lookup_envelope_processing_failure", }, ); } @@ -764,22 +865,21 @@ impl BlockLookups { } Action::ParentEnvelopeUnknown { parent_root } => { let peers = lookup.all_peers(); - lookup.set_awaiting_envelope(parent_root); - // Pick a peer to request the envelope from - let peer_id = peers.first().copied().ok_or_else(|| { - LookupRequestError::Failed("No peers available for envelope request".to_owned()) - })?; - match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) { - Ok(_) => { - debug!( - id = lookup_id, - ?block_root, - ?parent_root, - "Requesting missing parent envelope" - ); - Ok(LookupResult::Pending) - } - Err(e) => Err(LookupRequestError::SendFailedNetwork(e)), + lookup.set_awaiting_parent_envelope(parent_root); + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &peers, cx); + if envelope_lookup_exists { + debug!( + id = lookup_id, + ?block_root, + ?parent_root, + "Marking lookup as awaiting parent envelope" + ); + Ok(LookupResult::Pending) + } else { + Err(LookupRequestError::Failed(format!( + "Envelope lookup could not be created for {parent_root:?}" + ))) } } Action::Drop(reason) => { @@ -831,7 +931,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.awaiting_parent_block() == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -858,8 +958,8 @@ impl BlockLookups { let mut lookup_results = vec![]; for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_envelope() == Some(block_root) { - lookup.resolve_awaiting_envelope(); + if lookup.awaiting_parent_envelope() == Some(block_root) { + lookup.resolve_awaiting_parent(); debug!( envelope_root = ?block_root, id, @@ -891,10 +991,14 @@ impl BlockLookups { metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]); self.metrics.dropped_lookups += 1; + let dropped_root = dropped_lookup.block_root(); let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent_block() == Some(dropped_root) + || lookup.awaiting_parent_envelope() == Some(dropped_root) + }) .map(|(id, _)| *id) .collect::>(); @@ -1062,17 +1166,15 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting_parent) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == parent_root) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!( - "Lookup references unknown parent {awaiting_parent:?}" - )) + Err(format!("Lookup references unknown parent {parent_root:?}")) } } else { Ok(lookup) @@ -1105,7 +1207,7 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some((&child_id, _)) = self .single_block_lookups .iter() diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94..18363e9b8d 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent_block(), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 51cc191056..6687a1ec75 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -16,7 +16,9 @@ use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, +}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -56,6 +58,14 @@ pub enum LookupRequestError { }, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AwaitingParent { + /// Waiting for the parent block to be imported. + Block(Hash256), + /// The parent block is imported but its execution payload envelope is missing. + Envelope(Hash256), +} + #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { @@ -69,8 +79,7 @@ pub struct SingleBlockLookup { #[educe(Debug(method(fmt_peer_set_as_len)))] peers: Arc>>, block_root: Hash256, - awaiting_parent: Option, - awaiting_envelope: Option, + awaiting_parent: Option, created: Instant, pub(crate) span: Span, } @@ -80,6 +89,7 @@ pub(crate) enum ComponentRequests { WaitingForBlock, ActiveBlobRequest(BlobRequestState, usize), ActiveCustodyRequest(CustodyRequestState), + ActiveEnvelopeRequest(EnvelopeRequestState), // When printing in debug this state display the reason why it's not needed #[allow(dead_code)] NotNeeded(&'static str), @@ -90,7 +100,7 @@ impl SingleBlockLookup { requested_block_root: Hash256, peers: &[PeerId], id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -105,16 +115,38 @@ impl SingleBlockLookup { peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, awaiting_parent, - awaiting_envelope: None, created: Instant::now(), span: lookup_span, } } + /// Create an envelope-only lookup. The block is already imported, we just need the envelope. + pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self { + let mut lookup = Self::new(block_root, peers, id, None); + // Block is already imported, mark as completed + lookup + .block_request_state + .state + .on_completed_request("block already imported") + .expect("block state starts as AwaitingDownload"); + lookup.component_requests = + ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root)); + lookup + } + /// Reset the status of all internal requests pub fn reset_requests(&mut self) { self.block_request_state = BlockRequestState::new(self.block_root); - self.component_requests = ComponentRequests::WaitingForBlock; + match &self.component_requests { + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.component_requests = ComponentRequests::ActiveEnvelopeRequest( + EnvelopeRequestState::new(self.block_root), + ); + } + _ => { + self.component_requests = ComponentRequests::WaitingForBlock; + } + } } /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` @@ -130,34 +162,39 @@ impl SingleBlockLookup { self.block_root } - pub fn awaiting_parent(&self) -> Option { + pub fn awaiting_parent(&self) -> Option { self.awaiting_parent } - /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send - /// components for processing. + /// Returns the parent root if awaiting a parent block. + pub fn awaiting_parent_block(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Block(root)) => Some(root), + _ => None, + } + } + + /// Returns the parent root if awaiting a parent envelope. + pub fn awaiting_parent_envelope(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Envelope(root)) => Some(root), + _ => None, + } + } + + /// Mark this lookup as awaiting a parent block to be imported before processing. pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) - } - - /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for - /// processing. - pub fn resolve_awaiting_parent(&mut self) { - self.awaiting_parent = None; - } - - pub fn awaiting_envelope(&self) -> Option { - self.awaiting_envelope + self.awaiting_parent = Some(AwaitingParent::Block(parent_root)); } /// Mark this lookup as awaiting a parent envelope to be imported before processing. - pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) { - self.awaiting_envelope = Some(parent_root); + pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) { + self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root)); } - /// Mark this lookup as no longer awaiting a parent envelope. - pub fn resolve_awaiting_envelope(&mut self) { - self.awaiting_envelope = None; + /// Mark this lookup as no longer awaiting any parent. + pub fn resolve_awaiting_parent(&mut self) { + self.awaiting_parent = None; } /// Returns the time elapsed since this lookup was created @@ -194,6 +231,7 @@ impl SingleBlockLookup { ComponentRequests::WaitingForBlock => false, ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(), ComponentRequests::NotNeeded { .. } => true, } } @@ -201,7 +239,6 @@ impl SingleBlockLookup { /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() - || self.awaiting_envelope.is_some() || self.block_request_state.state.is_awaiting_event() || match &self.component_requests { // If components are waiting for the block request to complete, here we should @@ -214,6 +251,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(request) => { request.state.is_awaiting_event() } + ComponentRequests::ActiveEnvelopeRequest(request) => { + request.state.is_awaiting_event() + } ComponentRequests::NotNeeded { .. } => false, } } @@ -283,6 +323,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(_) => { self.continue_request::>(cx, 0)? } + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.continue_request::>(cx, 0)? + } ComponentRequests::NotNeeded { .. } => {} // do nothing } @@ -304,7 +347,7 @@ impl SingleBlockLookup { expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; - let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some(); + let awaiting_event = self.awaiting_parent.is_some(); let request = R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; @@ -444,6 +487,26 @@ impl BlockRequestState { } } +/// The state of the envelope request component of a `SingleBlockLookup`. +/// Used for envelope-only lookups where the parent block is already imported +/// but its execution payload envelope is missing. +#[derive(Educe)] +#[educe(Debug)] +pub struct EnvelopeRequestState { + #[educe(Debug(ignore))] + pub block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl EnvelopeRequestState { + pub fn new(block_root: Hash256) -> Self { + Self { + block_root, + state: SingleLookupRequestState::new(), + } + } +} + #[derive(Debug, Clone)] pub struct DownloadResult { pub value: T, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 256752d5fb..1ca338ccd3 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -45,6 +45,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + EnvelopeRequestState, }; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; @@ -934,9 +935,9 @@ impl SyncManager { debug!( %block_root, %parent_root, - "Parent envelope not yet available, creating lookup" + "Parent envelope not yet available, creating envelope lookup" ); - self.handle_unknown_parent( + self.handle_unknown_parent_envelope( peer_id, block_root, parent_root, @@ -1054,6 +1055,40 @@ impl SyncManager { } } + /// Handle a block whose parent block is known but parent envelope is missing. + /// Creates an envelope-only lookup for the parent and a child lookup that waits for it. + fn handle_unknown_parent_envelope( + &mut self, + peer_id: PeerId, + block_root: Hash256, + parent_root: Hash256, + slot: Slot, + block_component: BlockComponent, + ) { + match self.should_search_for_block(Some(slot), &peer_id) { + Ok(_) => { + if self.block_lookups.search_child_and_parent_envelope( + block_root, + block_component, + parent_root, + peer_id, + &mut self.network, + ) { + // Lookups created + } else { + debug!( + ?block_root, + ?parent_root, + "No lookup created for child and parent envelope" + ); + } + } + Err(reason) => { + debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request"); + } + } + } + fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) { match self.should_search_for_block(None, &peer_id) { Ok(_) => { @@ -1278,27 +1313,14 @@ impl SyncManager { .network .on_single_envelope_response(id, peer_id, rpc_event) { - match resp { - Ok((envelope, seen_timestamp)) => { - let block_root = envelope.beacon_block_root(); - debug!( - ?block_root, - %id, - "Downloaded payload envelope, sending for processing" - ); - if let Err(e) = self.network.send_envelope_for_processing( - id.req_id, - envelope, - seen_timestamp, - block_root, - ) { - error!(error = ?e, "Failed to send envelope for processing"); - } - } - Err(e) => { - debug!(error = ?e, %id, "Payload envelope download failed"); - } - } + self.block_lookups + .on_download_response::>( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e9d289b777..1176442202 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -944,9 +944,26 @@ impl SyncNetworkContext { pub fn envelope_lookup_request( &mut self, lookup_id: SingleLookupId, - peer_id: PeerId, + lookup_peers: Arc>>, block_root: Hash256, - ) -> Result { + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + let id = SingleLookupReqId { lookup_id, req_id: self.next_id(), @@ -988,7 +1005,7 @@ impl SyncNetworkContext { request_span, ); - Ok(id.req_id) + Ok(LookupRequestResult::RequestSent(id.req_id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -1900,6 +1917,10 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_root", + self.payload_envelopes_by_root_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range",