From 6af1a927b87977b4e369db0b702e572adb5139e7 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 4 May 2026 14:57:49 +0300 Subject: [PATCH] Actually do something with the responses... --- beacon_node/network/src/router.rs | 74 +++++++++++++++++++++++-- beacon_node/network/src/sync/manager.rs | 47 ++++++++++++++++ 2 files changed, 117 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc6..5326778794 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -26,6 +26,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. @@ -341,10 +342,19 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(payload_envelope) => { + self.on_payload_envelopes_by_root_response( + peer_id, + app_request_id, + payload_envelope, + ); + } + Response::PayloadEnvelopesByRange(payload_envelope) => { + self.on_payload_envelopes_by_range_response( + peer_id, + app_request_id, + payload_envelope, + ); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -809,6 +819,62 @@ impl Router { } } + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + if let AppRequestId::Sync(sync_request_id) = app_request_id { + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } else { + crit!("All payload envelopes by range responses should belong to sync"); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5db0f9e921..c3b3daf4d6 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -134,6 +134,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -853,6 +861,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -1238,6 +1257,34 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId,