Serve rpc by range and by root:

This commit is contained in:
Eitan Seri-Levi
2026-02-24 00:55:29 -08:00
parent dcc43e3d20
commit ffc2b97699
19 changed files with 1140 additions and 8 deletions

View File

@@ -14,7 +14,8 @@ use beacon_processor::{
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
LightClientUpdatesByRangeRequest,
LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest,
PayloadEnvelopesByRootRequest,
};
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{
@@ -686,6 +687,46 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new work event to process `PayloadEnvelopesByRootRequest`s from the RPC network.
///
/// The request itself is handled asynchronously on the beacon processor; this
/// method only enqueues the work event, returning an error if the processor
/// queue rejects it.
pub fn send_payload_envelopes_by_roots_request(
    self: &Arc<Self>,
    peer_id: PeerId,
    inbound_request_id: InboundRequestId,
    request: PayloadEnvelopesByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
    let processor = self.clone();
    let process_fn = async move {
        processor
            .handle_payload_envelopes_by_root_request(peer_id, inbound_request_id, request)
            .await;
    };
    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work: Work::PayloadEnvelopesByRootRequest(Box::pin(process_fn)),
    })
}
/// Create a new work event to process `PayloadEnvelopesByRangeRequest`s from the RPC network.
pub fn send_payload_envelopes_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
request: PayloadEnvelopesByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.handle_payload_envelopes_by_range_request(peer_id, inbound_request_id, request)
.await;
};
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::PayloadEnvelopesByRangeRequest(Box::pin(process_fn)),
})
}
/// Create a new work event to process `BlobsByRangeRequest`s from the RPC network.
pub fn send_blobs_by_range_request(
self: &Arc<Self>,

View File

@@ -7,6 +7,7 @@ use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenS
use itertools::{Itertools, process_results};
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
};
use lighthouse_network::rpc::*;
use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo};
@@ -254,6 +255,113 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}
/// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer.
#[instrument(
    name = "lh_handle_payload_envelopes_by_root_request",
    parent = None,
    level = "debug",
    skip_all,
    fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub async fn handle_payload_envelopes_by_root_request(
    self: Arc<Self>,
    peer_id: PeerId,
    inbound_request_id: InboundRequestId,
    request: PayloadEnvelopesByRootRequest,
) {
    // Tag the tracing span with the requesting peer's client kind.
    let client = self.network_globals.client(&peer_id);
    Span::current().record("client", field::display(client.kind));

    // Run the inner handler, then close the response stream with its outcome
    // (either a terminator or an RPC error response).
    let outcome = self
        .clone()
        .handle_payload_envelopes_by_root_request_inner(peer_id, inbound_request_id, request)
        .await;
    self.terminate_response_stream(
        peer_id,
        inbound_request_id,
        outcome,
        Response::PayloadEnvelopesByRoot,
    );
}
/// Inner handler for an `ExecutionPayloadEnvelopesByRoot` request from the peer.
///
/// Streams one response per envelope found for the requested roots. Unknown
/// roots are skipped (logged at debug level) rather than treated as errors.
/// Returns an RPC error if the envelope stream cannot be constructed or the
/// execution layer cannot serve a payload because it is not synced.
async fn handle_payload_envelopes_by_root_request_inner(
    self: Arc<Self>,
    peer_id: PeerId,
    inbound_request_id: InboundRequestId,
    request: PayloadEnvelopesByRootRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
    // Shared log helper so both the early-exit and success paths record the
    // requested vs. returned counts.
    let log_results = |peer_id, requested_envelopes, send_envelope_count| {
        debug!(
            %peer_id,
            requested = requested_envelopes,
            returned = %send_envelope_count,
            "ExecutionPayloadEnvelopes outgoing response processed"
        );
    };
    let requested_envelopes = request.beacon_block_roots.len();
    let mut envelope_stream = match self
        .chain
        .get_payload_envelopes_checking_caches(request.beacon_block_roots.to_vec())
    {
        Ok(envelope_stream) => envelope_stream,
        Err(e) => {
            error!( error = ?e, "Error getting payload envelope stream");
            return Err((
                RpcErrorResponse::ServerError,
                "Error getting payload envelope stream",
            ));
        }
    };
    // Fetching payload envelopes is async because it may have to hit the execution layer for payloads.
    let mut send_envelope_count = 0;
    while let Some((root, result)) = envelope_stream.next().await {
        match result.as_ref() {
            Ok(Some(envelope)) => {
                self.send_response(
                    peer_id,
                    inbound_request_id,
                    Response::PayloadEnvelopesByRoot(Some(envelope.clone())),
                );
                send_envelope_count += 1;
            }
            Ok(None) => {
                // A root we don't have an envelope for is not an error for
                // by-root requests: skip it and keep streaming.
                debug!(
                    %peer_id,
                    request_root = ?root,
                    "Peer requested unknown payload envelope"
                );
            }
            Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
                // The execution layer can't serve the payload yet; surface
                // `ResourceUnavailable` rather than a server error.
                debug!(
                    block_root = ?root,
                    reason = "execution layer not synced",
                    "Failed to fetch execution payload for payload envelopes by root request"
                );
                log_results(peer_id, requested_envelopes, send_envelope_count);
                return Err((
                    RpcErrorResponse::ResourceUnavailable,
                    "Execution layer not synced",
                ));
            }
            Err(e) => {
                // Other per-envelope errors are logged and skipped; the stream
                // continues with the remaining roots.
                debug!(
                    ?peer_id,
                    request_root = ?root,
                    error = ?e,
                    "Error fetching payload envelope for peer"
                );
            }
        }
    }
    log_results(peer_id, requested_envelopes, send_envelope_count);
    Ok(())
}
/// Handle a `BlobsByRoot` request from the peer.
#[instrument(
name = "lh_handle_blobs_by_root_request",
@@ -983,6 +1091,182 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.collect::<Vec<_>>())
}
/// Handle a `ExecutionPayloadEnvelopesByRange` request from the peer.
#[instrument(
    name = "lh_handle_payload_envelopes_by_range_request",
    parent = None,
    level = "debug",
    skip_all,
    fields(peer_id = %peer_id, client = tracing::field::Empty)
)]
pub async fn handle_payload_envelopes_by_range_request(
    self: Arc<Self>,
    peer_id: PeerId,
    inbound_request_id: InboundRequestId,
    req: PayloadEnvelopesByRangeRequest,
) {
    // Record the requesting peer's client kind on the tracing span.
    let client = self.network_globals.client(&peer_id);
    Span::current().record("client", field::display(client.kind));

    // Delegate to the inner handler, then terminate the response stream with
    // its outcome.
    let outcome = self
        .clone()
        .handle_payload_envelopes_by_range_request_inner(peer_id, inbound_request_id, req)
        .await;
    self.terminate_response_stream(
        peer_id,
        inbound_request_id,
        outcome,
        Response::PayloadEnvelopesByRange,
    );
}
/// Inner handler for an `ExecutionPayloadEnvelopesByRange` request from the peer.
///
/// Resolves the canonical block roots for the requested slot range, then
/// streams the corresponding payload envelopes back to the peer. Envelopes
/// whose slot falls outside the requested range are filtered out before
/// sending. Returns an RPC error on database inconsistency, fetch failure, or
/// when the execution layer is not synced.
async fn handle_payload_envelopes_by_range_request_inner(
    self: Arc<Self>,
    peer_id: PeerId,
    inbound_request_id: InboundRequestId,
    req: PayloadEnvelopesByRangeRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
    let req_start_slot = req.start_slot;
    let req_count = req.count;
    debug!(
        %peer_id,
        count = req_count,
        start_slot = %req_start_slot,
        "Received ExecutionPayloadEnvelopesByRange Request"
    );
    // Spawn a blocking handle since get_block_roots_for_slot_range takes a sync lock on the
    // fork-choice.
    let network_beacon_processor = self.clone();
    let block_roots = self
        .executor
        .spawn_blocking_handle(
            move || {
                network_beacon_processor.get_block_roots_for_slot_range(
                    req_start_slot,
                    req_count,
                    "ExecutionPayloadEnvelopesByRange",
                )
            },
            "get_block_roots_for_slot_range",
        )
        .ok_or((RpcErrorResponse::ServerError, "shutting down"))?
        .await
        .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??
        .iter()
        .map(|(root, _)| *root)
        .collect::<Vec<_>>();
    let current_slot = self
        .chain
        .slot()
        .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
    // Shared log helper; the "failed to return all" variant helps diagnose
    // incomplete responses on both the error and success paths.
    let log_results = |peer_id, payloads_sent| {
        if payloads_sent < (req_count as usize) {
            debug!(
                %peer_id,
                msg = "Failed to return all requested payload envelopes",
                start_slot = %req_start_slot,
                %current_slot,
                requested = req_count,
                returned = payloads_sent,
                "ExecutionPayloadEnvelopesByRange outgoing response processed"
            );
        } else {
            debug!(
                %peer_id,
                start_slot = %req_start_slot,
                %current_slot,
                requested = req_count,
                returned = payloads_sent,
                "ExecutionPayloadEnvelopesByRange outgoing response processed"
            );
        }
    };
    let mut envelope_stream = match self.chain.get_payload_envelopes(block_roots) {
        Ok(envelope_stream) => envelope_stream,
        Err(e) => {
            error!(error = ?e, "Error getting payload envelope stream");
            return Err((RpcErrorResponse::ServerError, "Iterator error"));
        }
    };
    // Fetching payload envelopes is async because it may have to hit the execution layer for payloads.
    let mut envelopes_sent = 0;
    while let Some((root, result)) = envelope_stream.next().await {
        match result.as_ref() {
            Ok(Some(envelope)) => {
                // Due to skip slots, blocks could be out of the range, we ensure they
                // are in the range before sending.
                // NOTE: use the `req_count` local (bound above) for consistency
                // with the rest of this function.
                if envelope.slot() >= req_start_slot
                    && envelope.slot() < req_start_slot + req_count
                {
                    envelopes_sent += 1;
                    self.send_network_message(NetworkMessage::SendResponse {
                        peer_id,
                        inbound_request_id,
                        response: Response::PayloadEnvelopesByRange(Some(envelope.clone())),
                    });
                }
            }
            Ok(None) => {
                // A canonical root with no stored envelope indicates a store
                // inconsistency; abort with a server error.
                error!(
                    request = ?req,
                    %peer_id,
                    request_root = ?root,
                    "Envelope in the chain is not in the store"
                );
                log_results(peer_id, envelopes_sent);
                return Err((RpcErrorResponse::ServerError, "Database inconsistency"));
            }
            Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
                debug!(
                    block_root = ?root,
                    reason = "execution layer not synced",
                    "Failed to fetch execution payload for envelope by range request"
                );
                log_results(peer_id, envelopes_sent);
                // send the stream terminator
                return Err((
                    RpcErrorResponse::ResourceUnavailable,
                    "Execution layer not synced",
                ));
            }
            Err(e) => {
                // Engine errors during payload reconstruction are expected
                // occasionally and logged at `warn`; everything else is `error`.
                if matches!(
                    e,
                    BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error)
                    if matches!(**boxed_error, execution_layer::Error::EngineError(_))
                ) {
                    warn!(
                        info = "this may occur occasionally when the EE is busy",
                        block_root = ?root,
                        error = ?e,
                        "Error rebuilding payload for peer"
                    );
                } else {
                    error!(
                        block_root = ?root,
                        error = ?e,
                        "Error fetching payload envelope for peer"
                    );
                }
                log_results(peer_id, envelopes_sent);
                // send the stream terminator
                return Err((
                    RpcErrorResponse::ServerError,
                    "Failed fetching payload envelopes",
                ));
            }
        }
    }
    log_results(peer_id, envelopes_sent);
    Ok(())
}
/// Handle a `BlobsByRange` request from the peer.
#[instrument(
name = "lh_handle_blobs_by_range_request",

