When a block comes in whose parent is unkown, queue the block for processing and lookup the parent envelope

This commit is contained in:
Eitan Seri- Levi
2026-03-26 23:40:35 -07:00
parent e1a2cfe202
commit 09e9a54314
14 changed files with 608 additions and 43 deletions

View File

@@ -321,6 +321,13 @@ pub enum BlockError {
bid_parent_root: Hash256,
block_parent_root: Hash256,
},
/// The parent block is known but its execution payload envelope has not been received yet.
///
/// ## Peer scoring
///
/// It's unclear if this block is valid, but it cannot be fully verified without the parent's
/// execution payload envelope.
ParentEnvelopeUnknown { parent_root: Hash256 },
}
/// Which specific signature(s) are invalid in a SignedBeaconBlock
@@ -1939,13 +1946,13 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
&& let Ok(parent_bid_block_hash) = parent_block.payload_bid_block_hash()
{
if block.as_block().is_parent_block_full(parent_bid_block_hash) {
// TODO(gloas): loading the envelope here is not very efficient
// TODO(gloas): check parent payload existence prior to this point?
let envelope = chain.store.get_payload_envelope(&root)?.ok_or_else(|| {
BeaconChainError::DBInconsistent(format!(
"Missing envelope for parent block {root:?}",
))
})?;
// If the parent's execution payload envelope hasn't arrived yet,
// return an unknown parent error so the block gets sent to the
// reprocess queue.
let envelope = chain
.store
.get_payload_envelope(&root)?
.ok_or(BlockError::ParentEnvelopeUnknown { parent_root: root })?;
(StatePayloadStatus::Full, envelope.message.state_root)
} else {
(StatePayloadStatus::Pending, parent_block.state_root())

View File

@@ -416,6 +416,9 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcPayloadEnvelope {
process_fn: AsyncFn,
},
RpcCustodyColumn(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
@@ -477,6 +480,7 @@ pub enum WorkType {
GossipLightClientOptimisticUpdate,
RpcBlock,
RpcBlobs,
RpcPayloadEnvelope,
RpcCustodyColumn,
ColumnReconstruction,
IgnoredRpcBlock,
@@ -538,6 +542,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
@@ -1169,7 +1174,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::GossipLightClientOptimisticUpdate { .. } => work_queues
.lc_gossip_optimistic_update_queue
.push(work, work_id),
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
Work::RpcBlock { .. }
| Work::IgnoredRpcBlock { .. }
| Work::RpcPayloadEnvelope { .. } => {
work_queues.rpc_block_queue.push(work, work_id)
}
Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id),
@@ -1301,7 +1308,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipLightClientOptimisticUpdate => {
work_queues.lc_gossip_optimistic_update_queue.len()
}
WorkType::RpcBlock => work_queues.rpc_block_queue.len(),
WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => {
work_queues.rpc_block_queue.len()
}
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => {
work_queues.rpc_blob_queue.len()
}
@@ -1496,6 +1505,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
}
| Work::RpcBlobs { process_fn }
| Work::RpcPayloadEnvelope { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),

View File

@@ -1,6 +1,10 @@
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope;
use beacon_chain::{
BeaconChain, BeaconChainTypes, NotifyExecutionLayer,
payload_envelope_verification::EnvelopeError,
};
use bytes::Bytes;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::PubsubMessage;
@@ -9,8 +13,11 @@ use ssz::Decode;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
use types::SignedExecutionPayloadEnvelope;
use warp::{Filter, Rejection, Reply, reply::Response};
use types::{BlockImportSource, SignedExecutionPayloadEnvelope};
use warp::{
Filter, Rejection, Reply,
hyper::{Body, Response},
};
// POST beacon/execution_payload_envelope (SSZ)
pub(crate) fn post_beacon_execution_payload_envelope_ssz<T: BeaconChainTypes>(
@@ -77,40 +84,71 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
.boxed()
}
/// Publishes a signed execution payload envelope to the network.
/// TODO(gloas): Add gossip verification (BroadcastValidation::Gossip) before import.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) -> Result<Response, Rejection> {
) -> Result<Response<Body>, Rejection> {
let slot = envelope.message.slot;
let beacon_block_root = envelope.message.beacon_block_root;
let builder_index = envelope.message.builder_index;
// TODO(gloas): Replace this check once we have gossip validation.
if !chain.spec.is_gloas_scheduled() {
return Err(warp_utils::reject::custom_bad_request(
"Execution payload envelopes are not supported before the Gloas fork".into(),
));
}
// TODO(gloas): We should probably add validation here i.e. BroadcastValidation::Gossip
info!(
%slot,
%beacon_block_root,
builder_index = envelope.message.builder_index,
"Publishing signed execution payload envelope to network"
);
let signed_envelope = Arc::new(envelope);
// Publish to the network
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
warp_utils::reject::custom_server_error(
"Unable to publish execution payload envelope to network".into(),
// The publish_fn is called inside process_execution_payload_envelope after consensus
// verification but before the EL call.
let envelope_for_publish = signed_envelope.clone();
let sender = network_tx.clone();
let publish_fn = move || {
info!(
%slot,
%beacon_block_root,
builder_index,
"Publishing signed execution payload envelope to network"
);
crate::utils::publish_pubsub_message(
&sender,
PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())),
)
})?;
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
EnvelopeError::InternalError(
"Unable to publish execution payload envelope to network".to_owned(),
)
})
};
let ctx = chain.gossip_verification_context();
let Ok(gossip_verifed_envelope) = GossipVerifiedEnvelope::new(signed_envelope, &ctx) else {
warn!(%slot, %beacon_block_root, "Execution payload envelope rejected");
return Err(warp_utils::reject::custom_bad_request(
"execution payload envelope rejected, gossip verification".to_string(),
));
};
// Import the envelope locally (runs state transition and notifies the EL).
chain
.process_execution_payload_envelope(
beacon_block_root,
gossip_verifed_envelope,
NotifyExecutionLayer::Yes,
BlockImportSource::HttpApi,
publish_fn,
)
.await
.map_err(|e| {
warn!(%slot, %beacon_block_root, reason = ?e, "Execution payload envelope rejected");
warp_utils::reject::custom_bad_request(format!(
"execution payload envelope rejected: {e:?}"
))
})?;
Ok(warp::reply().into_response())
}

