diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 4be427db29..cbb0f51ab9 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -204,6 +204,23 @@ impl BlockRequest { matches!(self, BlockRequest::Complete { .. }) } + /// Best-effort lookup of the block: prefer the in-flight download if we have it; otherwise + /// fall back to the chain's processing-status cache (the block may have arrived via gossip / + /// HTTP API before this lookup downloads it). + fn peek_block_or_cached>( + &self, + block_root: Hash256, + cx: &mut SyncNetworkContext, + ) -> Option>> { + self.peek_block().cloned().or_else(|| { + match cx.chain.get_block_process_status(&block_root) { + BlockProcessStatus::NotValidated(block, _) + | BlockProcessStatus::ExecutionValidated(block) => Some(block), + BlockProcessStatus::Unknown => None, + } + }) + } + fn insert_verified_response( &mut self, result: DownloadResult>>, @@ -246,6 +263,12 @@ enum DataRequestState { Complete, } +impl DataRequest { + fn is_complete(&self) -> bool { + matches!(self.state, DataRequestState::Complete) + } +} + impl DataRequestState { fn is_awaiting_event(&self) -> bool { match &self { @@ -271,10 +294,10 @@ enum DataDownload { } impl DataDownload { - fn continue_requests>( + fn send_request>( &mut self, id: Id, - peers: Arc>>, + peers: PeerSet, cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { match self { @@ -285,16 +308,13 @@ impl DataDownload { } => { let br = *block_root; let eb = *expected_blobs; - state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))?; + state.make_request(|| cx.blob_lookup_request(id, peers, br, eb)) } - DataDownload::Columns { - block_root, state, .. - } => { + DataDownload::Columns { block_root, state } => { let br = *block_root; - state.make_request(|| cx.custody_lookup_request(id, br, peers))?; + state.make_request(|| cx.custody_lookup_request(id, br, peers)) } } - Ok(()) } fn is_completed(&self) -> bool { @@ -330,6 +350,28 @@ enum DownloadedData { Columns(DataColumnSidecarList), } +impl DownloadedData { + fn send_for_processing>( + &self, + id: Id, + block_root: Hash256, + cx: &mut SyncNetworkContext, + ) -> Result<(), SendErrorProcessor> { + match self { + DownloadedData::Blobs(blobs) => { + cx.send_blobs_for_processing(id, block_root, blobs.clone(), Duration::ZERO) + } + DownloadedData::Columns(columns) => cx.send_custody_columns_for_processing( + id, + block_root, + columns.clone(), + Duration::ZERO, + BlockProcessType::SingleCustodyColumn(id), + ), + } + } +} + // === Payload request: WaitingForBlock → Downloading → Downloaded → Processing → Complete === #[derive(Debug)] @@ -356,6 +398,15 @@ enum PayloadRequestState { Complete, } +impl PayloadRequest { + fn is_complete(&self) -> bool { + if !self.state.is_awaiting_event() && self.peers.read().is_empty() { + return true; + } + matches!(self.state, PayloadRequestState::Complete) + } +} + impl PayloadRequestState { fn is_awaiting_event(&self) -> bool { match self { @@ -698,38 +749,26 @@ impl SingleBlockLookup { match &mut self.data_request { // None = waiting for block None => { - // Prefer a block downloaded by this lookup. Otherwise fall back to the - // chain's processing-status cache: the block may already be in the - // availability checker via gossip/HTTP API before this lookup downloads - // it, and we can still drive the data request in parallel. - let block = self.block_request.peek_block().cloned().or_else(|| { - match cx.chain.get_block_process_status(&block_root) { - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => Some(block), - BlockProcessStatus::Unknown => None, - } - }); - if let Some(block) = block { - let peers = self - .get_data_peers::( - block.slot(), - block.execution_hash(), - cx.spec(), - ) - .map_err(LookupRequestError::InternalError)?; - self.data_request = Some(DataRequest { - peers, - state: DataRequestState::new( - block.slot(), - self.block_root, - block.num_expected_blobs(), - cx.spec(), - ), - }); - } else { - // Wait for block to be downloaded + let Some(block) = self.block_request.peek_block_or_cached(block_root, cx) + else { break; - } + }; + let peers = self + .get_data_peers::( + block.slot(), + block.execution_hash(), + cx.spec(), + ) + .map_err(LookupRequestError::InternalError)?; + self.data_request = Some(DataRequest { + peers, + state: DataRequestState::new( + block.slot(), + self.block_root, + block.num_expected_blobs(), + cx.spec(), + ), + }); } Some(request) => match &mut request.state { DataRequestState::Downloading(dl) => { @@ -737,7 +776,7 @@ impl SingleBlockLookup { // inside `ActiveCustodyRequest`, not against `data_peers`. Only gate on // `data_peers` for post-Gloas, where peer sets are strictly partitioned // and no fallback pool exists. - dl.continue_requests(id, request.peers.clone(), cx)?; + dl.send_request(id, request.peers.clone(), cx)?; if dl.is_completed() { // All data already imported (e.g. received via gossip) @@ -750,27 +789,8 @@ impl SingleBlockLookup { } } DataRequestState::Downloaded { data, peer_group } => { - match data { - DownloadedData::Blobs(blobs) => { - cx.send_blobs_for_processing( - id, - self.block_root, - blobs.clone(), - Duration::ZERO, - ) - .map_err(LookupRequestError::SendFailedProcessor)?; - } - DownloadedData::Columns(columns) => { - cx.send_custody_columns_for_processing( - id, - self.block_root, - columns.clone(), - Duration::ZERO, - BlockProcessType::SingleCustodyColumn(id), - ) - .map_err(LookupRequestError::SendFailedProcessor)?; - } - } + data.send_for_processing(id, self.block_root, cx) + .map_err(LookupRequestError::SendFailedProcessor)?; let peer_group = peer_group.clone(); request.state = DataRequestState::Processing { peer_group }; // Processing needs an async trigger. @@ -785,51 +805,35 @@ impl SingleBlockLookup { loop { match &mut self.payload_request { None => { - // Same fallback as the data stream: the block may be in the availability - // checker via gossip before this lookup downloads it. - let block = self.block_request.peek_block().cloned().or_else(|| { - match cx.chain.get_block_process_status(&block_root) { - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => Some(block), - BlockProcessStatus::Unknown => None, - } - }); - if let Some(block) = block { - let peers = self - .get_data_peers::( - block.slot(), - block.execution_hash(), - cx.spec(), - ) - .map_err(LookupRequestError::InternalError)?; - self.payload_request = Some(PayloadRequest { - peers, - state: PayloadRequestState::new(block.slot(), cx.spec()), - }); - } else { + let Some(block) = self.block_request.peek_block_or_cached(block_root, cx) + else { break; - } + }; + let peers = self + .get_data_peers::( + block.slot(), + block.execution_hash(), + cx.spec(), + ) + .map_err(LookupRequestError::InternalError)?; + self.payload_request = Some(PayloadRequest { + peers, + state: PayloadRequestState::new(block.slot(), cx.spec()), + }); } Some(request) => match &mut request.state { PayloadRequestState::Downloading { state, .. } => { - // This are peers that claim to have imported a block whose parent_hash == - // this block's execution's hash - match cx.payload_lookup_request(id, request.peers.clone(), block_root) { - Ok(LookupRequestResult::RequestSent(req_id)) => { - state.on_download_start(req_id)?; - } - Ok(LookupRequestResult::NoRequestNeeded(_reason)) => { - // Envelope is already known (e.g. imported by gossip). Skip - // download and mark payload stream complete. - request.state = PayloadRequestState::Complete; - continue; - } - Ok(LookupRequestResult::Pending(reason)) => { - state.update_awaiting_download_status(reason); - } - Err(e) => { - return Err(LookupRequestError::SendFailedNetwork(e)); - } + // Peers in `request.peers` are those that have signalled they imported a + // child of this block whose bid's parent_hash matches our execution_hash — + // i.e. they are proven to have the envelope. `make_request` is a no-op if + // a request is already in flight, so it's safe to call on every tick. + let peers = request.peers.clone(); + state.make_request(|| cx.payload_lookup_request(id, peers, block_root))?; + + if state.is_completed() { + // Envelope already known to fork-choice (NoRequestNeeded). + request.state = PayloadRequestState::Complete; + continue; } if let Some(result) = state.take_download_result() { request.state = PayloadRequestState::Downloaded { @@ -867,20 +871,11 @@ impl SingleBlockLookup { // === Check completion === if self.block_request.is_complete() - && matches!( - self.data_request, - Some(DataRequest { - state: DataRequestState::Complete, - .. - }) - ) - && matches!( - self.payload_request, - Some(PayloadRequest { - state: PayloadRequestState::Complete, - .. - }) - ) + && self.data_request.as_ref().is_some_and(|r| r.is_complete()) + && self + .payload_request + .as_ref() + .is_some_and(|r| r.is_complete()) { return Ok(LookupResult::Completed); } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index c16d8969e9..aa8334e4eb 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -434,9 +434,9 @@ impl TestRig { process_fn.await } } - Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => { - process_fn.await - } + Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) + | Work::RpcEnvelope(process_fn) => process_fn.await, Work::ChainSegment { process_fn, process_id: (chain_id, batch_epoch),