single envelope lookup

This commit is contained in:
Eitan Seri- Levi
2026-03-03 23:11:40 -08:00
parent bf18f8a756
commit c396272c00
5 changed files with 94 additions and 18 deletions

View File

@@ -896,9 +896,9 @@ impl<S: SlotClock> ReprocessQueue<S> {
.ready_work_tx
.try_send(ReadyWork::Envelope(envelope))
.is_err()
{
error!(?block_root, "Failed to send envelope after timeout");
}
{
error!(?block_root, "Failed to send envelope after timeout");
}
}
InboundEvent::ReadyAttestation(queued_id) => {
metrics::inc_counter(

View File

@@ -109,6 +109,7 @@ pub type SingleLookupId = u32;
enum Action {
Retry,
ParentUnknown { parent_root: Hash256 },
ParentEnvelopeUnknown { parent_root: Hash256 },
Drop(/* reason: */ String),
Continue,
}
@@ -559,14 +560,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SinglePayloadEnvelope { id } => {
// TODO(EIP-7732): Wire into lookup state machine once envelope lookups are
// fully integrated. For now, just log the result.
debug!(
%id,
?result,
"Payload envelope processing result (not yet wired to lookups)"
);
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
match result {
BlockProcessingResult::Ok(_) => {
self.continue_envelope_child_lookups(block_root, cx);
}
BlockProcessingResult::Err(e) => {
debug!(%id, error = ?e, "Payload envelope processing failed");
// TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry
}
_ => {}
}
return;
}
};
@@ -655,6 +659,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
request_state.revert_to_awaiting_processing()?;
Action::ParentUnknown { parent_root }
}
BlockError::ParentEnvelopeUnknown { parent_root } => {
// The parent block is known but its execution payload envelope is missing.
// Revert to awaiting processing and fetch the envelope via RPC.
request_state.revert_to_awaiting_processing()?;
Action::ParentEnvelopeUnknown { parent_root }
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
@@ -752,6 +762,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
)))
}
}
Action::ParentEnvelopeUnknown { parent_root } => {
let peers = lookup.all_peers();
lookup.set_awaiting_envelope(parent_root);
// Pick a peer to request the envelope from
let peer_id = peers.first().copied().ok_or_else(|| {
LookupRequestError::Failed("No peers available for envelope request".to_owned())
})?;
match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) {
Ok(_) => {
debug!(
id = lookup_id,
?block_root,
?parent_root,
"Requesting missing parent envelope"
);
Ok(LookupResult::Pending)
}
Err(e) => Err(LookupRequestError::SendFailedNetwork(e)),
}
}
Action::Drop(reason) => {
// Drop with noop
Err(LookupRequestError::Failed(reason))
@@ -819,6 +849,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
/// Makes progress on lookups that were waiting for a parent envelope to be imported.
pub fn continue_envelope_child_lookups(
&mut self,
block_root: Hash256,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![];
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_envelope() == Some(block_root) {
lookup.resolve_awaiting_envelope();
debug!(
envelope_root = ?block_root,
id,
block_root = ?lookup.block_root(),
"Continuing lookup after envelope imported"
);
let result = lookup.continue_requests(cx);
lookup_results.push((*id, result));
}
}
for (id, result) in lookup_results {
self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx);
}
}
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
/// the parent to make progress to resolve, therefore we must drop them if the parent is
/// dropped.

View File

@@ -70,6 +70,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
awaiting_envelope: Option<Hash256>,
created: Instant,
pub(crate) span: Span,
}
@@ -104,6 +105,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
block_root: requested_block_root,
awaiting_parent,
awaiting_envelope: None,
created: Instant::now(),
span: lookup_span,
}
@@ -144,6 +146,20 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.awaiting_parent = None;
}
pub fn awaiting_envelope(&self) -> Option<Hash256> {
self.awaiting_envelope
}
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) {
self.awaiting_envelope = Some(parent_root);
}
/// Mark this lookup as no longer awaiting a parent envelope.
pub fn resolve_awaiting_envelope(&mut self) {
self.awaiting_envelope = None;
}
/// Returns the time elapsed since this lookup was created
pub fn elapsed_since_created(&self) -> Duration {
self.created.elapsed()
@@ -185,6 +201,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.awaiting_envelope.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| match &self.component_requests {
// If components are waiting for the block request to complete, here we should
@@ -287,7 +304,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some();
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
@@ -331,7 +348,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent {
} else if !awaiting_event {
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
if let Some(result) = request.get_state_mut().maybe_start_processing() {

View File

@@ -193,7 +193,7 @@ pub enum BlockProcessType {
SingleBlock { id: Id },
SingleBlob { id: Id },
SingleCustodyColumn(Id),
SinglePayloadEnvelope { id: Id },
SinglePayloadEnvelope { id: Id, block_root: Hash256 },
}
impl BlockProcessType {
@@ -202,7 +202,7 @@ impl BlockProcessType {
BlockProcessType::SingleBlock { id }
| BlockProcessType::SingleBlob { id }
| BlockProcessType::SingleCustodyColumn(id)
| BlockProcessType::SinglePayloadEnvelope { id } => *id,
| BlockProcessType::SinglePayloadEnvelope { id, .. } => *id,
}
}
}
@@ -1256,8 +1256,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
{
match resp {
Ok((envelope, seen_timestamp)) => {
let block_root = envelope.beacon_block_root();
debug!(
block_root = ?envelope.beacon_block_root(),
?block_root,
%id,
"Downloaded payload envelope, sending for processing"
);
@@ -1265,6 +1266,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
id.req_id,
envelope,
seen_timestamp,
block_root,
) {
error!(error = ?e, "Failed to send envelope for processing");
}

View File

@@ -1705,18 +1705,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: Id,
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
seen_timestamp: Duration,
block_root: Hash256,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
let block_root = envelope.beacon_block_root();
debug!(?block_root, ?id, "Sending payload envelope for processing");
beacon_processor
.send_rpc_payload_envelope(
envelope,
seen_timestamp,
BlockProcessType::SinglePayloadEnvelope { id },
BlockProcessType::SinglePayloadEnvelope { id, block_root },
)
.map_err(|e| {
error!(