From 2c76ee5b6b03cdcd43563e89d1befa7f07f4cc75 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 20 May 2026 06:56:49 -0600 Subject: [PATCH] Gloas lookup sync boilerplate (#9322) Implements the boring boilerplate to send envelopes by root requests and process them. Pre-step to - https://github.com/sigp/lighthouse/pull/9155 Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 + beacon_node/beacon_processor/src/lib.rs | 10 ++ .../src/scheduler/work_queue.rs | 6 + .../src/service/api_types.rs | 2 + .../src/network_beacon_processor/mod.rs | 19 +++ .../network_beacon_processor/sync_methods.rs | 57 +++++++ beacon_node/network/src/router.rs | 35 ++++- .../network/src/sync/block_lookups/mod.rs | 2 + beacon_node/network/src/sync/manager.rs | 66 +++++++- .../network/src/sync/network_context.rs | 145 +++++++++++++++++- .../src/sync/network_context/requests.rs | 4 + .../requests/payload_envelopes_by_root.rs | 54 +++++++ 12 files changed, 398 insertions(+), 8 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index af8cd477d6..f3f6cd299e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6339,6 +6339,12 @@ impl BeaconChain { .contains_block(root) } + pub fn envelope_is_known_to_fork_choice(&self, root: &Hash256) -> bool { + self.canonical_head + .fork_choice_read_lock() + .is_payload_received(root) + } + /// Determines the beacon proposer for the next slot. If that proposer is registered in the /// `execution_layer`, provide the `execution_layer` with the necessary information to produce /// `PayloadAttributes` for future calls to fork choice. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 25944bcf8a..ce3851ea54 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -418,6 +418,7 @@ pub enum Work { process_fn: AsyncFn, }, RpcCustodyColumn(AsyncFn), + RpcEnvelope(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, @@ -485,6 +486,7 @@ pub enum WorkType { RpcBlock, RpcBlobs, RpcCustodyColumn, + RpcEnvelope, ColumnReconstruction, IgnoredRpcBlock, ChainSegment, @@ -548,6 +550,7 @@ impl Work { Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, + Work::RpcEnvelope(_) => WorkType::RpcEnvelope, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, @@ -825,6 +828,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() { Some(item) + } else if let Some(item) = work_queues.rpc_envelope_queue.pop() { + Some(item) // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = work_queues.delayed_block_queue.pop() { @@ -1192,6 +1197,9 @@ impl BeaconProcessor { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), + Work::RpcEnvelope(_) => { + work_queues.rpc_envelope_queue.push(work, work_id) + } Work::RpcCustodyColumn { .. } => { work_queues.rpc_custody_column_queue.push(work, work_id) } @@ -1330,6 +1338,7 @@ impl BeaconProcessor { WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } + WorkType::RpcEnvelope => work_queues.rpc_envelope_queue.len(), WorkType::RpcCustodyColumn => work_queues.rpc_custody_column_queue.len(), WorkType::ColumnReconstruction => { work_queues.column_reconstruction_queue.len() @@ -1523,6 +1532,7 @@ impl BeaconProcessor { } | Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) + | Work::RpcEnvelope(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index eb57b97df2..2fdc15182c 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -120,6 +120,7 @@ pub struct BeaconProcessorQueueLengths { rpc_block_queue: usize, rpc_blob_queue: usize, rpc_custody_column_queue: usize, + rpc_envelope_queue: usize, column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, @@ -195,6 +196,8 @@ impl BeaconProcessorQueueLengths { // We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit // this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB. rpc_custody_column_queue: 64, + // Bounded by `PARENT_DEPTH_TOLERANCE`; one envelope per Gloas block. + rpc_envelope_queue: 1024, column_reconstruction_queue: 1, chain_segment_queue: 64, backfill_chain_segment: 64, @@ -253,6 +256,7 @@ pub struct WorkQueues { pub rpc_block_queue: FifoQueue>, pub rpc_blob_queue: FifoQueue>, pub rpc_custody_column_queue: FifoQueue>, + pub rpc_envelope_queue: FifoQueue>, pub column_reconstruction_queue: LifoQueue>, pub chain_segment_queue: FifoQueue>, pub backfill_chain_segment: FifoQueue>, @@ -323,6 +327,7 @@ impl WorkQueues { let rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); let rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); + let rpc_envelope_queue = FifoQueue::new(queue_lengths.rpc_envelope_queue); let column_reconstruction_queue = LifoQueue::new(queue_lengths.column_reconstruction_queue); let chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); @@ -391,6 +396,7 @@ impl WorkQueues { rpc_block_queue, rpc_blob_queue, rpc_custody_column_queue, + rpc_envelope_queue, chain_segment_queue, column_reconstruction_queue, backfill_chain_segment, diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index f598f59aee..2429b813e9 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -23,6 +23,8 @@ pub enum SyncRequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request searching for a payload envelope given a hash. + SinglePayloadEnvelope { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Blocks by range request diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7bf969db10..7817feb0bd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -588,6 +588,25 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for an RPC-fetched payload envelope. `process_lookup_envelope` + /// reports the result back to sync. + pub fn send_lookup_envelope( + self: &Arc, + block_root: Hash256, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcEnvelope(Box::pin(async move { + s.process_lookup_envelope(block_root, envelope, seen_timestamp, process_type) + .await; + })), + }) + } + /// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports /// the result back to sync. pub fn send_rpc_custody_columns( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 988a68c9dd..e3ba6fb3c4 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -426,6 +426,63 @@ impl NetworkBeaconProcessor { }); } + /// Attempt to verify and import an execution payload envelope received via RPC. + #[instrument( + name = "lh_process_lookup_envelope", + parent = None, + level = "debug", + skip_all, + fields(?block_root), + )] + pub async fn process_lookup_envelope( + self: Arc>, + block_root: Hash256, + envelope: Arc>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + debug!( + ?block_root, + slot = %envelope.slot(), + ?process_type, + "Processing RPC payload envelope" + ); + + // Gossip verification runs the same signature / slot / builder-index / block-hash checks + // independently of gossip propagation, so we can reuse it for RPC-fetched envelopes. + #[allow(clippy::result_large_err)] + let result = match self + .chain + .clone() + .verify_envelope_for_gossip(envelope.clone()) + .await + { + Ok(verified) => { + self.chain + .process_execution_payload_envelope( + block_root, + verified, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ) + .await + } + Err(e) => Err(e), + }; + + // TODO(gloas): structured penalty classification arrives with the envelope lookup state + // machine; for now, fold the EnvelopeError into BlockError::InternalError so it flows + // through the existing `BlockProcessingResult::Err` path. + let result: Result = + result.map_err(|e| BlockError::InternalError(format!("envelope: {e}"))); + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + pub fn process_historic_data_columns( &self, batch_id: CustodyBackfillBatchId, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index a718997e0a..35939c6f39 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -26,6 +26,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. @@ -348,10 +349,13 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + // TODO(EIP-7732): implement outgoing payload envelopes by range responses + // once sync manager requests them. + Response::PayloadEnvelopesByRange(_) => { + debug!("Requesting envelopes by range not supported yet"); } // Lighthouse currently only serves BlocksByHead and does not issue it as a client, // so receiving a response is unexpected. Drop it without crashing. @@ -821,6 +825,29 @@ impl Router { } } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(id @ SyncRequestId::SinglePayloadEnvelope { .. }) => id, + other => { + crit!(request = ?other, %peer_id, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }; + + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3929f74aa0..f10610c751 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -559,6 +559,8 @@ impl BlockLookups { BlockProcessType::SingleCustodyColumn(id) => { self.on_processing_result_inner::>(id, result, cx) } + // TODO(gloas): route into the payload envelope lookup state machine. + BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending), }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 347b018a93..14a38f0e72 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -73,7 +73,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -132,6 +133,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -193,6 +202,7 @@ pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, SingleCustodyColumn(Id), + SinglePayloadEnvelope(Id), } impl BlockProcessType { @@ -200,7 +210,8 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id) + | BlockProcessType::SinglePayloadEnvelope(id) => *id, } } } @@ -502,6 +513,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } @@ -848,6 +862,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -1209,6 +1234,27 @@ impl SyncManager { } } + // TODO(gloas): dispatch into block_lookups once the envelope lookup state machine lands. + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self + .on_single_payload_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload envelope"); + } + } + } + fn rpc_data_column_received( &mut self, sync_request_id: SyncRequestId, @@ -1237,6 +1283,22 @@ impl SyncManager { } } + fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(_resp) = self + .network + .on_single_payload_envelope_response(id, peer_id, envelope) + { + // TODO(gloas): dispatch into + // `block_lookups.on_download_response::>(...)` once + // the envelope lookup state machine lands. + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 465e23998b..9d5ac40c0a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,10 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; +pub use self::requests::{ + BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, + PayloadEnvelopesByRootSingleRequest, +}; use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -201,6 +205,9 @@ pub struct SyncNetworkContext { ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, @@ -294,6 +301,7 @@ impl SyncNetworkContext { request_id: 1, blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), @@ -322,6 +330,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -345,6 +354,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); + let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -363,6 +376,7 @@ impl SyncNetworkContext { .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) + .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) @@ -419,6 +433,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -441,6 +456,7 @@ impl SyncNetworkContext { for peer_id in blocks_by_root_requests .iter_request_peers() .chain(blobs_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) .chain(data_columns_by_root_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) @@ -927,6 +943,81 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC. + #[allow(dead_code)] + pub fn payload_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + // Skip the download if fork-choice already saw this envelope (e.g. imported via gossip + // before the lookup got here). + if self.chain.envelope_is_known_to_fork_choice(&block_root) { + return Ok(LookupRequestResult::NoRequestNeeded( + "envelope already known to fork-choice", + )); + } + + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest { block_root }; + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + // true = enforce that the peer returns a response. We only request a single envelope + // and the peer must have it. + true, + PayloadEnvelopesByRootRequestItems::new(request), + Span::none(), + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1476,6 +1567,27 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, we enforce at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_root_response( &mut self, @@ -1652,6 +1764,35 @@ impl SyncNetworkContext { }) } + #[allow(dead_code)] + pub fn send_payload_for_processing( + &self, + block_root: Hash256, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!( + ?block_root, + ?process_type, + "Sending payload envelope for processing" + ); + + beacon_processor + .send_lookup_envelope(block_root, envelope, seen_timestamp, process_type) + .map_err(|e| { + error!( + error = ?e, + "Failed to send sync payload envelope to processor" + ); + SendErrorProcessor::SendError + }) + } + pub fn send_custody_columns_for_processing( &self, _id: Id, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb45..8c091eca80 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 0000000000..a142d86e90 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,54 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest { + pub block_root: Hash256, +} + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.block_root], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRootRequestItems { + type Item = Arc>; + + /// Append a response to the single chunk request. We expect exactly one envelope per + /// block root. Returns `true` when the single expected item has been received. + fn add(&mut self, envelope: Self::Item) -> Result { + let block_root = envelope.message.beacon_block_root; + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + self.items.push(envelope); + // Always returns true, we expect a single envelope per block root + Ok(true) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +}