Implement beacon_blocks_by_head (#9237)

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-05-06 20:41:01 -06:00
committed by GitHub
parent 31e5f308c3
commit 7148bfcdd1
14 changed files with 637 additions and 8 deletions

View File

@@ -14,8 +14,8 @@ use beacon_processor::{
};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest,
BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest,
DataColumnsByRootRequest, LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest,
PayloadEnvelopesByRootRequest,
};
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
@@ -699,6 +699,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new work event to process `BlocksByHeadRequest`s from the RPC network.
pub fn send_blocks_by_head_request(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: BlocksByHeadRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.handle_blocks_by_head_request(peer_id, inbound_request_id, request)
.await;
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlocksByHeadRequest(Box::pin(process_fn)),
})
}
/// Create a new work event to process `BlocksByRootRequest`s from the RPC network.
pub fn send_blocks_by_roots_request(
self: &Arc<Self>,

View File

@@ -7,8 +7,8 @@ use beacon_chain::payload_envelope_streamer::EnvelopeRequestSource;
use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped};
use itertools::{Itertools, process_results};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest,
DataColumnsByRootRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
};
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo};
@@ -256,6 +256,266 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}
/// Handle a `BeaconBlocksByHead` request from the peer.
///
/// Walks the parent chain of `request.beacon_root` (inclusive) and emits up to
/// `min(request.count, MAX_REQUEST_BLOCKS_DENEB)` blocks in descending slot order.
/// See consensus-specs PR 5181.
#[instrument(
name = "lh_handle_blocks_by_head_request",
parent = None,
level = "debug",
skip_all,
fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub async fn handle_blocks_by_head_request(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: BlocksByHeadRequest,
) {
let client = self.network_globals.client(&peer_id);
Span::current().record("client", field::display(client.kind));
self.terminate_response_stream(
peer_id,
inbound_request_id,
self.clone()
.handle_blocks_by_head_request_inner(peer_id, inbound_request_id, request)
.await,
Response::BlocksByHead,
);
}
async fn handle_blocks_by_head_request_inner(
self: Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: BlocksByHeadRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
let spec = &self.chain.spec;
// Cap the response at MAX_REQUEST_BLOCKS_DENEB regardless of what the peer asked for,
// matching the spec.
let max_request_blocks = spec.max_request_blocks(types::ForkName::Deneb) as u64;
let cap = request.count.min(max_request_blocks);
let beacon_root = request.beacon_root;
debug!(
%peer_id,
beacon_root = ?beacon_root,
count = request.count,
cap,
"Received BlocksByHead Request"
);
if cap == 0 {
return Ok(());
}
// Walk the parent chain on a blocking thread because `get_blinded_block` hits the store
// synchronously and we may walk up to MAX_REQUEST_BLOCKS_DENEB ancestors.
let network_beacon_processor = self.clone();
let block_roots = self
.executor
.spawn_blocking_handle(
move || network_beacon_processor.get_block_roots_ancestor_of_head(beacon_root, cap),
"get_block_roots_ancestor_of_head",
)
.ok_or((RpcErrorResponse::ServerError, "shutting down"))?
.await
.map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??;
let requested_blocks = block_roots.len();
let log_results = |peer_id, blocks_sent| {
debug!(
%peer_id,
requested = requested_blocks,
returned = blocks_sent,
"BlocksByHead outgoing response processed"
);
};
let mut block_stream = match self.chain.get_blocks(block_roots) {
Ok(block_stream) => block_stream,
Err(e) => {
error!(error = ?e, "Error getting block stream");
return Err((RpcErrorResponse::ServerError, "Iterator error"));
}
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
let mut blocks_sent = 0;
while let Some((root, result)) = block_stream.next().await {
match result.as_ref() {
Ok(Some(block)) => {
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
inbound_request_id,
response: Response::BlocksByHead(Some(block.clone())),
});
}
Ok(None) => {
error!(
%peer_id,
request_root = ?root,
"Block in the chain is not in the store"
);
log_results(peer_id, blocks_sent);
return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
block_root = ?root,
reason = "execution layer not synced",
"Failed to fetch execution payload for blocks by head request"
);
log_results(peer_id, blocks_sent);
return Err((
RpcErrorResponse::ResourceUnavailable,
"Execution layer not synced",
));
}
Err(e) => {
if matches!(
e,
BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error)
if matches!(**boxed_error, execution_layer::Error::EngineError(_))
) {
warn!(
info = "this may occur occasionally when the EE is busy",
block_root = ?root,
error = ?e,
"Error rebuilding payload for peer"
);
} else {
error!(
block_root = ?root,
error = ?e,
"Error fetching block for peer"
);
}
log_results(peer_id, blocks_sent);
return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
}
}
}
log_results(peer_id, blocks_sent);
Ok(())
}
/// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots
/// in descending slot order. Synchronous so it can be run on a blocking thread.
///
/// Two regimes are handled:
/// 1. Above finalization → fork-choice's in-memory proto-array supplies the roots
/// (zero DB reads).
/// 2. At or below finalization → the freezer DB's `BeaconBlockRoots` column (the
/// canonical slot→root index for finalized blocks, populated for
/// `[oldest_block_slot, split.slot)` with skip slots reusing the prior block's
/// root) supplies the roots. The head state is never consulted: its 8192-slot
/// `block_roots` bucket would silently truncate deep walks and is the wrong
/// source of truth for canonical history below finalization.
///
/// Returns `ResourceUnavailable` if `head_root` is not known to the node.
fn get_block_roots_ancestor_of_head(
&self,
head_root: Hash256,
count: u64,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
if count == 0 {
return Ok(vec![]);
}
// 1. Walk ancestors in proto-array (in-memory, zero DB reads). Track the
// deepest slot we collected — that's where the freezer walk picks up.
let mut roots: Vec<Hash256> = Vec::with_capacity(count as usize);
let mut deepest_slot: Option<Slot> = None;
{
let fork_choice = self.chain.canonical_head.fork_choice_read_lock();
for (root, slot) in fork_choice
.proto_array()
.iter_block_roots(&head_root)
.take(count as usize)
{
roots.push(root);
deepest_slot = Some(slot);
}
}
let store = &self.chain.store;
// 2. Fallback: `head_root` is at or below finalization (proto-array doesn't
// track it). Look up its slot in the store, then verify it is the canonical
// block at that slot via the freezer index — a non-canonical hot-DB block at
// slot < split.slot can shadow the finalized chain. If the freezer
// disagrees (or doesn't have that slot), serve just the single block we
// found, satisfying the spec's "MUST return at least one block if you have
// it" clause.
let mut current_slot = if let Some(slot) = deepest_slot {
slot
} else {
let block = self
.chain
.get_blinded_block(&head_root)
.map_err(|e| {
error!(error = ?e, "Error reading blinded block for BlocksByHead beacon_root");
(RpcErrorResponse::ServerError, "Database error")
})?
.ok_or((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"))?;
let block_slot = block.slot();
roots.push(head_root);
match store.get_cold_block_root(block_slot) {
Ok(Some(r)) if r == head_root => {} // canonical, OK to walk back
Ok(_) => return Ok(roots),
Err(e) => {
error!(error = ?e, "Error reading freezer block_root for BlocksByHead");
return Err((RpcErrorResponse::ServerError, "Database error"));
}
}
block_slot
};
if (roots.len() as u64) >= count {
return Ok(roots);
}
// 3. Spillover via the freezer DB's `BeaconBlockRoots` index (the canonical
// slot→root mapping for finalized blocks). Skip slots reuse the prior
// block's root; dedup on insert.
let oldest_block_slot = store.get_oldest_block_slot();
let mut last_root = roots.last().copied();
while (roots.len() as u64) < count && current_slot > oldest_block_slot {
current_slot = match current_slot.as_u64().checked_sub(1) {
Some(s) => Slot::from(s),
None => break,
};
match store.get_cold_block_root(current_slot) {
Ok(Some(root)) => {
if Some(root) != last_root {
roots.push(root);
last_root = Some(root);
}
}
Ok(None) => {
// Hole in the freezer index (e.g. before `oldest_block_slot` on a
// checkpoint-synced node). Stop walking.
break;
}
Err(e) => {
error!(error = ?e, "Error walking freezer block_roots");
return Err((RpcErrorResponse::ServerError, "Database error"));
}
}
}
Ok(roots)
}
/// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer.
#[instrument(
name = "lh_handle_payload_envelopes_by_root_request",

View File

@@ -24,8 +24,8 @@ use itertools::Itertools;
use libp2p::gossipsub::MessageAcceptance;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3,
PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest,
MetaDataV3, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
};
use lighthouse_network::{
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
@@ -501,6 +501,16 @@ impl TestRig {
.unwrap();
}
pub fn enqueue_blocks_by_head_request(&self, beacon_root: Hash256, count: u64) {
self.network_beacon_processor
.send_blocks_by_head_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
BlocksByHeadRequest { beacon_root, count },
)
.unwrap();
}
pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList<BlobIdentifier>) {
self.network_beacon_processor
.send_blobs_by_roots_request(
@@ -2346,3 +2356,153 @@ async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() {
// 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope
// 2. Block imported → envelope released and processed successfully
// 3. Timeout path → envelope released and re-verified
/// Drain `network_rx` collecting `Response::BlocksByHead(Some(_))` block roots until the
/// stream terminator (`None`) arrives. Panics on any other message type so tests fail
/// loudly if an error response sneaks in.
async fn drain_blocks_by_head_response(rig: &mut TestRig) -> Vec<Hash256> {
let mut roots = Vec::new();
while let Some(msg) = rig.network_rx.recv().await {
match msg {
NetworkMessage::SendResponse {
response: Response::BlocksByHead(Some(block)),
..
} => roots.push(block.canonical_root()),
NetworkMessage::SendResponse {
response: Response::BlocksByHead(None),
..
} => return roots,
other => panic!("unexpected message: {:?}", other),
}
}
roots
}
// `BlocksByHead` request that crosses the finalized boundary: proto-array supplies
// the unfinalized head + ancestors down to the finalized root, then the freezer's
// `BeaconBlockRoots` index supplies the rest. Verifies the spillover path
// `get_block_roots_ancestor_of_head` takes when count > proto-array depth.
#[tokio::test]
async fn test_blocks_by_head_spillover_into_freezer() {
// Long enough for finalization + state migration to populate the freezer.
let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await;
// Sanity-check the precondition: finalization advanced past genesis and the split
// slot is non-zero, so the freezer's `BeaconBlockRoots` column has entries.
assert!(
rig.chain
.canonical_head
.cached_head()
.finalized_checkpoint()
.epoch
> Epoch::new(0),
"test precondition: chain must have finalized past epoch 0",
);
assert!(
rig.chain.store.get_split_slot() > Slot::new(0),
"test precondition: state migration must have populated the freezer",
);
let head_slot = rig.chain.canonical_head.cached_head().head_slot();
let head_root = rig.chain.canonical_head.cached_head().head_block_root();
// Walk all the way back to slot 1: exercises both proto-array (above finalization)
// and freezer (at/below finalization).
let count = head_slot.as_u64();
rig.enqueue_blocks_by_head_request(head_root, count);
let actual = drain_blocks_by_head_response(&mut rig).await;
// Build the canonical descending root list independently. The harness has no skip
// slots so every slot in [1, head_slot] has a unique block, but we still dedup
// defensively to mirror the function under test.
let mut expected: Vec<Hash256> = Vec::new();
let mut last: Option<Hash256> = None;
for offset in 0..count {
let slot = Slot::new(head_slot.as_u64() - offset);
if let Some(root) = rig
.chain
.block_root_at_slot(slot, WhenSlotSkipped::Prev)
.unwrap()
&& Some(root) != last
{
expected.push(root);
last = Some(root);
}
}
assert_eq!(
actual, expected,
"BlocksByHead must serve the full canonical parent chain across the finalized boundary",
);
assert_eq!(actual.first(), Some(&head_root), "first root must be head");
}
// `BlocksByHead` with `beacon_root` set to a finalized block root (case-2 fallback in
// `get_block_roots_ancestor_of_head`): proto-array doesn't track it, so we
// `get_blinded_block` for its slot, verify canonicity via the freezer index, and walk
// back from there.
#[tokio::test]
async fn test_blocks_by_head_finalized_root() {
let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await;
let finalized_root = rig
.chain
.canonical_head
.cached_head()
.finalized_checkpoint()
.root;
let finalized_slot = rig
.chain
.get_blinded_block(&finalized_root)
.unwrap()
.expect("finalized block exists in store")
.slot();
assert!(
finalized_slot > Slot::new(0),
"test precondition: finalized block must not be genesis",
);
let count = 8u64.min(finalized_slot.as_u64());
rig.enqueue_blocks_by_head_request(finalized_root, count);
let actual = drain_blocks_by_head_response(&mut rig).await;
let mut expected: Vec<Hash256> = Vec::new();
let mut last: Option<Hash256> = None;
for offset in 0..count {
let slot = Slot::new(finalized_slot.as_u64() - offset);
if let Some(root) = rig
.chain
.block_root_at_slot(slot, WhenSlotSkipped::Prev)
.unwrap()
&& Some(root) != last
{
expected.push(root);
last = Some(root);
}
}
assert_eq!(actual, expected);
assert_eq!(
actual.first(),
Some(&finalized_root),
"first root must be the requested finalized root",
);
}
// `BlocksByHead` for a `beacon_root` we don't have. Spec says we MUST return an error
// (we map this to `ResourceUnavailable`).
#[tokio::test]
async fn test_blocks_by_head_unknown_root() {
let mut rig = TestRig::new(SLOTS_PER_EPOCH).await;
rig.enqueue_blocks_by_head_request(Hash256::repeat_byte(0xab), 4);
match rig.network_rx.recv().await.expect("a network message") {
NetworkMessage::SendErrorResponse { error, .. } => {
assert_matches!(
error,
lighthouse_network::rpc::RpcErrorResponse::ResourceUnavailable
);
}
other => panic!("expected SendErrorResponse, got {:?}", other),
}
}

View File

@@ -243,6 +243,13 @@ impl<T: BeaconChainTypes> Router<T> {
request,
),
),
RequestType::BlocksByHead(request) => self.handle_beacon_processor_send_result(
self.network_beacon_processor.send_blocks_by_head_request(
peer_id,
inbound_request_id,
request,
),
),
RequestType::PayloadEnvelopesByRoot(request) => self
.handle_beacon_processor_send_result(
self.network_beacon_processor
@@ -346,6 +353,11 @@ impl<T: BeaconChainTypes> Router<T> {
Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => {
debug!("Requesting envelopes by root and by range not supported yet");
}
// Lighthouse currently only serves BlocksByHead and does not issue it as a client,
// so receiving a response is unexpected. Drop it without crashing.
Response::BlocksByHead(_) => {
debug!("BlocksByHead response received but not requested by lighthouse");
}
// Light client responses should not be received
Response::LightClientBootstrap(_)
| Response::LightClientOptimisticUpdate(_)