diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 151c6009c7..79596bb4a6 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -896,9 +896,9 @@ impl ReprocessQueue { .ready_work_tx .try_send(ReadyWork::Envelope(envelope)) .is_err() - { - error!(?block_root, "Failed to send envelope after timeout"); - } + { + error!(?block_root, "Failed to send envelope after timeout"); + } } InboundEvent::ReadyAttestation(queued_id) => { metrics::inc_counter( diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 539445ef50..7b4e3ce753 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -109,6 +109,7 @@ pub type SingleLookupId = u32; enum Action { Retry, ParentUnknown { parent_root: Hash256 }, + ParentEnvelopeUnknown { parent_root: Hash256 }, Drop(/* reason: */ String), Continue, } @@ -559,14 +560,17 @@ impl BlockLookups { BlockProcessType::SingleCustodyColumn(id) => { self.on_processing_result_inner::>(id, result, cx) } - BlockProcessType::SinglePayloadEnvelope { id } => { - // TODO(EIP-7732): Wire into lookup state machine once envelope lookups are - // fully integrated. For now, just log the result. - debug!( - %id, - ?result, - "Payload envelope processing result (not yet wired to lookups)" - ); + BlockProcessType::SinglePayloadEnvelope { id, block_root } => { + match result { + BlockProcessingResult::Ok(_) => { + self.continue_envelope_child_lookups(block_root, cx); + } + BlockProcessingResult::Err(e) => { + debug!(%id, error = ?e, "Payload envelope processing failed"); + // TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry + } + _ => {} + } return; } }; @@ -655,6 +659,12 @@ impl BlockLookups { request_state.revert_to_awaiting_processing()?; Action::ParentUnknown { parent_root } } + BlockError::ParentEnvelopeUnknown { parent_root } => { + // The parent block is known but its execution payload envelope is missing. + // Revert to awaiting processing and fetch the envelope via RPC. + request_state.revert_to_awaiting_processing()?; + Action::ParentEnvelopeUnknown { parent_root } + } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline // and failed to validate the execution payload. Do not downscore peer. @@ -752,6 +762,26 @@ impl BlockLookups { ))) } } + Action::ParentEnvelopeUnknown { parent_root } => { + let peers = lookup.all_peers(); + lookup.set_awaiting_envelope(parent_root); + // Pick a peer to request the envelope from + let peer_id = peers.first().copied().ok_or_else(|| { + LookupRequestError::Failed("No peers available for envelope request".to_owned()) + })?; + match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) { + Ok(_) => { + debug!( + id = lookup_id, + ?block_root, + ?parent_root, + "Requesting missing parent envelope" + ); + Ok(LookupResult::Pending) + } + Err(e) => Err(LookupRequestError::SendFailedNetwork(e)), + } + } Action::Drop(reason) => { // Drop with noop Err(LookupRequestError::Failed(reason)) @@ -819,6 +849,33 @@ impl BlockLookups { } } + /// Makes progress on lookups that were waiting for a parent envelope to be imported. + pub fn continue_envelope_child_lookups( + &mut self, + block_root: Hash256, + cx: &mut SyncNetworkContext, + ) { + let mut lookup_results = vec![]; + + for (id, lookup) in self.single_block_lookups.iter_mut() { + if lookup.awaiting_envelope() == Some(block_root) { + lookup.resolve_awaiting_envelope(); + debug!( + envelope_root = ?block_root, + id, + block_root = ?lookup.block_root(), + "Continuing lookup after envelope imported" + ); + let result = lookup.continue_requests(cx); + lookup_results.push((*id, result)); + } + } + + for (id, result) in lookup_results { + self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx); + } + } + /// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need /// the parent to make progress to resolve, therefore we must drop them if the parent is /// dropped. diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 919526c238..51cc191056 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -70,6 +70,7 @@ pub struct SingleBlockLookup { peers: Arc>>, block_root: Hash256, awaiting_parent: Option, + awaiting_envelope: Option, created: Instant, pub(crate) span: Span, } @@ -104,6 +105,7 @@ impl SingleBlockLookup { peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, awaiting_parent, + awaiting_envelope: None, created: Instant::now(), span: lookup_span, } @@ -144,6 +146,20 @@ impl SingleBlockLookup { self.awaiting_parent = None; } + pub fn awaiting_envelope(&self) -> Option { + self.awaiting_envelope + } + + /// Mark this lookup as awaiting a parent envelope to be imported before processing. + pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) { + self.awaiting_envelope = Some(parent_root); + } + + /// Mark this lookup as no longer awaiting a parent envelope. + pub fn resolve_awaiting_envelope(&mut self) { + self.awaiting_envelope = None; + } + /// Returns the time elapsed since this lookup was created pub fn elapsed_since_created(&self) -> Duration { self.created.elapsed() @@ -185,6 +201,7 @@ impl SingleBlockLookup { /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() + || self.awaiting_envelope.is_some() || self.block_request_state.state.is_awaiting_event() || match &self.component_requests { // If components are waiting for the block request to complete, here we should @@ -287,7 +304,7 @@ impl SingleBlockLookup { expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; - let awaiting_parent = self.awaiting_parent.is_some(); + let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some(); let request = R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; @@ -331,7 +348,7 @@ impl SingleBlockLookup { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. - } else if !awaiting_parent { + } else if !awaiting_event { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 98053a049f..eb896aa865 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -193,7 +193,7 @@ pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, SingleCustodyColumn(Id), - SinglePayloadEnvelope { id: Id }, + SinglePayloadEnvelope { id: Id, block_root: Hash256 }, } impl BlockProcessType { @@ -202,7 +202,7 @@ impl BlockProcessType { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } | BlockProcessType::SingleCustodyColumn(id) - | BlockProcessType::SinglePayloadEnvelope { id } => *id, + | BlockProcessType::SinglePayloadEnvelope { id, .. } => *id, } } } @@ -1256,8 +1256,9 @@ impl SyncManager { { match resp { Ok((envelope, seen_timestamp)) => { + let block_root = envelope.beacon_block_root(); debug!( - block_root = ?envelope.beacon_block_root(), + ?block_root, %id, "Downloaded payload envelope, sending for processing" ); @@ -1265,6 +1266,7 @@ impl SyncManager { id.req_id, envelope, seen_timestamp, + block_root, ) { error!(error = ?e, "Failed to send envelope for processing"); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8c89098687..a909896ccb 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1705,18 +1705,18 @@ impl SyncNetworkContext { id: Id, envelope: Arc>, seen_timestamp: Duration, + block_root: Hash256, ) -> Result<(), SendErrorProcessor> { let beacon_processor = self .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block_root = envelope.beacon_block_root(); debug!(?block_root, ?id, "Sending payload envelope for processing"); beacon_processor .send_rpc_payload_envelope( envelope, seen_timestamp, - BlockProcessType::SinglePayloadEnvelope { id }, + BlockProcessType::SinglePayloadEnvelope { id, block_root }, ) .map_err(|e| { error!(