Tighten the three sub-state-machine loops in continue_requests

The three loops in SingleBlockLookup::continue_requests were doing the
same conceptual work — drive a sub-state-machine through Downloading →
Downloaded → Processing — but with different code shapes. Pull the
repeated bits out so the loop bodies show the state-machine structure
without inline variant-matching:

- BlockRequest::peek_block_or_cached(block_root, cx): the "peek the
  in-flight block, otherwise fall back to the AC processing-status
  cache" pattern was duplicated verbatim in the data and payload None
  arms. Both arms now call it. Lives on BlockRequest so the borrow
  checker can split it from `&mut self.{data,payload}_request`.
- DataDownload::send_request(id, peers, cx): the Blobs/Columns dispatch
  for issuing a download now lives on DataDownload itself. Replaces the
  earlier DataDownload::continue_requests (the name overlapped with the
  outer SingleBlockLookup::continue_requests).
- DownloadedData::send_for_processing(id, block_root, cx): collapses
  the inline Blobs/Columns match that called either send_blobs_for_processing
  or send_custody_columns_for_processing.
- Payload Downloading arm now uses state.make_request(...) like block
  and data, matching shape across all three loops. As a side effect
  payload retries are now bounded by SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
  closing the "infinite retry loop on repeated download failure" the
  original PR description flagged.
- Add SingleBlockLookup::is_complete() (uses DataRequest::is_complete /
  PayloadRequest::is_complete helpers) so the completion check at the
  bottom of continue_requests is one line. Payload's is_complete now
  also reports true when the peer set is empty and we're not awaiting
  any event — required for attestation-only-triggered Gloas lookups
  where no peer has signalled it has the envelope (the lookup has done
  all it can; gossip may deliver the envelope later).

Also adds Work::RpcEnvelope to the test rig's beacon-processor mock.
This commit is contained in:
dapplion
2026-05-19 15:28:46 -06:00
parent f6e4438719
commit 64dae1d9da
2 changed files with 114 additions and 119 deletions

View File

