diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b4..bb8d81cc6e 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId, }; use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; @@ -12,16 +12,17 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; +use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::SingleLookupId; use super::single_block_lookup::{ComponentRequests, DownloadResult}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ResponseType { Block, Blob, CustodyColumn, + Envelope, } /// This trait unifies common single block lookup functionality across blocks and blobs. This @@ -151,6 +152,7 @@ impl RequestState for BlobRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -205,6 +207,7 @@ impl RequestState for CustodyRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -215,3 +218,52 @@ impl RequestState for CustodyRequestState { &mut self.state } } + +impl RequestState for EnvelopeRequestState { + type VerifiedResponseType = Arc>; + + fn make_request( + &self, + id: Id, + lookup_peers: Arc>>, + _: usize, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.envelope_lookup_request(id, lookup_peers, self.block_root) + .map_err(LookupRequestError::SendFailedNetwork) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. + } = download_result; + cx.send_envelope_for_processing(id, value, seen_timestamp, block_root) + .map_err(LookupRequestError::SendFailedProcessor) + } + + fn response_type() -> ResponseType { + ResponseType::Envelope + } + + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request), + _ => Err("expecting envelope request"), + } + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 7b4e3ce753..b33c38d147 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -39,7 +39,9 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{ + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, +}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -344,6 +346,57 @@ impl BlockLookups { self.new_current_lookup(block_root_to_search, None, None, peers, cx) } + /// A block triggers the search of a parent envelope. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_parent_envelope_of_child( + &mut self, + parent_root: Hash256, + peers: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + // Check if there's already a lookup for this root (could be a block lookup or envelope + // lookup). If so, add peers and let it handle the envelope. + if let Some((&lookup_id, _lookup)) = self + .single_block_lookups + .iter_mut() + .find(|(_, lookup)| lookup.is_for_block(parent_root)) + { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + warn!(error = ?e, "Error adding peers to envelope lookup"); + } + return true; + } + + if self.single_block_lookups.len() >= MAX_LOOKUPS { + warn!(?parent_root, "Dropping envelope lookup reached max"); + return false; + } + + let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id()); + let _guard = lookup.span.clone().entered(); + + let id = lookup.id; + let lookup = match self.single_block_lookups.entry(id) { + Entry::Vacant(entry) => entry.insert(lookup), + Entry::Occupied(_) => { + warn!(id, "Lookup exists with same id"); + return false; + } + }; + + debug!( + ?peers, + ?parent_root, + id = lookup.id, + "Created envelope-only lookup" + ); + metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); + self.metrics.created_lookups += 1; + + let result = lookup.continue_requests(cx); + self.on_lookup_result(id, result, "new_envelope_lookup", cx) + } + /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. /// Returns true if the lookup is created or already exists @@ -561,17 +614,13 @@ impl BlockLookups { self.on_processing_result_inner::>(id, result, cx) } BlockProcessType::SinglePayloadEnvelope { id, block_root } => { - match result { - BlockProcessingResult::Ok(_) => { - self.continue_envelope_child_lookups(block_root, cx); - } - BlockProcessingResult::Err(e) => { - debug!(%id, error = ?e, "Payload envelope processing failed"); - // TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry - } - _ => {} + let result = self + .on_processing_result_inner::>(id, result, cx); + // On successful envelope import, unblock child lookups waiting for this envelope + if matches!(&result, Ok(LookupResult::Completed)) { + self.continue_envelope_child_lookups(block_root, cx); } - return; + result } }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); @@ -721,6 +770,7 @@ impl BlockLookups { ResponseType::CustodyColumn => { "lookup_custody_column_processing_failure" } + ResponseType::Envelope => "lookup_envelope_processing_failure", }, ); } @@ -764,22 +814,20 @@ impl BlockLookups { } Action::ParentEnvelopeUnknown { parent_root } => { let peers = lookup.all_peers(); - lookup.set_awaiting_envelope(parent_root); - // Pick a peer to request the envelope from - let peer_id = peers.first().copied().ok_or_else(|| { - LookupRequestError::Failed("No peers available for envelope request".to_owned()) - })?; - match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) { - Ok(_) => { - debug!( - id = lookup_id, - ?block_root, - ?parent_root, - "Requesting missing parent envelope" - ); - Ok(LookupResult::Pending) - } - Err(e) => Err(LookupRequestError::SendFailedNetwork(e)), + lookup.set_awaiting_parent_envelope(parent_root); + let envelope_lookup_exists = self.search_parent_envelope_of_child(parent_root, &peers, cx); + if envelope_lookup_exists { + debug!( + id = lookup_id, + ?block_root, + ?parent_root, + "Marking lookup as awaiting parent envelope" + ); + Ok(LookupResult::Pending) + } else { + Err(LookupRequestError::Failed(format!( + "Envelope lookup could not be created for {parent_root:?}" + ))) } } Action::Drop(reason) => { @@ -858,8 +906,8 @@ impl BlockLookups { let mut lookup_results = vec![]; for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_envelope() == Some(block_root) { - lookup.resolve_awaiting_envelope(); + if lookup.awaiting_parent_envelope() == Some(block_root) { + lookup.resolve_awaiting_parent_envelope(); debug!( envelope_root = ?block_root, id, @@ -894,7 +942,10 @@ impl BlockLookups { let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent() == Some(dropped_lookup.block_root()) + || lookup.awaiting_parent_envelope() == Some(dropped_lookup.block_root()) + }) .map(|(id, _)| *id) .collect::>(); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 51cc191056..d59753b960 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -16,7 +16,9 @@ use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, +}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -70,7 +72,7 @@ pub struct SingleBlockLookup { peers: Arc>>, block_root: Hash256, awaiting_parent: Option, - awaiting_envelope: Option, + awaiting_parent_envelope: Option, created: Instant, pub(crate) span: Span, } @@ -80,6 +82,7 @@ pub(crate) enum ComponentRequests { WaitingForBlock, ActiveBlobRequest(BlobRequestState, usize), ActiveCustodyRequest(CustodyRequestState), + ActiveEnvelopeRequest(EnvelopeRequestState), // When printing in debug this state display the reason why it's not needed #[allow(dead_code)] NotNeeded(&'static str), @@ -105,12 +108,26 @@ impl SingleBlockLookup { peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, awaiting_parent, - awaiting_envelope: None, + awaiting_parent_envelope: None, created: Instant::now(), span: lookup_span, } } + /// Create an envelope-only lookup. The block is already imported, we just need the envelope. + pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self { + let mut lookup = Self::new(block_root, peers, id, None); + // Block is already imported, mark as completed + lookup + .block_request_state + .state + .on_completed_request("block already imported") + .expect("block state starts as AwaitingDownload"); + lookup.component_requests = + ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root)); + lookup + } + /// Reset the status of all internal requests pub fn reset_requests(&mut self) { self.block_request_state = BlockRequestState::new(self.block_root); @@ -146,18 +163,18 @@ impl SingleBlockLookup { self.awaiting_parent = None; } - pub fn awaiting_envelope(&self) -> Option { - self.awaiting_envelope + pub fn awaiting_parent_envelope(&self) -> Option { + self.awaiting_parent_envelope } /// Mark this lookup as awaiting a parent envelope to be imported before processing. - pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) { - self.awaiting_envelope = Some(parent_root); + pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) { + self.awaiting_parent_envelope = Some(parent_root); } /// Mark this lookup as no longer awaiting a parent envelope. - pub fn resolve_awaiting_envelope(&mut self) { - self.awaiting_envelope = None; + pub fn resolve_awaiting_parent_envelope(&mut self) { + self.awaiting_parent_envelope = None; } /// Returns the time elapsed since this lookup was created @@ -194,6 +211,7 @@ impl SingleBlockLookup { ComponentRequests::WaitingForBlock => false, ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(), ComponentRequests::NotNeeded { .. } => true, } } @@ -201,7 +219,7 @@ impl SingleBlockLookup { /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() - || self.awaiting_envelope.is_some() + || self.awaiting_parent_envelope.is_some() || self.block_request_state.state.is_awaiting_event() || match &self.component_requests { // If components are waiting for the block request to complete, here we should @@ -214,6 +232,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(request) => { request.state.is_awaiting_event() } + ComponentRequests::ActiveEnvelopeRequest(request) => { + request.state.is_awaiting_event() + } ComponentRequests::NotNeeded { .. } => false, } } @@ -283,6 +304,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(_) => { self.continue_request::>(cx, 0)? } + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.continue_request::>(cx, 0)? + } ComponentRequests::NotNeeded { .. } => {} // do nothing } @@ -304,7 +328,8 @@ impl SingleBlockLookup { expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; - let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some(); + let awaiting_event = + self.awaiting_parent.is_some() || self.awaiting_parent_envelope.is_some(); let request = R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; @@ -444,6 +469,26 @@ impl BlockRequestState { } } +/// The state of the envelope request component of a `SingleBlockLookup`. +/// Used for envelope-only lookups where the parent block is already imported +/// but its execution payload envelope is missing. +#[derive(Educe)] +#[educe(Debug)] +pub struct EnvelopeRequestState { + #[educe(Debug(ignore))] + pub block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl EnvelopeRequestState { + pub fn new(block_root: Hash256) -> Self { + Self { + block_root, + state: SingleLookupRequestState::new(), + } + } +} + #[derive(Debug, Clone)] pub struct DownloadResult { pub value: T, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 256752d5fb..2cc35081b7 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -45,6 +45,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + EnvelopeRequestState, }; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; @@ -1278,27 +1279,14 @@ impl SyncManager { .network .on_single_envelope_response(id, peer_id, rpc_event) { - match resp { - Ok((envelope, seen_timestamp)) => { - let block_root = envelope.beacon_block_root(); - debug!( - ?block_root, - %id, - "Downloaded payload envelope, sending for processing" - ); - if let Err(e) = self.network.send_envelope_for_processing( - id.req_id, - envelope, - seen_timestamp, - block_root, - ) { - error!(error = ?e, "Failed to send envelope for processing"); - } - } - Err(e) => { - debug!(error = ?e, %id, "Payload envelope download failed"); - } - } + self.block_lookups + .on_download_response::>( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e9d289b777..328940d672 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -944,9 +944,26 @@ impl SyncNetworkContext { pub fn envelope_lookup_request( &mut self, lookup_id: SingleLookupId, - peer_id: PeerId, + lookup_peers: Arc>>, block_root: Hash256, - ) -> Result { + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + let id = SingleLookupReqId { lookup_id, req_id: self.next_id(), @@ -988,7 +1005,7 @@ impl SyncNetworkContext { request_span, ); - Ok(id.req_id) + Ok(LookupRequestResult::RequestSent(id.req_id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: