Gloas lookup sync

Rewrites the single block lookup state machine for Gloas, where block, data
(blobs/columns), and execution payload envelope are independent components
that can arrive and import out of order.

- Three additive-only sub-state-machines for block / data / payload streams.
  Peer sets start empty for data/payload and grow as children arrive — the
  parent lookup's completion requirement can widen over time without
  mutating any state machine.
- `AwaitingParent` becomes a struct carrying the child's `parent_block_hash`
  so the parent can be classified empty/full from the child's bid reference.
- Wires `PayloadEnvelopesByRoot` RPC end-to-end through `SyncNetworkContext`:
  request sending, response routing (`SingleLookupReqId::SinglePayloadEnvelope`),
  and integration into `PayloadRequest`. Envelope *processing* is still a TODO;
  only the download path is wired.
- Test rig: serves envelopes from a `network_envelopes_by_root` cache
  populated from the external harness; bumps test validator count to 8 so
  `proposer_lookahead` can populate at the Fulu → Gloas upgrade.
- Enables gloas in `TEST_NETWORK_FORKS`.
- Fixes: genesis parent check, infinite retry loop on repeated download
  failure, no-op in `on_completed_request`, and peer sets not being cleared
  on disconnect.
This commit is contained in:
dapplion
2026-04-22 00:37:14 +02:00
parent 7731b5f250
commit ebe9fe228a
14 changed files with 1939 additions and 931 deletions

View File

@@ -2,7 +2,10 @@
//! channel and stores a global RPC ID to perform requests.
use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError};
pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest};
pub use self::requests::{
BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest,
PayloadEnvelopesByRootSingleRequest,
};
use super::SyncMessage;
use super::block_sidecar_coupling::RangeBlockComponentsRequest;
use super::manager::BlockProcessType;
@@ -37,6 +40,7 @@ pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
use slot_clock::SlotClock;
@@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -201,6 +205,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRoot requests
payload_envelopes_by_root_requests:
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
ActiveRequests<DataColumnsByRootRequestId, DataColumnsByRootRequestItems<T::EthSpec>>,
@@ -294,6 +301,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
blocks_by_root_requests: ActiveRequests::new("blocks_by_root"),
blobs_by_root_requests: ActiveRequests::new("blobs_by_root"),
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"),
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
@@ -322,6 +330,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
@@ -345,6 +354,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SingleBlob { id: *id });
let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
let data_column_by_root_ids = data_columns_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
@@ -363,6 +376,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
blocks_by_root_ids
.chain(blobs_by_root_ids)
.chain(payload_envelopes_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
@@ -419,6 +433,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: _,
blocks_by_root_requests,
blobs_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
@@ -441,6 +456,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
for peer_id in blocks_by_root_requests
.iter_request_peers()
.chain(blobs_by_root_requests.iter_request_peers())
.chain(payload_envelopes_by_root_requests.iter_request_peers())
.chain(data_columns_by_root_requests.iter_request_peers())
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
@@ -927,6 +943,72 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC.
pub fn payload_lookup_request(
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
rand::random::<u32>(),
peer,
)
})
.min()
.map(|(_, _, peer)| *peer)
else {
return Ok(LookupRequestResult::Pending("no peers"));
};
let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let request = PayloadEnvelopesByRootSingleRequest { block_root };
let network_request = RequestType::PayloadEnvelopesByRoot(
request
.clone()
.into_request(&self.fork_context)
.map_err(RpcRequestSendError::InternalError)?,
);
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: network_request,
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRoot",
?block_root,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
self.payload_envelopes_by_root_requests.insert(
id,
peer_id,
// true = enforce that the peer returns a response. We only request a single envelope
// and the peer must have it.
true,
PayloadEnvelopesByRootRequestItems::new(request),
Span::none(),
);
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
/// - If we have a downloaded but not yet processed block
/// - If the da_checker has a pending block
@@ -1464,6 +1546,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_payload_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
let resp = self
.payload_envelopes_by_root_requests
.on_response(id, rpc_event);
let resp = resp.map(|res| {
res.and_then(|(mut envelopes, seen_timestamp)| {
match envelopes.pop() {
Some(envelope) => Ok((envelope, seen_timestamp)),
// Should never happen, we enforce at least 1 chunk.
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
}
})
});
self.on_rpc_response_result(resp, peer_id)
}
#[allow(clippy::type_complexity)]
pub(crate) fn on_data_columns_by_root_response(
&mut self,