@@ -204,6 +204,23 @@ impl<E: EthSpec> BlockRequest<E> {
matches!(self, BlockRequest::Complete { .. })
}
/// Best-effort lookup of the block: prefer the in-flight download if we have it; otherwise
/// fall back to the chain's processing-status cache (the block may have arrived via gossip /
/// HTTP API before this lookup downloads it).
fn peek_block_or_cached<T: BeaconChainTypes<EthSpec = E>>(
&self,
block_root: Hash256,
cx: &mut SyncNetworkContext<T>,
) -> Option<Arc<SignedBeaconBlock<E>>> {
self.peek_block().cloned().or_else(|| {
match cx.chain.get_block_process_status(&block_root) {
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => Some(block),
BlockProcessStatus::Unknown => None,
}
})
}
fn insert_verified_response(
&mut self,
result: DownloadResult<Arc<SignedBeaconBlock<E>>>,
@@ -246,6 +263,12 @@ enum DataRequestState<E: EthSpec> {
Complete,
}
impl<E: EthSpec> DataRequest<E> {
fn is_complete(&self) -> bool {
matches!(self.state, DataRequestState::Complete)
}
}
impl<E: EthSpec> DataRequestState<E> {
fn is_awaiting_event(&self) -> bool {
match &self {
@@ -271,10 +294,10 @@ enum DataDownload<E: EthSpec> {
}
impl<E: EthSpec> DataDownload<E> {
fn continue_requests<T: BeaconChainTypes<EthSpec = E>>(
fn send_request<T: BeaconChainTypes<EthSpec = E>>(
&mut self,
id: Id,
peers: Arc<RwLock<HashSet<PeerId>>>,
peers: PeerSet,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
match self {
@@ -285,16 +308,13 @@ impl<E: EthSpec> DataDownload<E> {
} => {
let br = *block_root;
let eb = *expected_blobs;
state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))?;
state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))
}
DataDownload::Columns {
block_root, state, ..
} => {
DataDownload::Columns { block_root, state } => {
let br = *block_root;
state.make_request(|| cx.custody_lookup_request(id, br, peers))?;
state.make_request(|| cx.custody_lookup_request(id, br, peers))
}
}
Ok(())
}
fn is_completed(&self) -> bool {
@@ -330,6 +350,28 @@ enum DownloadedData<E: EthSpec> {
Columns(DataColumnSidecarList<E>),
}
impl<E: EthSpec> DownloadedData<E> {
fn send_for_processing<T: BeaconChainTypes<EthSpec = E>>(
&self,
id: Id,
block_root: Hash256,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), SendErrorProcessor> {
match self {
DownloadedData::Blobs(blobs) => {
cx.send_blobs_for_processing(id, block_root, blobs.clone(), Duration::ZERO)
}
DownloadedData::Columns(columns) => cx.send_custody_columns_for_processing(
id,
block_root,
columns.clone(),
Duration::ZERO,
BlockProcessType::SingleCustodyColumn(id),
),
}
}
}
// === Payload request: WaitingForBlock → Downloading → Downloaded → Processing → Complete ===
#[derive(Debug)]
@@ -356,6 +398,15 @@ enum PayloadRequestState<E: EthSpec> {
Complete,
}
impl<E: EthSpec> PayloadRequest<E> {
fn is_complete(&self) -> bool {
if !self.state.is_awaiting_event() && self.peers.read().is_empty() {
return true;
}
matches!(self.state, PayloadRequestState::Complete)
}
}
impl<E: EthSpec> PayloadRequestState<E> {
fn is_awaiting_event(&self) -> bool {
match self {
@@ -698,38 +749,26 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
match &mut self.data_request {
// None = waiting for block
None => {
// Prefer a block downloaded by this lookup. Otherwise fall back to the
// chain's processing-status cache: the block may already be in the
// availability checker via gossip/HTTP API before this lookup downloads
// it, and we can still drive the data request in parallel.
let block = self.block_request.peek_block().cloned().or_else(|| {
match cx.chain.get_block_process_status(&block_root) {
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => Some(block),
BlockProcessStatus::Unknown => None,
}
});
if let Some(block) = block {
let peers = self
.get_data_peers::<T::EthSpec>(
block.slot(),
block.execution_hash(),
cx.spec(),
)
.map_err(LookupRequestError::InternalError)?;
self.data_request = Some(DataRequest {
peers,
state: DataRequestState::new(
block.slot(),
self.block_root,
block.num_expected_blobs(),
cx.spec(),
),
});
} else {
// Wait for block to be downloaded
let Some(block) = self.block_request.peek_block_or_cached(block_root, cx)
else {
break;
}
};
let peers = self
.get_data_peers::<T::EthSpec>(
block.slot(),
block.execution_hash(),
cx.spec(),
)
.map_err(LookupRequestError::InternalError)?;
self.data_request = Some(DataRequest {
peers,
state: DataRequestState::new(
block.slot(),
self.block_root,
block.num_expected_blobs(),
cx.spec(),
),
});
}
Some(request) => match &mut request.state {
DataRequestState::Downloading(dl) => {
@@ -737,7 +776,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// inside `ActiveCustodyRequest`, not against `data_peers`. Only gate on
// `data_peers` for post-Gloas, where peer sets are strictly partitioned
// and no fallback pool exists.
dl.continue_requests(id, request.peers.clone(), cx)?;
dl.send_request(id, request.peers.clone(), cx)?;
if dl.is_completed() {
// All data already imported (e.g. received via gossip)
@@ -750,27 +789,8 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
DataRequestState::Downloaded { data, peer_group } => {
match data {
DownloadedData::Blobs(blobs) => {
cx.send_blobs_for_processing(
id,
self.block_root,
blobs.clone(),
Duration::ZERO,
)
.map_err(LookupRequestError::SendFailedProcessor)?;
}
DownloadedData::Columns(columns) => {
cx.send_custody_columns_for_processing(
id,
self.block_root,
columns.clone(),
Duration::ZERO,
BlockProcessType::SingleCustodyColumn(id),
)
.map_err(LookupRequestError::SendFailedProcessor)?;
}
}
data.send_for_processing(id, self.block_root, cx)
.map_err(LookupRequestError::SendFailedProcessor)?;
let peer_group = peer_group.clone();
request.state = DataRequestState::Processing { peer_group };
// Processing needs an async trigger.
@@ -785,51 +805,35 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
loop {
match &mut self.payload_request {
None => {
// Same fallback as the data stream: the block may be in the availability
// checker via gossip before this lookup downloads it.
let block = self.block_request.peek_block().cloned().or_else(|| {
match cx.chain.get_block_process_status(&block_root) {
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => Some(block),
BlockProcessStatus::Unknown => None,
}
});
if let Some(block) = block {
let peers = self
.get_data_peers::<T::EthSpec>(
block.slot(),
block.execution_hash(),
cx.spec(),
)
.map_err(LookupRequestError::InternalError)?;
self.payload_request = Some(PayloadRequest {
peers,
state: PayloadRequestState::new(block.slot(), cx.spec()),
});
} else {
let Some(block) = self.block_request.peek_block_or_cached(block_root, cx)
else {
break;
}
};
let peers = self
.get_data_peers::<T::EthSpec>(
block.slot(),
block.execution_hash(),
cx.spec(),
)
.map_err(LookupRequestError::InternalError)?;
self.payload_request = Some(PayloadRequest {
peers,
state: PayloadRequestState::new(block.slot(), cx.spec()),
});
}
Some(request) => match &mut request.state {
PayloadRequestState::Downloading { state, .. } => {
// This are peers that claim to have imported a block whose parent_hash ==
// this block's execution's hash
match cx.payload_lookup_request(id, request.peers.clone(), block_root) {
Ok(LookupRequestResult::RequestSent(req_id)) => {
state.on_download_start(req_id)?;
}
Ok(LookupRequestResult::NoRequestNeeded(_reason)) => {
// Envelope is already known (e.g. imported by gossip). Skip
// download and mark payload stream complete.
request.state = PayloadRequestState::Complete;
continue;
}
Ok(LookupRequestResult::Pending(reason)) => {
state.update_awaiting_download_status(reason);
}
Err(e) => {
return Err(LookupRequestError::SendFailedNetwork(e));
}
// Peers in `request.peers` are those that have signalled they imported a
// child of this block whose bid's parent_hash matches our execution_hash
// i.e. they are proven to have the envelope. `make_request` is a no-op if
// a request is already in flight, so it's safe to call on every tick.
let peers = request.peers.clone();
state.make_request(|| cx.payload_lookup_request(id, peers, block_root))?;
if state.is_completed() {
// Envelope already known to fork-choice (NoRequestNeeded).
request.state = PayloadRequestState::Complete;
continue;
}
if let Some(result) = state.take_download_result() {
request.state = PayloadRequestState::Downloaded {
@@ -867,20 +871,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// === Check completion ===
if self.block_request.is_complete()
&& matches!(
self.data_request,
Some(DataRequest {
state: DataRequestState::Complete,
..
})
)
&& matches!(
self.payload_request,
Some(PayloadRequest {
state: PayloadRequestState::Complete,
..
})
)
&& self.data_request.as_ref().is_some_and(|r| r.is_complete())
&& self
.payload_request
.as_ref()
.is_some_and(|r| r.is_complete())
{
return Ok(LookupResult::Completed);
}

View File

@@ -434,9 +434,9 @@ impl TestRig {
process_fn.await
}
}
Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => {
process_fn.await
}
Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcEnvelope(process_fn) => process_fn.await,
Work::ChainSegment {
process_fn,
process_id: (chain_id, batch_epoch),