View File

@@ -19,11 +19,14 @@ use beacon_chain::test_utils::{
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use bls::Signature;
use fixed_bytes::FixedBytesExtended;
use itertools::Itertools;
use libp2p::gossipsub::MessageAcceptance;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3,
PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest,
};
use lighthouse_network::{
Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response,
@@ -41,8 +44,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::{
AttesterSlashing, BlobSidecar, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch,
EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
EthSpec, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, Hash256,
MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
};
use types::{
BlobSidecarList,
@@ -534,6 +538,29 @@ impl TestRig {
.unwrap();
}
/// Enqueue a `PayloadEnvelopesByRange` RPC request from a random peer.
pub fn enqueue_payload_envelopes_by_range_request(&self, start_slot: u64, count: u64) {
    let request = PayloadEnvelopesByRangeRequest { start_slot, count };
    self.network_beacon_processor
        .send_payload_envelopes_by_range_request(
            PeerId::random(),
            InboundRequestId::new_unchecked(42, 24),
            request,
        )
        .unwrap();
}
/// Enqueue a `PayloadEnvelopesByRoot` RPC request from a random peer.
pub fn enqueue_payload_envelopes_by_root_request(
    &self,
    beacon_block_roots: RuntimeVariableList<Hash256>,
) {
    let request = PayloadEnvelopesByRootRequest { beacon_block_roots };
    self.network_beacon_processor
        .send_payload_envelopes_by_roots_request(
            PeerId::random(),
            InboundRequestId::new_unchecked(42, 24),
            request,
        )
        .unwrap();
}
pub fn enqueue_backfill_batch(&self, epoch: Epoch) {
self.network_beacon_processor
.send_chain_segment(
@@ -2102,3 +2129,226 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
unique_roots.len(),
);
}
/// Create a test `SignedExecutionPayloadEnvelope` with the given slot and beacon block root.
fn make_test_payload_envelope(
    slot: Slot,
    beacon_block_root: Hash256,
) -> SignedExecutionPayloadEnvelope<E> {
    // Build the inner message first; all payload fields are defaulted.
    let message = ExecutionPayloadEnvelope {
        payload: ExecutionPayloadGloas::default(),
        execution_requests: ExecutionRequests::default(),
        builder_index: 0,
        beacon_block_root,
        slot,
        state_root: Hash256::zero(),
    };
    SignedExecutionPayloadEnvelope {
        message,
        signature: Signature::empty(),
    }
}
#[tokio::test]
async fn test_payload_envelopes_by_range() {
    // Only test when Gloas fork is scheduled
    if test_spec::<E>().gloas_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;
    let start_slot = 0;
    let slot_count = 32;

    // Manually store a payload envelope for every slot in the range that has a
    // block, counting how many we expect back.
    let mut expected_count = 0;
    for slot in start_slot..slot_count {
        let maybe_root = rig
            .chain
            .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
            .unwrap();
        if let Some(root) = maybe_root {
            let envelope = make_test_payload_envelope(Slot::new(slot), root);
            rig.chain
                .store
                .put_payload_envelope(&root, envelope)
                .unwrap();
            expected_count += 1;
        }
    }

    rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count);

    // Count streamed envelopes until the stream terminator (or an error
    // response) arrives.
    let mut actual_count = 0;
    while let Some(next) = rig.network_rx.recv().await {
        match next {
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRange(Some(_)),
                ..
            } => actual_count += 1,
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRange(None),
                ..
            } => break,
            // Error response terminates the stream
            NetworkMessage::SendErrorResponse { .. } => break,
            other => panic!("unexpected message {:?}", other),
        }
    }
    assert_eq!(expected_count, actual_count);
}
#[tokio::test]
async fn test_payload_envelopes_by_root() {
    // Only test when Gloas fork is scheduled
    if test_spec::<E>().gloas_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;

    // Store a payload envelope for the block at slot 1, then request it by root.
    let block_root = rig
        .chain
        .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
        .unwrap()
        .unwrap();
    let envelope = make_test_payload_envelope(Slot::new(1), block_root);
    rig.chain
        .store
        .put_payload_envelope(&block_root, envelope)
        .unwrap();

    let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap();
    rig.enqueue_payload_envelopes_by_root_request(roots);

    // Exactly one envelope should be streamed before the terminator.
    let mut actual_count = 0;
    while let Some(next) = rig.network_rx.recv().await {
        match next {
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRoot(Some(_)),
                ..
            } => actual_count += 1,
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRoot(None),
                ..
            } => break,
            other => panic!("unexpected message {:?}", other),
        }
    }
    assert_eq!(1, actual_count);
}
#[tokio::test]
async fn test_payload_envelopes_by_root_unknown_root_returns_empty() {
    // Only test when Gloas fork is scheduled
    if test_spec::<E>().gloas_fork_epoch.is_none() {
        return;
    };
    let mut rig = TestRig::new(64).await;

    // Request the envelope for a real block root — but deliberately don't store
    // any envelope, so the handler should stream zero envelopes.
    let block_root = rig
        .chain
        .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
        .unwrap()
        .unwrap();
    let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap();
    rig.enqueue_payload_envelopes_by_root_request(roots);

    let mut actual_count = 0;
    while let Some(next) = rig.network_rx.recv().await {
        match next {
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRoot(Some(_)),
                ..
            } => actual_count += 1,
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRoot(None),
                ..
            } => break,
            other => panic!("unexpected message {:?}", other),
        }
    }
    assert_eq!(0, actual_count);
}
#[tokio::test]
async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() {
    // Only test when Gloas fork is scheduled
    if test_spec::<E>().gloas_fork_epoch.is_none() {
        return;
    };
    // Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
    let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
    let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;
    let start_slot = 0u64;
    let slot_count = 10u64;

    // Store payload envelopes for every slot in the range that has a block
    // (the skip slots have none).
    for slot in start_slot..slot_count {
        let maybe_root = rig
            .chain
            .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
            .unwrap();
        if let Some(root) = maybe_root {
            let envelope = make_test_payload_envelope(Slot::new(slot), root);
            rig.chain
                .store
                .put_payload_envelope(&root, envelope)
                .unwrap();
        }
    }

    rig.enqueue_payload_envelopes_by_range_request(start_slot, slot_count);

    // Collect the beacon block root of every streamed envelope.
    let mut beacon_block_roots: Vec<Hash256> = Vec::new();
    while let Some(next) = rig.network_rx.recv().await {
        match next {
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRange(Some(env)),
                ..
            } => beacon_block_roots.push(env.beacon_block_root()),
            NetworkMessage::SendResponse {
                response: Response::PayloadEnvelopesByRange(None),
                ..
            } => break,
            NetworkMessage::SendErrorResponse { .. } => break,
            other => panic!("unexpected message {:?}", other),
        }
    }
    assert!(
        !beacon_block_roots.is_empty(),
        "Should have received at least some payload envelopes"
    );
    // Skip slots should not cause duplicate envelopes for the same block root
    let unique_roots: HashSet<_> = beacon_block_roots.iter().collect();
    assert_eq!(
        beacon_block_roots.len(),
        unique_roots.len(),
        "Response contained duplicate block roots: got {} envelopes but only {} unique roots",
        beacon_block_roots.len(),
        unique_roots.len(),
    );
}

View File

@@ -229,6 +229,24 @@ impl<T: BeaconChainTypes> Router<T> {
request,
),
),
RequestType::PayloadEnvelopesByRoot(request) => self
.handle_beacon_processor_send_result(
self.network_beacon_processor
.send_payload_envelopes_by_roots_request(
peer_id,
inbound_request_id,
request,
),
),
RequestType::PayloadEnvelopesByRange(request) => self
.handle_beacon_processor_send_result(
self.network_beacon_processor
.send_payload_envelopes_by_range_request(
peer_id,
inbound_request_id,
request,
),
),
RequestType::BlobsByRange(request) => self.handle_beacon_processor_send_result(
self.network_beacon_processor.send_blobs_by_range_request(
peer_id,
@@ -309,6 +327,11 @@ impl<T: BeaconChainTypes> Router<T> {
Response::DataColumnsByRange(data_column) => {
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
}
// TODO(EIP-7732): implement outgoing payload envelopes by range and root
// responses once sync manager requests them.
Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => {
unreachable!()
}
// Light client responses should not be received
Response::LightClientBootstrap(_)
| Response::LightClientOptimisticUpdate(_)