View File

@@ -31,6 +31,8 @@ pub enum SyncRequestId {
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
/// Request searching for an execution payload envelope given a block root.
SinglePayloadEnvelope { id: SingleLookupReqId },
}
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.

View File

@@ -1290,6 +1290,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
return None;
}
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
debug!(
?block_root,
?parent_root,
"Parent envelope not yet available for gossip block"
);
self.send_sync_message(SyncMessage::UnknownParentEnvelope(
peer_id, block, block_root,
));
return None;
}
Err(e @ BlockError::BeaconChainError(_)) => {
debug!(
error = ?e,
@@ -1578,6 +1589,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Block with unknown parent attempted to be processed"
);
}
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
debug!(
%block_root,
?parent_root,
"Parent envelope not yet available, need envelope lookup"
);
// Unlike ParentUnknown, this can legitimately happen during processing
// because the parent envelope may not have arrived yet. The lookup
// system will handle retrying via Action::ParentEnvelopeUnknown.
}
Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
debug!(
error = %e,

View File

@@ -541,6 +541,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}
/// Create a new `Work` event for an RPC payload envelope.
pub fn send_rpc_payload_envelope(
self: &Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> {
let process_fn =
self.clone()
.generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type);
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcPayloadEnvelope { process_fn },
})
}
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
pub fn send_rpc_blobs(

View File

@@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult;
use crate::sync::manager::CustodyBatchProcessResult;
use crate::sync::{
ChainId,
manager::{BlockProcessType, SyncMessage},
manager::{BlockProcessType, BlockProcessingResult, SyncMessage},
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
@@ -28,7 +28,9 @@ use store::KzgCommitment;
use tracing::{debug, debug_span, error, info, instrument, warn};
use types::data::FixedBlobSidecarList;
use types::kzg_ext::format_kzg_commitments;
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
use types::{
BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope,
};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -73,6 +75,80 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Box::pin(process_fn)
}
/// Returns an async closure which processes a payload envelope received via RPC.
pub fn generate_rpc_envelope_process_fn(
self: Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.process_rpc_envelope(envelope, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
}
/// Process an execution payload envelope received via RPC.
async fn process_rpc_envelope(
self: Arc<Self>,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
_seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let beacon_block_root = envelope.beacon_block_root();
// Verify the envelope using the gossip verification path (same checks apply to RPC)
let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await {
Ok(verified) => verified,
Err(e) => {
debug!(
error = ?e,
?beacon_block_root,
"RPC payload envelope failed verification"
);
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: BlockProcessingResult::Err(BlockError::InternalError(format!(
"Envelope verification failed: {e:?}"
))),
});
return;
}
};
// Process the verified envelope
let result = self
.chain
.process_execution_payload_envelope(
beacon_block_root,
verified_envelope,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await;
let processing_result = match result {
Ok(status) => BlockProcessingResult::Ok(status),
Err(e) => {
debug!(
error = ?e,
?beacon_block_root,
"RPC payload envelope processing failed"
);
BlockProcessingResult::Err(BlockError::InternalError(format!(
"Envelope processing failed: {e:?}"
)))
}
};
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: processing_result,
});
}
/// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block.
pub fn generate_lookup_beacon_block_fns(
self: Arc<Self>,

View File

@@ -24,7 +24,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock,
SignedExecutionPayloadEnvelope,
};
/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
@@ -327,10 +330,13 @@ impl<T: BeaconChainTypes> Router<T> {
Response::DataColumnsByRange(data_column) => {
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
}
// TODO(EIP-7732): implement outgoing payload envelopes by range and root
// responses once sync manager requests them.
Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => {
debug!("Requesting envelopes by root and by range not supported yet");
Response::PayloadEnvelopesByRoot(envelope) => {
self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope);
}
// TODO(EIP-7732): implement outgoing payload envelopes by range responses once
// range sync requests them.
Response::PayloadEnvelopesByRange(_) => {
unreachable!()
}
// Light client responses should not be received
Response::LightClientBootstrap(_)
@@ -703,6 +709,40 @@ impl<T: BeaconChainTypes> Router<T> {
});
}
/// Handle a `PayloadEnvelopesByRoot` response from the peer.
pub fn on_payload_envelopes_by_root_response(
&mut self,
peer_id: PeerId,
app_request_id: AppRequestId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
let sync_request_id = match app_request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::SinglePayloadEnvelope { .. } => id,
other => {
crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request");
return;
}
},
AppRequestId::Router => {
crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync");
return;
}
AppRequestId::Internal => unreachable!("Handled internally"),
};
trace!(
%peer_id,
"Received PayloadEnvelopesByRoot Response"
);
self.send_to_sync(SyncMessage::RpcPayloadEnvelope {
peer_id,
sync_request_id,
envelope,
seen_timestamp: timestamp_now(),
});
}
/// Handle a `BlobsByRoot` response from the peer.
pub fn on_blobs_by_root_response(
&mut self,

View File

@@ -109,6 +109,7 @@ pub type SingleLookupId = u32;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256 },
ParentEnvelopeUnknown { parent_root: Hash256 },
Drop(/* reason: */ String),
Continue,
}
@@ -559,6 +560,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
match result {
BlockProcessingResult::Ok(_) => {
self.continue_envelope_child_lookups(block_root, cx);
}
BlockProcessingResult::Err(e) => {
debug!(%id, error = ?e, "Payload envelope processing failed");
// TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry
}
_ => {}
}
return;
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}
@@ -645,6 +659,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
BlockError::ParentEnvelopeUnknown { parent_root } => {
// The parent block is known but its execution payload envelope is missing.
// Revert to awaiting processing and fetch the envelope via RPC.
request_state.revert_to_awaiting_processing()?;
Action::ParentEnvelopeUnknown { parent_root }
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
@@ -742,6 +762,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)))
}
}
Action::ParentEnvelopeUnknown { parent_root } => {
let peers = lookup.all_peers();
lookup.set_awaiting_envelope(parent_root);
// Pick a peer to request the envelope from
let peer_id = peers.first().copied().ok_or_else(|| {
LookupRequestError::Failed("No peers available for envelope request".to_owned())
})?;
match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) {
Ok(_) => {
debug!(
id = lookup_id,
?block_root,
?parent_root,
"Requesting missing parent envelope"
);
Ok(LookupResult::Pending)
}
Err(e) => Err(LookupRequestError::SendFailedNetwork(e)),
}
}
Action::Drop(reason) => {
// Drop with noop
Err(LookupRequestError::Failed(reason))
@@ -809,6 +849,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// Makes progress on lookups that were waiting for a parent envelope to be imported.
pub fn continue_envelope_child_lookups(
&mut self,
block_root: Hash256,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![];
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_envelope() == Some(block_root) {
lookup.resolve_awaiting_envelope();
debug!(
envelope_root = ?block_root,
id,
block_root = ?lookup.block_root(),
"Continuing lookup after envelope imported"
);
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
}
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx);
}
}
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
/// the parent to make progress to resolve, therefore we must drop them if the parent is
/// dropped.

