Gloas lookup sync boilerplate (#9322)

Implements the boring boilerplate to send envelopes by root requests and process them. Pre-step to

- https://github.com/sigp/lighthouse/pull/9155


  


Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-05-20 06:56:49 -06:00
committed by GitHub
parent 398efc3acc
commit 2c76ee5b6b
12 changed files with 398 additions and 8 deletions

View File

@@ -559,6 +559,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
// TODO(gloas): route into the payload envelope lookup state machine.
BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending),
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}

View File

@@ -73,7 +73,8 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -132,6 +133,14 @@ pub enum SyncMessage<E: EthSpec> {
seen_timestamp: Duration,
},
/// A payload envelope has been received from the RPC.
RpcPayloadEnvelope {
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
seen_timestamp: Duration,
},
/// A block with an unknown parent has been received.
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
@@ -193,6 +202,7 @@ pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
SingleCustodyColumn(Id),
SinglePayloadEnvelope(Id),
}
impl BlockProcessType {
@@ -200,7 +210,8 @@ impl BlockProcessType {
match self {
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id) => *id,
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope(id) => *id,
}
}
}
@@ -502,6 +513,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::SinglePayloadEnvelope { id } => {
self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::DataColumnsByRoot(req_id) => {
self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error))
}
@@ -848,6 +862,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
}
SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope,
seen_timestamp,
} => self.rpc_payload_envelope_received(
sync_request_id,
peer_id,
envelope,
seen_timestamp,
),
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
@@ -1209,6 +1234,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
// TODO(gloas): dispatch into block_lookups once the envelope lookup state machine lands.
fn rpc_payload_envelope_received(
&mut self,
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match sync_request_id {
SyncRequestId::SinglePayloadEnvelope { id } => self
.on_single_payload_envelope_response(
id,
peer_id,
RpcEvent::from_chunk(envelope, seen_timestamp),
),
_ => {
crit!(%peer_id, "bad request id for payload envelope");
}
}
}
fn rpc_data_column_received(
&mut self,
sync_request_id: SyncRequestId,
@@ -1237,6 +1283,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn on_single_payload_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
envelope: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(_resp) = self
.network
.on_single_payload_envelope_response(id, peer_id, envelope)
{
// TODO(gloas): dispatch into
// `block_lookups.on_download_response::<PayloadEnvelopeRequestState<_>>(...)` once
// the envelope lookup state machine lands.
}
}
fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,

View File

@@ -2,7 +2,10 @@
//! channel and stores a global RPC ID to perform requests.
use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError};
pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest};
pub use self::requests::{
BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest,
PayloadEnvelopesByRootSingleRequest,
};
use super::SyncMessage;
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
use super::manager::BlockProcessType;
@@ -37,6 +40,7 @@ pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
use slot_clock::SlotClock;
@@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -201,6 +205,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRoot requests
payload_envelopes_by_root_requests:
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
ActiveRequests<DataColumnsByRootRequestId, DataColumnsByRootRequestItems<T::EthSpec>>,
@@ -294,6 +301,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
blocks_by_root_requests: ActiveRequests::new("blocks_by_root"),
blobs_by_root_requests: ActiveRequests::new("blobs_by_root"),
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"),
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
@@ -322,6 +330,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
@@ -345,6 +354,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlob { id: *id });
let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
let data_column_by_root_ids = data_columns_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
@@ -363,6 +376,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
blocks_by_root_ids
.chain(blobs_by_root_ids)
.chain(payload_envelopes_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
@@ -419,6 +433,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
@@ -441,6 +456,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
for peer_id in blocks_by_root_requests
.iter_request_peers()
.chain(blobs_by_root_requests.iter_request_peers())
.chain(payload_envelopes_by_root_requests.iter_request_peers())
.chain(data_columns_by_root_requests.iter_request_peers())
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
@@ -927,6 +943,81 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC.
#[allow(dead_code)]
pub fn payload_lookup_request(
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
// Skip the download if fork-choice already saw this envelope (e.g. imported via gossip
// before the lookup got here).
if self.chain.envelope_is_known_to_fork_choice(&block_root) {
return Ok(LookupRequestResult::NoRequestNeeded(
"envelope already known to fork-choice",
));
}
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, peer)| *peer)
else {
return Ok(LookupRequestResult::Pending("no peers"));
};
let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let request = PayloadEnvelopesByRootSingleRequest { block_root };
let network_request = RequestType::PayloadEnvelopesByRoot(
request
.clone()
.into_request(&self.fork_context)
.map_err(RpcRequestSendError::InternalError)?,
);
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: network_request,
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRoot",
?block_root,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
self.payload_envelopes_by_root_requests.insert(
id,
peer_id,
// true = enforce that the peer returns a response. We only request a single envelope
// and the peer must have it.
true,
PayloadEnvelopesByRootRequestItems::new(request),
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
/// - If we have a downloaded but not yet processed block
/// - If the da_checker has a pending block
@@ -1476,6 +1567,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_payload_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
let resp = self
.payload_envelopes_by_root_requests
.on_response(id, rpc_event);
let resp = resp.map(|res| {
res.and_then(|(mut envelopes, seen_timestamp)| {
match envelopes.pop() {
Some(envelope) => Ok((envelope, seen_timestamp)),
// Should never happen, we enforce at least 1 chunk.
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
}
})
});
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
pub(crate) fn on_data_columns_by_root_response(
&mut self,
@@ -1652,6 +1764,35 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
#[allow(dead_code)]
pub fn send_payload_for_processing(
&self,
block_root: Hash256,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(
?block_root,
?process_type,
"Sending payload envelope for processing"
);
beacon_processor
.send_lookup_envelope(block_root, envelope, seen_timestamp, process_type)
.map_err(|e| {
error!(
error = ?e,
"Failed to send sync payload envelope to processor"
);
SendErrorProcessor::SendError
})
}
pub fn send_custody_columns_for_processing(
&self,
_id: Id,

View File

@@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
pub use data_columns_by_root::{
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
};
pub use payload_envelopes_by_root::{
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
use crate::metrics;
@@ -27,6 +30,7 @@ mod blocks_by_range;
mod blocks_by_root;
mod data_columns_by_range;
mod data_columns_by_root;
mod payload_envelopes_by_root;
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {

View File

@@ -0,0 +1,54 @@
use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest;
use std::sync::Arc;
use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope};
use super::{ActiveRequestItems, LookupVerifyError};
#[derive(Debug, Clone)]
pub struct PayloadEnvelopesByRootSingleRequest {
pub block_root: Hash256,
}
impl PayloadEnvelopesByRootSingleRequest {
pub fn into_request(
self,
fork_context: &ForkContext,
) -> Result<PayloadEnvelopesByRootRequest, String> {
PayloadEnvelopesByRootRequest::new(vec![self.block_root], fork_context)
}
}
pub struct PayloadEnvelopesByRootRequestItems<E: EthSpec> {
request: PayloadEnvelopesByRootSingleRequest,
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> PayloadEnvelopesByRootRequestItems<E> {
pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRootRequestItems<E> {
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
/// Append a response to the single chunk request. We expect exactly one envelope per
/// block root. Returns `true` when the single expected item has been received.
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
let block_root = envelope.message.beacon_block_root;
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
self.items.push(envelope);
// Always returns true, we expect a single envelope per block root
Ok(true)
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}