View File

@@ -70,6 +70,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
awaiting_envelope: Option<Hash256>,
created: Instant,
pub(crate) span: Span,
}
@@ -104,6 +105,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
block_root: requested_block_root,
awaiting_parent,
awaiting_envelope: None,
created: Instant::now(),
span: lookup_span,
}
@@ -144,6 +146,20 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.awaiting_parent = None;
}
pub fn awaiting_envelope(&self) -> Option<Hash256> {
self.awaiting_envelope
}
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) {
self.awaiting_envelope = Some(parent_root);
}
/// Mark this lookup as no longer awaiting a parent envelope.
pub fn resolve_awaiting_envelope(&mut self) {
self.awaiting_envelope = None;
}
/// Returns the time elapsed since this lookup was created
pub fn elapsed_since_created(&self) -> Duration {
self.created.elapsed()
@@ -185,6 +201,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.awaiting_envelope.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| match &self.component_requests {
// If components are waiting for the block request to complete, here we should
@@ -287,7 +304,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some();
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
@@ -331,7 +348,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent {
} else if !awaiting_event {
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {

View File

@@ -74,7 +74,8 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -133,6 +134,14 @@ pub enum SyncMessage<E: EthSpec> {
seen_timestamp: Duration,
},
/// An execution payload envelope has been received from the RPC.
RpcPayloadEnvelope {
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
seen_timestamp: Duration,
},
/// A block with an unknown parent has been received.
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
@@ -142,6 +151,9 @@ pub enum SyncMessage<E: EthSpec> {
/// A data column with an unknown parent has been received.
UnknownParentDataColumn(PeerId, Arc<DataColumnSidecar<E>>),
/// A block's parent is known but its execution payload envelope has not been received yet.
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
/// A peer has sent an attestation that references a block that is unknown. This triggers the
/// manager to attempt to find the block matching the unknown hash.
UnknownBlockHashFromAttestation(PeerId, Hash256),
@@ -184,6 +196,7 @@ pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
SingleCustodyColumn(Id),
SinglePayloadEnvelope { id: Id, block_root: Hash256 },
}
impl BlockProcessType {
@@ -191,7 +204,8 @@ impl BlockProcessType {
match self {
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id) => *id,
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope { id, .. } => *id,
}
}
}
@@ -505,6 +519,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::DataColumnsByRange(req_id) => {
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::SinglePayloadEnvelope { id } => {
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
}
}
}
@@ -839,6 +856,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} => {
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
}
SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope,
seen_timestamp,
} => self.rpc_payload_envelope_received(
sync_request_id,
peer_id,
envelope,
seen_timestamp,
),
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
@@ -900,6 +928,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
}
SyncMessage::UnknownParentEnvelope(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
debug!(
%block_root,
%parent_root,
"Parent envelope not yet available, creating lookup"
);
self.handle_unknown_parent(
peer_id,
block_root,
parent_root,
block_slot,
BlockComponent::Block(DownloadResult {
value: block.block_cloned(),
block_root,
seen_timestamp: timestamp_now(),
peer_group: PeerGroup::from_single(peer_id),
}),
);
}
SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => {
if !self.notified_unknown_roots.contains(&(peer_id, block_root)) {
self.notified_unknown_roots.insert((peer_id, block_root));
@@ -1200,6 +1249,59 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
fn rpc_payload_envelope_received(
&mut self,
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
seen_timestamp: Duration,
) {
match sync_request_id {
SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response(
id,
peer_id,
RpcEvent::from_chunk(envelope, seen_timestamp),
),
_ => {
crit!(%peer_id, "bad request id for payload envelope");
}
}
}
fn on_single_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(resp) = self
.network
.on_single_envelope_response(id, peer_id, rpc_event)
{
match resp {
Ok((envelope, seen_timestamp)) => {
let block_root = envelope.beacon_block_root();
debug!(
?block_root,
%id,
"Downloaded payload envelope, sending for processing"
);
if let Err(e) = self.network.send_envelope_for_processing(
id.req_id,
envelope,
seen_timestamp,
block_root,
) {
error!(error = ?e, "Failed to send envelope for processing");
}
}
Err(e) => {
debug!(error = ?e, %id, "Payload envelope download failed");
}
}
}
}
fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,

View File

@@ -37,6 +37,7 @@ pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
#[cfg(test)]
use slot_clock::SlotClock;
@@ -52,7 +53,7 @@ use tracing::{Span, debug, debug_span, error, warn};
use types::data::FixedBlobSidecarList;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -213,6 +214,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active DataColumnsByRange requests
data_columns_by_range_requests:
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
/// A mapping of active PayloadEnvelopesByRoot requests
payload_envelopes_by_root_requests:
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
/// Mapping of active custody column requests for a block root
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
@@ -298,6 +302,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
custody_by_root_requests: <_>::default(),
components_by_range_requests: FnvHashMap::default(),
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
@@ -326,6 +331,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_root_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -361,12 +367,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.active_requests_of_peer(peer_id)
.into_iter()
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
let envelope_by_root_ids = payload_envelopes_by_root_requests
.active_requests_of_peer(peer_id)
.into_iter()
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
blocks_by_root_ids
.chain(blobs_by_root_ids)
.chain(data_column_by_root_ids)
.chain(blocks_by_range_ids)
.chain(blobs_by_range_ids)
.chain(data_column_by_range_ids)
.chain(envelope_by_root_ids)
.collect()
}
@@ -423,6 +434,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_root_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
@@ -445,6 +457,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_root_requests.iter_request_peers())
{
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
}
@@ -927,6 +940,57 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(id.req_id))
}
/// Request a payload envelope for `block_root` from a peer.
pub fn envelope_lookup_request(
&mut self,
lookup_id: SingleLookupId,
peer_id: PeerId,
block_root: Hash256,
) -> Result<Id, RpcRequestSendError> {
let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let request = PayloadEnvelopesByRootSingleRequest(block_root);
let network_request = RequestType::PayloadEnvelopesByRoot(
request
.into_request(&self.fork_context)
.map_err(RpcRequestSendError::InternalError)?,
);
self.network_send
.send(NetworkMessage::SendRequest {
peer_id,
request: network_request,
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
})
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
debug!(
method = "PayloadEnvelopesByRoot",
?block_root,
peer = %peer_id,
%id,
"Sync RPC request sent"
);
let request_span = debug_span!(
parent: Span::current(),
"lh_outgoing_envelope_by_root_request",
%block_root,
);
self.payload_envelopes_by_root_requests.insert(
id,
peer_id,
true,
PayloadEnvelopesByRootRequestItems::new(request),
request_span,
);
Ok(id.req_id)
}
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
/// - If we have a downloaded but not yet processed block
/// - If the da_checker has a pending block
@@ -1435,6 +1499,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_envelope_response(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
let resp = self
.payload_envelopes_by_root_requests
.on_response(id, rpc_event);
let resp = resp.map(|res| {
res.and_then(|(mut envelopes, seen_timestamp)| {
match envelopes.pop() {
Some(envelope) => Ok((envelope, seen_timestamp)),
// Should never happen, request items enforces at least 1 chunk.
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
}
})
});
self.on_rpc_response_result(resp, peer_id)
}
pub(crate) fn on_single_blob_response(
&mut self,
id: SingleLookupReqId,
@@ -1610,6 +1695,33 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
pub fn send_envelope_for_processing(
&self,
id: Id,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
block_root: Hash256,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
debug!(?block_root, ?id, "Sending payload envelope for processing");
beacon_processor
.send_rpc_payload_envelope(
envelope,
seen_timestamp,
BlockProcessType::SinglePayloadEnvelope { id, block_root },
)
.map_err(|e| {
error!(
error = ?e,
"Failed to send sync envelope to processor"
);
SendErrorProcessor::SendError
})
}
pub fn send_blobs_for_processing(
&self,
id: Id,

View File

@@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
pub use data_columns_by_root::{
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
};
pub use payload_envelopes_by_root::{
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
};
use crate::metrics;
@@ -27,6 +30,7 @@ mod blocks_by_range;
mod blocks_by_root;
mod data_columns_by_range;
mod data_columns_by_root;
mod payload_envelopes_by_root;
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {

View File

@@ -0,0 +1,53 @@
use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest;
use std::sync::Arc;
use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope};
use super::{ActiveRequestItems, LookupVerifyError};
#[derive(Debug, Copy, Clone)]
pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256);
impl PayloadEnvelopesByRootSingleRequest {
pub fn into_request(
self,
fork_context: &ForkContext,
) -> Result<PayloadEnvelopesByRootRequest, String> {
PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context)
}
}
pub struct PayloadEnvelopesByRootRequestItems<E: EthSpec> {
request: PayloadEnvelopesByRootSingleRequest,
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
}
impl<E: EthSpec> PayloadEnvelopesByRootRequestItems<E> {
pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self {
Self {
request,
items: vec![],
}
}
}
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRootRequestItems<E> {
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
/// Append a response to the single chunk request. If the chunk is valid, the request is
/// resolved immediately.
/// The active request SHOULD be dropped after `add_response` returns an error
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
let beacon_block_root = envelope.beacon_block_root();
if self.request.0 != beacon_block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root));
}
self.items.push(envelope);
// Always returns true, payload envelopes by root expects a single response
Ok(true)
}
fn consume(&mut self) -> Vec<Self::Item> {
std::mem::take(&mut self.items)
}
}