mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-29 20:27:14 +00:00
Compare commits
18 Commits
fc-complia
...
pr-9039-te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7000fc0d1 | ||
|
|
51e295229b | ||
|
|
269e474f49 | ||
|
|
ca59cf453e | ||
|
|
755b8d8510 | ||
|
|
20f0c7bf4b | ||
|
|
34e5f89537 | ||
|
|
3112792435 | ||
|
|
e7dd95131d | ||
|
|
b333841229 | ||
|
|
f897215684 | ||
|
|
214e3ce9f0 | ||
|
|
1cd4d57204 | ||
|
|
3523804515 | ||
|
|
86ddd0d88d | ||
|
|
93cfa0ffdb | ||
|
|
1eefef610e | ||
|
|
09e9a54314 |
@@ -60,6 +60,7 @@ use crate::execution_payload::{
|
|||||||
};
|
};
|
||||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||||
use crate::observed_block_producers::SeenBlock;
|
use crate::observed_block_producers::SeenBlock;
|
||||||
|
use crate::payload_envelope_verification::EnvelopeError;
|
||||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -320,6 +321,18 @@ pub enum BlockError {
|
|||||||
bid_parent_root: Hash256,
|
bid_parent_root: Hash256,
|
||||||
block_parent_root: Hash256,
|
block_parent_root: Hash256,
|
||||||
},
|
},
|
||||||
|
/// The block is known but its parent execution payload envelope has not been received yet.
|
||||||
|
///
|
||||||
|
/// ## Peer scoring
|
||||||
|
///
|
||||||
|
/// It's unclear if this block is valid, but it cannot be fully verified without the parent's
|
||||||
|
/// execution payload envelope.
|
||||||
|
ParentEnvelopeUnknown { parent_root: Hash256 },
|
||||||
|
|
||||||
|
PayloadEnvelopeError {
|
||||||
|
e: Box<EnvelopeError>,
|
||||||
|
penalize_peer: bool,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Which specific signature(s) are invalid in a SignedBeaconBlock
|
/// Which specific signature(s) are invalid in a SignedBeaconBlock
|
||||||
@@ -486,6 +499,36 @@ impl From<ArithError> for BlockError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<EnvelopeError> for BlockError {
|
||||||
|
fn from(e: EnvelopeError) -> Self {
|
||||||
|
let penalize_peer = match &e {
|
||||||
|
// REJECT per spec: peer sent invalid envelope data
|
||||||
|
EnvelopeError::BadSignature
|
||||||
|
| EnvelopeError::BuilderIndexMismatch { .. }
|
||||||
|
| EnvelopeError::BlockHashMismatch { .. }
|
||||||
|
| EnvelopeError::SlotMismatch { .. }
|
||||||
|
| EnvelopeError::IncorrectBlockProposer { .. } => true,
|
||||||
|
// IGNORE per spec: not the peer's fault
|
||||||
|
EnvelopeError::BlockRootUnknown { .. }
|
||||||
|
| EnvelopeError::PriorToFinalization { .. }
|
||||||
|
| EnvelopeError::UnknownValidator { .. } => false,
|
||||||
|
// Internal errors: not the peer's fault
|
||||||
|
EnvelopeError::BeaconChainError(_)
|
||||||
|
| EnvelopeError::BeaconStateError(_)
|
||||||
|
| EnvelopeError::BlockProcessingError(_)
|
||||||
|
| EnvelopeError::EnvelopeProcessingError(_)
|
||||||
|
| EnvelopeError::ExecutionPayloadError(_)
|
||||||
|
| EnvelopeError::BlockError(_)
|
||||||
|
| EnvelopeError::InternalError(_)
|
||||||
|
| EnvelopeError::OptimisticSyncNotSupported { .. } => false,
|
||||||
|
};
|
||||||
|
BlockError::PayloadEnvelopeError {
|
||||||
|
e: Box::new(e),
|
||||||
|
penalize_peer,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Stores information about verifying a payload against an execution engine.
|
/// Stores information about verifying a payload against an execution engine.
|
||||||
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
|
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
|
||||||
pub struct PayloadVerificationOutcome {
|
pub struct PayloadVerificationOutcome {
|
||||||
@@ -897,12 +940,26 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(gloas) The following validation can only be completed once fork choice has been implemented:
|
// Check that we've received the parent envelope. If not, issue a single envelope
|
||||||
// The block's parent execution payload (defined by bid.parent_block_hash) has been seen
|
// lookup for the parent and queue this block in the reprocess queue.
|
||||||
// (via gossip or non-gossip sources) (a client MAY queue blocks for processing
|
//
|
||||||
// once the parent payload is retrieved). If execution_payload verification of block's execution
|
// The anchor block (proto-array root) is implicitly considered to have its payload
|
||||||
// payload parent by an execution node is complete, verify the block's execution payload
|
// received: there is no envelope to fetch for the anchor (per spec, the anchor is
|
||||||
// parent (defined by bid.parent_block_hash) passes all validation.
|
// never added to `store.payloads`), and the anchor is trusted by definition.
|
||||||
|
let parent_is_gloas = chain
|
||||||
|
.spec
|
||||||
|
.fork_name_at_slot::<T::EthSpec>(parent_block.slot)
|
||||||
|
.gloas_enabled();
|
||||||
|
let parent_is_anchor = parent_block.parent_root.is_none();
|
||||||
|
|
||||||
|
if parent_is_gloas
|
||||||
|
&& !parent_is_anchor
|
||||||
|
&& !fork_choice_read_lock.is_payload_received(&block.message().parent_root())
|
||||||
|
{
|
||||||
|
return Err(BlockError::ParentEnvelopeUnknown {
|
||||||
|
parent_root: block.message().parent_root(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
drop(fork_choice_read_lock);
|
drop(fork_choice_read_lock);
|
||||||
|
|
||||||
@@ -1951,7 +2008,6 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
|
|||||||
// Retrieve any state that is advanced through to at most `block.slot()`: this is
|
// Retrieve any state that is advanced through to at most `block.slot()`: this is
|
||||||
// particularly important if `block` descends from the finalized/split block, but at a slot
|
// particularly important if `block` descends from the finalized/split block, but at a slot
|
||||||
// prior to the finalized slot (which is invalid and inaccessible in our DB schema).
|
// prior to the finalized slot (which is invalid and inaccessible in our DB schema).
|
||||||
//
|
|
||||||
let (parent_state_root, state) = chain
|
let (parent_state_root, state) = chain
|
||||||
.store
|
.store
|
||||||
.get_advanced_hot_state(root, block.slot(), parent_block.state_root())?
|
.get_advanced_hot_state(root, block.slot(), parent_block.state_root())?
|
||||||
|
|||||||
@@ -417,6 +417,9 @@ pub enum Work<E: EthSpec> {
|
|||||||
RpcBlobs {
|
RpcBlobs {
|
||||||
process_fn: AsyncFn,
|
process_fn: AsyncFn,
|
||||||
},
|
},
|
||||||
|
RpcPayloadEnvelope {
|
||||||
|
process_fn: AsyncFn,
|
||||||
|
},
|
||||||
RpcCustodyColumn(AsyncFn),
|
RpcCustodyColumn(AsyncFn),
|
||||||
ColumnReconstruction(AsyncFn),
|
ColumnReconstruction(AsyncFn),
|
||||||
IgnoredRpcBlock {
|
IgnoredRpcBlock {
|
||||||
@@ -483,6 +486,7 @@ pub enum WorkType {
|
|||||||
GossipLightClientOptimisticUpdate,
|
GossipLightClientOptimisticUpdate,
|
||||||
RpcBlock,
|
RpcBlock,
|
||||||
RpcBlobs,
|
RpcBlobs,
|
||||||
|
RpcPayloadEnvelope,
|
||||||
RpcCustodyColumn,
|
RpcCustodyColumn,
|
||||||
ColumnReconstruction,
|
ColumnReconstruction,
|
||||||
IgnoredRpcBlock,
|
IgnoredRpcBlock,
|
||||||
@@ -545,6 +549,7 @@ impl<E: EthSpec> Work<E> {
|
|||||||
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
|
Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences,
|
||||||
Work::RpcBlock { .. } => WorkType::RpcBlock,
|
Work::RpcBlock { .. } => WorkType::RpcBlock,
|
||||||
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
|
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
|
||||||
|
Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope,
|
||||||
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
|
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
|
||||||
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
|
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
|
||||||
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
|
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
|
||||||
@@ -1183,7 +1188,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
Work::GossipLightClientOptimisticUpdate { .. } => work_queues
|
Work::GossipLightClientOptimisticUpdate { .. } => work_queues
|
||||||
.lc_gossip_optimistic_update_queue
|
.lc_gossip_optimistic_update_queue
|
||||||
.push(work, work_id),
|
.push(work, work_id),
|
||||||
Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
|
Work::RpcBlock { .. }
|
||||||
|
| Work::IgnoredRpcBlock { .. }
|
||||||
|
| Work::RpcPayloadEnvelope { .. } => {
|
||||||
work_queues.rpc_block_queue.push(work, work_id)
|
work_queues.rpc_block_queue.push(work, work_id)
|
||||||
}
|
}
|
||||||
Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id),
|
Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id),
|
||||||
@@ -1318,7 +1325,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
WorkType::GossipLightClientOptimisticUpdate => {
|
WorkType::GossipLightClientOptimisticUpdate => {
|
||||||
work_queues.lc_gossip_optimistic_update_queue.len()
|
work_queues.lc_gossip_optimistic_update_queue.len()
|
||||||
}
|
}
|
||||||
WorkType::RpcBlock => work_queues.rpc_block_queue.len(),
|
WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => {
|
||||||
|
work_queues.rpc_block_queue.len()
|
||||||
|
}
|
||||||
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => {
|
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => {
|
||||||
work_queues.rpc_blob_queue.len()
|
work_queues.rpc_blob_queue.len()
|
||||||
}
|
}
|
||||||
@@ -1513,6 +1522,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
|
|||||||
beacon_block_root: _,
|
beacon_block_root: _,
|
||||||
}
|
}
|
||||||
| Work::RpcBlobs { process_fn }
|
| Work::RpcBlobs { process_fn }
|
||||||
|
| Work::RpcPayloadEnvelope { process_fn }
|
||||||
| Work::RpcCustodyColumn(process_fn)
|
| Work::RpcCustodyColumn(process_fn)
|
||||||
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
|
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
|
||||||
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
|
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
|
||||||
|
|||||||
@@ -5,7 +5,11 @@ use crate::version::{
|
|||||||
ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header,
|
ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header,
|
||||||
execution_optimistic_finalized_beacon_response,
|
execution_optimistic_finalized_beacon_response,
|
||||||
};
|
};
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope;
|
||||||
|
use beacon_chain::{
|
||||||
|
BeaconChain, BeaconChainTypes, NotifyExecutionLayer,
|
||||||
|
payload_envelope_verification::EnvelopeError,
|
||||||
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use eth2::types as api_types;
|
use eth2::types as api_types;
|
||||||
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
||||||
@@ -15,7 +19,7 @@ use ssz::{Decode, Encode};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
use types::SignedExecutionPayloadEnvelope;
|
use types::{BlockImportSource, SignedExecutionPayloadEnvelope};
|
||||||
use warp::{
|
use warp::{
|
||||||
Filter, Rejection, Reply,
|
Filter, Rejection, Reply,
|
||||||
hyper::{Body, Response},
|
hyper::{Body, Response},
|
||||||
@@ -93,33 +97,66 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
|||||||
) -> Result<Response<Body>, Rejection> {
|
) -> Result<Response<Body>, Rejection> {
|
||||||
let slot = envelope.slot();
|
let slot = envelope.slot();
|
||||||
let beacon_block_root = envelope.message.beacon_block_root;
|
let beacon_block_root = envelope.message.beacon_block_root;
|
||||||
|
let builder_index = envelope.message.builder_index;
|
||||||
|
|
||||||
// TODO(gloas): Replace this check once we have gossip validation.
|
|
||||||
if !chain.spec.is_gloas_scheduled() {
|
if !chain.spec.is_gloas_scheduled() {
|
||||||
return Err(warp_utils::reject::custom_bad_request(
|
return Err(warp_utils::reject::custom_bad_request(
|
||||||
"Execution payload envelopes are not supported before the Gloas fork".into(),
|
"Execution payload envelopes are not supported before the Gloas fork".into(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(gloas): We should probably add validation here i.e. BroadcastValidation::Gossip
|
let signed_envelope = Arc::new(envelope);
|
||||||
info!(
|
|
||||||
%slot,
|
|
||||||
%beacon_block_root,
|
|
||||||
builder_index = envelope.message.builder_index,
|
|
||||||
"Publishing signed execution payload envelope to network"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Publish to the network
|
// The publish_fn is called inside process_execution_payload_envelope after consensus
|
||||||
crate::utils::publish_pubsub_message(
|
// verification but before the EL call.
|
||||||
network_tx,
|
let envelope_for_publish = signed_envelope.clone();
|
||||||
PubsubMessage::ExecutionPayload(Box::new(envelope)),
|
let sender = network_tx.clone();
|
||||||
)
|
let publish_fn = move || {
|
||||||
.map_err(|_| {
|
info!(
|
||||||
warn!(%slot, "Failed to publish execution payload envelope to network");
|
%slot,
|
||||||
warp_utils::reject::custom_server_error(
|
%beacon_block_root,
|
||||||
"Unable to publish execution payload envelope to network".into(),
|
builder_index,
|
||||||
|
"Publishing signed execution payload envelope to network"
|
||||||
|
);
|
||||||
|
crate::utils::publish_pubsub_message(
|
||||||
|
&sender,
|
||||||
|
PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())),
|
||||||
)
|
)
|
||||||
})?;
|
.map_err(|_| {
|
||||||
|
warn!(%slot, "Failed to publish execution payload envelope to network");
|
||||||
|
EnvelopeError::InternalError(
|
||||||
|
"Unable to publish execution payload envelope to network".to_owned(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let ctx = chain.payload_envelope_gossip_verification_context();
|
||||||
|
let gossip_verified_envelope = match GossipVerifiedEnvelope::new(signed_envelope, &ctx) {
|
||||||
|
Ok(envelope) => envelope,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(%slot, %beacon_block_root, error = ?e, "Execution payload envelope rejected");
|
||||||
|
return Err(warp_utils::reject::custom_bad_request(format!(
|
||||||
|
"execution payload envelope rejected: {e:?}",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Import the envelope locally (runs state transition and notifies the EL).
|
||||||
|
chain
|
||||||
|
.process_execution_payload_envelope(
|
||||||
|
beacon_block_root,
|
||||||
|
gossip_verified_envelope,
|
||||||
|
NotifyExecutionLayer::Yes,
|
||||||
|
BlockImportSource::HttpApi,
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(%slot, %beacon_block_root, reason = ?e, "Execution payload envelope rejected");
|
||||||
|
warp_utils::reject::custom_bad_request(format!(
|
||||||
|
"execution payload envelope rejected: {e:?}"
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(warp::reply().into_response())
|
Ok(warp::reply().into_response())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ pub enum SyncRequestId {
|
|||||||
BlobsByRange(BlobsByRangeRequestId),
|
BlobsByRange(BlobsByRangeRequestId),
|
||||||
/// Data columns by range request
|
/// Data columns by range request
|
||||||
DataColumnsByRange(DataColumnsByRangeRequestId),
|
DataColumnsByRange(DataColumnsByRangeRequestId),
|
||||||
|
/// Request searching for an execution payload envelope given a block root.
|
||||||
|
SinglePayloadEnvelope { id: SingleLookupReqId },
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
|
/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
|
||||||
|
|||||||
@@ -1735,6 +1735,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
|
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root));
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
|
||||||
|
debug!(
|
||||||
|
?block_root,
|
||||||
|
?parent_root,
|
||||||
|
"Parent envelope not yet available for gossip block"
|
||||||
|
);
|
||||||
|
self.send_sync_message(SyncMessage::UnknownParentEnvelope(
|
||||||
|
peer_id, block, block_root,
|
||||||
|
));
|
||||||
|
return None;
|
||||||
|
}
|
||||||
Err(e @ BlockError::BeaconChainError(_)) => {
|
Err(e @ BlockError::BeaconChainError(_)) => {
|
||||||
debug!(
|
debug!(
|
||||||
error = ?e,
|
error = ?e,
|
||||||
@@ -1824,7 +1835,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
|
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
|
||||||
Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
|
Err(e @ BlockError::InternalError(_))
|
||||||
|
| Err(e @ BlockError::BlobNotRequired(_))
|
||||||
|
| Err(e @ BlockError::PayloadEnvelopeError { .. }) => {
|
||||||
error!(error = %e, "Internal block gossip validation error");
|
error!(error = %e, "Internal block gossip validation error");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -2021,6 +2034,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
"Block with unknown parent attempted to be processed"
|
"Block with unknown parent attempted to be processed"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Err(BlockError::ParentEnvelopeUnknown { parent_root }) => {
|
||||||
|
debug!(
|
||||||
|
%block_root,
|
||||||
|
?parent_root,
|
||||||
|
"Parent envelope not yet available, need envelope lookup"
|
||||||
|
);
|
||||||
|
// Unlike ParentUnknown, this can legitimately happen during processing
|
||||||
|
// because the parent envelope may not have arrived yet. The lookup
|
||||||
|
// system will handle retrying via Action::ParentEnvelopeUnknown.
|
||||||
|
}
|
||||||
Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
|
Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => {
|
||||||
debug!(
|
debug!(
|
||||||
error = %e,
|
error = %e,
|
||||||
|
|||||||
@@ -567,6 +567,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new `Work` event for an RPC payload envelope.
|
||||||
|
pub fn send_rpc_payload_envelope(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
seen_timestamp: Duration,
|
||||||
|
process_type: BlockProcessType,
|
||||||
|
) -> Result<(), Error<T::EthSpec>> {
|
||||||
|
let process_fn =
|
||||||
|
self.clone()
|
||||||
|
.generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type);
|
||||||
|
self.try_send(BeaconWorkEvent {
|
||||||
|
drop_during_sync: false,
|
||||||
|
work: Work::RpcPayloadEnvelope { process_fn },
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
|
/// Create a new `Work` event for some blobs, where the result from computation (if any) is
|
||||||
/// sent to the other side of `result_tx`.
|
/// sent to the other side of `result_tx`.
|
||||||
pub fn send_rpc_blobs(
|
pub fn send_rpc_blobs(
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult;
|
|||||||
use crate::sync::manager::CustodyBatchProcessResult;
|
use crate::sync::manager::CustodyBatchProcessResult;
|
||||||
use crate::sync::{
|
use crate::sync::{
|
||||||
ChainId,
|
ChainId,
|
||||||
manager::{BlockProcessType, SyncMessage},
|
manager::{BlockProcessType, BlockProcessingResult, SyncMessage},
|
||||||
};
|
};
|
||||||
use beacon_chain::block_verification_types::LookupBlock;
|
use beacon_chain::block_verification_types::LookupBlock;
|
||||||
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
|
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
|
||||||
@@ -28,7 +28,9 @@ use store::KzgCommitment;
|
|||||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::kzg_ext::format_kzg_commitments;
|
use types::kzg_ext::format_kzg_commitments;
|
||||||
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
|
use types::{
|
||||||
|
BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope,
|
||||||
|
};
|
||||||
|
|
||||||
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
@@ -73,6 +75,77 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
|||||||
Box::pin(process_fn)
|
Box::pin(process_fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns an async closure which processes a payload envelope received via RPC.
|
||||||
|
pub fn generate_rpc_envelope_process_fn(
|
||||||
|
self: Arc<Self>,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
seen_timestamp: Duration,
|
||||||
|
process_type: BlockProcessType,
|
||||||
|
) -> AsyncFn {
|
||||||
|
let process_fn = async move {
|
||||||
|
self.process_rpc_envelope(envelope, seen_timestamp, process_type)
|
||||||
|
.await;
|
||||||
|
};
|
||||||
|
Box::pin(process_fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process an execution payload envelope received via RPC.
|
||||||
|
async fn process_rpc_envelope(
|
||||||
|
self: Arc<Self>,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
_seen_timestamp: Duration,
|
||||||
|
process_type: BlockProcessType,
|
||||||
|
) {
|
||||||
|
let beacon_block_root = envelope.beacon_block_root();
|
||||||
|
|
||||||
|
// Verify the envelope using the gossip verification path (same checks apply to RPC)
|
||||||
|
let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await {
|
||||||
|
Ok(verified) => verified,
|
||||||
|
Err(e) => {
|
||||||
|
debug!(
|
||||||
|
error = ?e,
|
||||||
|
?beacon_block_root,
|
||||||
|
"RPC payload envelope failed verification"
|
||||||
|
);
|
||||||
|
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||||
|
process_type,
|
||||||
|
result: BlockProcessingResult::Err(e.into()),
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Process the verified envelope
|
||||||
|
let result = self
|
||||||
|
.chain
|
||||||
|
.process_execution_payload_envelope(
|
||||||
|
beacon_block_root,
|
||||||
|
verified_envelope,
|
||||||
|
NotifyExecutionLayer::Yes,
|
||||||
|
BlockImportSource::Lookup,
|
||||||
|
#[allow(clippy::result_large_err)]
|
||||||
|
|| Ok(()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let processing_result = match result {
|
||||||
|
Ok(status) => BlockProcessingResult::Ok(status),
|
||||||
|
Err(e) => {
|
||||||
|
debug!(
|
||||||
|
error = ?e,
|
||||||
|
?beacon_block_root,
|
||||||
|
"RPC payload envelope processing failed"
|
||||||
|
);
|
||||||
|
BlockProcessingResult::Err(e.into())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||||
|
process_type,
|
||||||
|
result: processing_result,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block.
|
/// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block.
|
||||||
pub fn generate_lookup_beacon_block_fns(
|
pub fn generate_lookup_beacon_block_fns(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use crate::{
|
|||||||
sync::{SyncMessage, manager::BlockProcessType},
|
sync::{SyncMessage, manager::BlockProcessType},
|
||||||
};
|
};
|
||||||
use beacon_chain::block_verification_types::LookupBlock;
|
use beacon_chain::block_verification_types::LookupBlock;
|
||||||
|
use beacon_chain::chain_config::ChainConfig;
|
||||||
use beacon_chain::custody_context::NodeCustodyType;
|
use beacon_chain::custody_context::NodeCustodyType;
|
||||||
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu;
|
use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu;
|
||||||
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
|
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
|
||||||
@@ -134,7 +135,10 @@ impl TestRig {
|
|||||||
.fresh_ephemeral_store()
|
.fresh_ephemeral_store()
|
||||||
.mock_execution_layer()
|
.mock_execution_layer()
|
||||||
.node_custody_type(NodeCustodyType::Fullnode)
|
.node_custody_type(NodeCustodyType::Fullnode)
|
||||||
.chain_config(<_>::default())
|
.chain_config(ChainConfig {
|
||||||
|
disable_get_blobs: true,
|
||||||
|
..ChainConfig::default()
|
||||||
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
harness.advance_slot();
|
harness.advance_slot();
|
||||||
@@ -169,7 +173,10 @@ impl TestRig {
|
|||||||
.fresh_ephemeral_store()
|
.fresh_ephemeral_store()
|
||||||
.mock_execution_layer()
|
.mock_execution_layer()
|
||||||
.node_custody_type(node_custody_type)
|
.node_custody_type(node_custody_type)
|
||||||
.chain_config(<_>::default())
|
.chain_config(ChainConfig {
|
||||||
|
disable_get_blobs: true,
|
||||||
|
..ChainConfig::default()
|
||||||
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
harness.advance_slot();
|
harness.advance_slot();
|
||||||
@@ -1001,14 +1008,30 @@ async fn data_column_reconstruction_at_deadline() {
|
|||||||
rig.enqueue_gossip_data_columns(i);
|
rig.enqueue_gossip_data_columns(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expect all gossip events + reconstruction
|
// Drain the journal until we've seen all gossip events plus at least one
|
||||||
let mut expected_events: Vec<WorkType> = (0..min_columns_for_reconstruction)
|
// reconstruction. Under real crypto the reprocess queue can dispatch the
|
||||||
.map(|_| WorkType::GossipDataColumnSidecar)
|
// reconstruction work item more than once (the second is a no-op via
|
||||||
.collect();
|
// `reconstruction_started`), so we don't pin the count — we just require >= 1.
|
||||||
expected_events.push(WorkType::ColumnReconstruction);
|
let gsc: &str = WorkType::GossipDataColumnSidecar.into();
|
||||||
|
let cr: &str = WorkType::ColumnReconstruction.into();
|
||||||
rig.assert_event_journal_contains_ordered(&expected_events)
|
let (mut gossip_seen, mut recon_seen) = (0usize, 0usize);
|
||||||
.await;
|
let drain = async {
|
||||||
|
while let Some(event) = rig.work_journal_rx.recv().await {
|
||||||
|
if event == gsc {
|
||||||
|
gossip_seen += 1;
|
||||||
|
} else if event == cr {
|
||||||
|
recon_seen += 1;
|
||||||
|
}
|
||||||
|
if gossip_seen == min_columns_for_reconstruction && recon_seen >= 1 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if tokio::time::timeout(STANDARD_TIMEOUT, drain).await.is_err() {
|
||||||
|
panic!("timeout: gossip_seen={gossip_seen}, recon_seen={recon_seen}");
|
||||||
|
}
|
||||||
|
assert_eq!(gossip_seen, min_columns_for_reconstruction);
|
||||||
|
assert!(recon_seen >= 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test the column reconstruction is delayed for columns that arrive for a previous slot.
|
// Test the column reconstruction is delayed for columns that arrive for a previous slot.
|
||||||
|
|||||||
@@ -19,13 +19,14 @@ use lighthouse_network::{
|
|||||||
};
|
};
|
||||||
use logging::TimeLatch;
|
use logging::TimeLatch;
|
||||||
use logging::crit;
|
use logging::crit;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::{SlotClock, timestamp_now};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
use tracing::{debug, error, trace, warn};
|
use tracing::{debug, error, trace, warn};
|
||||||
use types::{
|
use types::{
|
||||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock,
|
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock,
|
||||||
|
SignedExecutionPayloadEnvelope,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Handles messages from the network and routes them to the appropriate service to be handled.
|
/// Handles messages from the network and routes them to the appropriate service to be handled.
|
||||||
@@ -341,10 +342,13 @@ impl<T: BeaconChainTypes> Router<T> {
|
|||||||
Response::DataColumnsByRange(data_column) => {
|
Response::DataColumnsByRange(data_column) => {
|
||||||
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
|
self.on_data_columns_by_range_response(peer_id, app_request_id, data_column);
|
||||||
}
|
}
|
||||||
// TODO(EIP-7732): implement outgoing payload envelopes by range and root
|
Response::PayloadEnvelopesByRoot(envelope) => {
|
||||||
// responses once sync manager requests them.
|
self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope);
|
||||||
Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => {
|
}
|
||||||
debug!("Requesting envelopes by root and by range not supported yet");
|
// TODO(EIP-7732): implement outgoing payload envelopes by range responses once
|
||||||
|
// range sync requests them.
|
||||||
|
Response::PayloadEnvelopesByRange(_) => {
|
||||||
|
error!(%peer_id, "Unexpected PayloadEnvelopesByRange response");
|
||||||
}
|
}
|
||||||
// Light client responses should not be received
|
// Light client responses should not be received
|
||||||
Response::LightClientBootstrap(_)
|
Response::LightClientBootstrap(_)
|
||||||
@@ -718,6 +722,40 @@ impl<T: BeaconChainTypes> Router<T> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle a `PayloadEnvelopesByRoot` response from the peer.
|
||||||
|
pub fn on_payload_envelopes_by_root_response(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
app_request_id: AppRequestId,
|
||||||
|
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||||
|
) {
|
||||||
|
let sync_request_id = match app_request_id {
|
||||||
|
AppRequestId::Sync(sync_id) => match sync_id {
|
||||||
|
id @ SyncRequestId::SinglePayloadEnvelope { .. } => id,
|
||||||
|
other => {
|
||||||
|
crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
AppRequestId::Router => {
|
||||||
|
crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AppRequestId::Internal => unreachable!("Handled internally"),
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(
|
||||||
|
%peer_id,
|
||||||
|
"Received PayloadEnvelopesByRoot Response"
|
||||||
|
);
|
||||||
|
self.send_to_sync(SyncMessage::RpcPayloadEnvelope {
|
||||||
|
peer_id,
|
||||||
|
sync_request_id,
|
||||||
|
envelope,
|
||||||
|
seen_timestamp: timestamp_now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a `BlobsByRoot` response from the peer.
|
/// Handle a `BlobsByRoot` response from the peer.
|
||||||
pub fn on_blobs_by_root_response(
|
pub fn on_blobs_by_root_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{
|
|||||||
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
||||||
};
|
};
|
||||||
use crate::sync::block_lookups::{
|
use crate::sync::block_lookups::{
|
||||||
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
|
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId,
|
||||||
};
|
};
|
||||||
use crate::sync::manager::BlockProcessType;
|
use crate::sync::manager::BlockProcessType;
|
||||||
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
||||||
@@ -12,16 +12,17 @@ use parking_lot::RwLock;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||||
|
|
||||||
use super::SingleLookupId;
|
use super::SingleLookupId;
|
||||||
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
pub enum ResponseType {
|
pub enum ResponseType {
|
||||||
Block,
|
Block,
|
||||||
Blob,
|
Blob,
|
||||||
CustodyColumn,
|
CustodyColumn,
|
||||||
|
Envelope,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
||||||
@@ -151,6 +152,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
|||||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||||
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
||||||
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -205,6 +207,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
|||||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||||
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
||||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -215,3 +218,52 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
|||||||
&mut self.state
|
&mut self.state
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: BeaconChainTypes> RequestState<T> for EnvelopeRequestState<T::EthSpec> {
|
||||||
|
type VerifiedResponseType = Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
|
||||||
|
|
||||||
|
fn make_request(
|
||||||
|
&self,
|
||||||
|
id: Id,
|
||||||
|
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
|
_: usize,
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||||
|
cx.envelope_lookup_request(id, lookup_peers, self.block_root)
|
||||||
|
.map_err(LookupRequestError::SendFailedNetwork)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_for_processing(
|
||||||
|
id: Id,
|
||||||
|
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||||
|
cx: &SyncNetworkContext<T>,
|
||||||
|
) -> Result<(), LookupRequestError> {
|
||||||
|
let DownloadResult {
|
||||||
|
value,
|
||||||
|
block_root,
|
||||||
|
seen_timestamp,
|
||||||
|
..
|
||||||
|
} = download_result;
|
||||||
|
cx.send_envelope_for_processing(id, value, seen_timestamp, block_root)
|
||||||
|
.map_err(LookupRequestError::SendFailedProcessor)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_type() -> ResponseType {
|
||||||
|
ResponseType::Envelope
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||||
|
match &mut request.component_requests {
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request),
|
||||||
|
_ => Err("expecting envelope request"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||||
|
&self.state
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||||
|
&mut self.state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -22,7 +22,9 @@
|
|||||||
|
|
||||||
use self::parent_chain::{NodeChain, compute_parent_chains};
|
use self::parent_chain::{NodeChain, compute_parent_chains};
|
||||||
pub use self::single_block_lookup::DownloadResult;
|
pub use self::single_block_lookup::DownloadResult;
|
||||||
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
|
use self::single_block_lookup::{
|
||||||
|
AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup,
|
||||||
|
};
|
||||||
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
|
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
|
||||||
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
|
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
@@ -39,7 +41,9 @@ use fnv::FnvHashMap;
|
|||||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||||
use lighthouse_network::{PeerAction, PeerId};
|
use lighthouse_network::{PeerAction, PeerId};
|
||||||
use lru_cache::LRUTimeCache;
|
use lru_cache::LRUTimeCache;
|
||||||
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
|
pub use single_block_lookup::{
|
||||||
|
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState,
|
||||||
|
};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -109,6 +113,7 @@ pub type SingleLookupId = u32;
|
|||||||
enum Action {
|
enum Action {
|
||||||
Retry,
|
Retry,
|
||||||
ParentUnknown { parent_root: Hash256 },
|
ParentUnknown { parent_root: Hash256 },
|
||||||
|
ParentEnvelopeUnknown { parent_root: Hash256 },
|
||||||
Drop(/* reason: */ String),
|
Drop(/* reason: */ String),
|
||||||
Continue,
|
Continue,
|
||||||
}
|
}
|
||||||
@@ -213,7 +218,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
self.new_current_lookup(
|
self.new_current_lookup(
|
||||||
block_root,
|
block_root,
|
||||||
Some(block_component),
|
Some(block_component),
|
||||||
Some(parent_root),
|
Some(AwaitingParent::Block(parent_root)),
|
||||||
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
|
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
|
||||||
// to have the rest of the block components (refer to decoupled blob gossip). Create
|
// to have the rest of the block components (refer to decoupled blob gossip). Create
|
||||||
// the lookup with zero peers to house the block components.
|
// the lookup with zero peers to house the block components.
|
||||||
@@ -225,7 +230,37 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Seach a block whose parent root is unknown.
|
/// A child block's parent envelope is missing. Create a child lookup (with the block component)
|
||||||
|
/// that waits for the parent envelope, and an envelope-only lookup for the parent.
|
||||||
|
///
|
||||||
|
/// Returns true if both lookups are created or already exist.
|
||||||
|
#[must_use = "only reference the new lookup if returns true"]
|
||||||
|
pub fn search_child_and_parent_envelope(
|
||||||
|
&mut self,
|
||||||
|
block_root: Hash256,
|
||||||
|
block_component: BlockComponent<T::EthSpec>,
|
||||||
|
parent_root: Hash256,
|
||||||
|
peer_id: PeerId,
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) -> bool {
|
||||||
|
let envelope_lookup_exists =
|
||||||
|
self.search_parent_envelope_of_child(parent_root, &[peer_id], cx);
|
||||||
|
if envelope_lookup_exists {
|
||||||
|
// Create child lookup that waits for the parent envelope.
|
||||||
|
// The child block itself has already been seen, so we pass it as a component.
|
||||||
|
self.new_current_lookup(
|
||||||
|
block_root,
|
||||||
|
Some(block_component),
|
||||||
|
Some(AwaitingParent::Envelope(parent_root)),
|
||||||
|
&[],
|
||||||
|
cx,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Search a block whose parent root is unknown.
|
||||||
///
|
///
|
||||||
/// Returns true if the lookup is created or already exists
|
/// Returns true if the lookup is created or already exists
|
||||||
#[must_use = "only reference the new lookup if returns true"]
|
#[must_use = "only reference the new lookup if returns true"]
|
||||||
@@ -343,6 +378,57 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
|
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A block triggers the search of a parent envelope.
|
||||||
|
#[must_use = "only reference the new lookup if returns true"]
|
||||||
|
pub fn search_parent_envelope_of_child(
|
||||||
|
&mut self,
|
||||||
|
parent_root: Hash256,
|
||||||
|
peers: &[PeerId],
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) -> bool {
|
||||||
|
// Check if there's already a lookup for this root (could be a block lookup or envelope
|
||||||
|
// lookup). If so, add peers and let it handle the envelope.
|
||||||
|
if let Some((&lookup_id, _lookup)) = self
|
||||||
|
.single_block_lookups
|
||||||
|
.iter_mut()
|
||||||
|
.find(|(_, lookup)| lookup.is_for_block(parent_root))
|
||||||
|
{
|
||||||
|
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
|
||||||
|
warn!(error = ?e, "Error adding peers to envelope lookup");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.single_block_lookups.len() >= MAX_LOOKUPS {
|
||||||
|
warn!(?parent_root, "Dropping envelope lookup reached max");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id());
|
||||||
|
let _guard = lookup.span.clone().entered();
|
||||||
|
|
||||||
|
let id = lookup.id;
|
||||||
|
let lookup = match self.single_block_lookups.entry(id) {
|
||||||
|
Entry::Vacant(entry) => entry.insert(lookup),
|
||||||
|
Entry::Occupied(_) => {
|
||||||
|
warn!(id, "Lookup exists with same id");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
?peers,
|
||||||
|
?parent_root,
|
||||||
|
id = lookup.id,
|
||||||
|
"Created envelope-only lookup"
|
||||||
|
);
|
||||||
|
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
|
||||||
|
self.metrics.created_lookups += 1;
|
||||||
|
|
||||||
|
let result = lookup.continue_requests(cx);
|
||||||
|
self.on_lookup_result(id, result, "new_envelope_lookup", cx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
||||||
/// constructed.
|
/// constructed.
|
||||||
/// Returns true if the lookup is created or already exists
|
/// Returns true if the lookup is created or already exists
|
||||||
@@ -351,7 +437,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
block_component: Option<BlockComponent<T::EthSpec>>,
|
block_component: Option<BlockComponent<T::EthSpec>>,
|
||||||
awaiting_parent: Option<Hash256>,
|
awaiting_parent: Option<AwaitingParent>,
|
||||||
peers: &[PeerId],
|
peers: &[PeerId],
|
||||||
cx: &mut SyncNetworkContext<T>,
|
cx: &mut SyncNetworkContext<T>,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
@@ -386,13 +472,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress
|
// Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress
|
||||||
if let Some(awaiting_parent) = awaiting_parent
|
if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) =
|
||||||
|
awaiting_parent
|
||||||
&& !self
|
&& !self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.iter()
|
.iter()
|
||||||
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
|
.any(|(_, lookup)| lookup.is_for_block(parent_root))
|
||||||
{
|
{
|
||||||
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
|
warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -426,9 +513,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
debug!(
|
debug!(
|
||||||
?peers,
|
?peers,
|
||||||
?block_root,
|
?block_root,
|
||||||
awaiting_parent = awaiting_parent
|
?awaiting_parent,
|
||||||
.map(|root| root.to_string())
|
|
||||||
.unwrap_or("none".to_owned()),
|
|
||||||
id = lookup.id,
|
id = lookup.id,
|
||||||
"Created block lookup"
|
"Created block lookup"
|
||||||
);
|
);
|
||||||
@@ -559,6 +644,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
BlockProcessType::SingleCustodyColumn(id) => {
|
BlockProcessType::SingleCustodyColumn(id) => {
|
||||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||||
}
|
}
|
||||||
|
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
|
||||||
|
let result = self
|
||||||
|
.on_processing_result_inner::<EnvelopeRequestState<T::EthSpec>>(id, result, cx);
|
||||||
|
// On successful envelope import, unblock child lookups waiting for this envelope
|
||||||
|
if matches!(&result, Ok(LookupResult::Completed)) {
|
||||||
|
self.continue_envelope_child_lookups(block_root, cx);
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
};
|
};
|
||||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||||
}
|
}
|
||||||
@@ -645,6 +739,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
request_state.revert_to_awaiting_processing()?;
|
request_state.revert_to_awaiting_processing()?;
|
||||||
Action::ParentUnknown { parent_root }
|
Action::ParentUnknown { parent_root }
|
||||||
}
|
}
|
||||||
|
BlockError::ParentEnvelopeUnknown { parent_root } => {
|
||||||
|
// The parent block is known but its execution payload envelope is missing.
|
||||||
|
// Revert to awaiting processing and fetch the envelope via RPC.
|
||||||
|
request_state.revert_to_awaiting_processing()?;
|
||||||
|
Action::ParentEnvelopeUnknown { parent_root }
|
||||||
|
}
|
||||||
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
|
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
|
||||||
// These errors indicate that the execution layer is offline
|
// These errors indicate that the execution layer is offline
|
||||||
// and failed to validate the execution payload. Do not downscore peer.
|
// and failed to validate the execution payload. Do not downscore peer.
|
||||||
@@ -667,6 +767,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
// We opt to drop the lookup instead.
|
// We opt to drop the lookup instead.
|
||||||
Action::Drop(format!("{e:?}"))
|
Action::Drop(format!("{e:?}"))
|
||||||
}
|
}
|
||||||
|
BlockError::PayloadEnvelopeError { e, penalize_peer } => {
|
||||||
|
debug!(
|
||||||
|
?block_root,
|
||||||
|
error = ?e,
|
||||||
|
"Payload envelope processing error"
|
||||||
|
);
|
||||||
|
if penalize_peer {
|
||||||
|
let peer_group = request_state.on_processing_failure()?;
|
||||||
|
for peer in peer_group.all() {
|
||||||
|
cx.report_peer(
|
||||||
|
*peer,
|
||||||
|
PeerAction::MidToleranceError,
|
||||||
|
"lookup_envelope_processing_failure",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Action::Retry
|
||||||
|
} else {
|
||||||
|
Action::Drop(format!("{e:?}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
other => {
|
other => {
|
||||||
debug!(
|
debug!(
|
||||||
?block_root,
|
?block_root,
|
||||||
@@ -701,6 +821,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
ResponseType::CustodyColumn => {
|
ResponseType::CustodyColumn => {
|
||||||
"lookup_custody_column_processing_failure"
|
"lookup_custody_column_processing_failure"
|
||||||
}
|
}
|
||||||
|
ResponseType::Envelope => "lookup_envelope_processing_failure",
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -742,6 +863,25 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Action::ParentEnvelopeUnknown { parent_root } => {
|
||||||
|
let peers = lookup.all_peers();
|
||||||
|
lookup.set_awaiting_parent_envelope(parent_root);
|
||||||
|
let envelope_lookup_exists =
|
||||||
|
self.search_parent_envelope_of_child(parent_root, &peers, cx);
|
||||||
|
if envelope_lookup_exists {
|
||||||
|
debug!(
|
||||||
|
id = lookup_id,
|
||||||
|
?block_root,
|
||||||
|
?parent_root,
|
||||||
|
"Marking lookup as awaiting parent envelope"
|
||||||
|
);
|
||||||
|
Ok(LookupResult::Pending)
|
||||||
|
} else {
|
||||||
|
Err(LookupRequestError::Failed(format!(
|
||||||
|
"Envelope lookup could not be created for {parent_root:?}"
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
Action::Drop(reason) => {
|
Action::Drop(reason) => {
|
||||||
// Drop with noop
|
// Drop with noop
|
||||||
Err(LookupRequestError::Failed(reason))
|
Err(LookupRequestError::Failed(reason))
|
||||||
@@ -791,7 +931,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
|
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
|
||||||
|
|
||||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||||
if lookup.awaiting_parent() == Some(block_root) {
|
if lookup.awaiting_parent_block() == Some(block_root) {
|
||||||
lookup.resolve_awaiting_parent();
|
lookup.resolve_awaiting_parent();
|
||||||
debug!(
|
debug!(
|
||||||
parent_root = ?block_root,
|
parent_root = ?block_root,
|
||||||
@@ -809,6 +949,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Makes progress on lookups that were waiting for a parent envelope to be imported.
|
||||||
|
pub fn continue_envelope_child_lookups(
|
||||||
|
&mut self,
|
||||||
|
block_root: Hash256,
|
||||||
|
cx: &mut SyncNetworkContext<T>,
|
||||||
|
) {
|
||||||
|
let mut lookup_results = vec![];
|
||||||
|
|
||||||
|
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||||
|
if lookup.awaiting_parent_envelope() == Some(block_root) {
|
||||||
|
lookup.resolve_awaiting_parent();
|
||||||
|
debug!(
|
||||||
|
envelope_root = ?block_root,
|
||||||
|
id,
|
||||||
|
block_root = ?lookup.block_root(),
|
||||||
|
"Continuing lookup after envelope imported"
|
||||||
|
);
|
||||||
|
let result = lookup.continue_requests(cx);
|
||||||
|
lookup_results.push((*id, result));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (id, result) in lookup_results {
|
||||||
|
self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
|
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
|
||||||
/// the parent to make progress to resolve, therefore we must drop them if the parent is
|
/// the parent to make progress to resolve, therefore we must drop them if the parent is
|
||||||
/// dropped.
|
/// dropped.
|
||||||
@@ -824,10 +991,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]);
|
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]);
|
||||||
self.metrics.dropped_lookups += 1;
|
self.metrics.dropped_lookups += 1;
|
||||||
|
|
||||||
|
let dropped_root = dropped_lookup.block_root();
|
||||||
let child_lookups = self
|
let child_lookups = self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
|
.filter(|(_, lookup)| {
|
||||||
|
lookup.awaiting_parent_block() == Some(dropped_root)
|
||||||
|
|| lookup.awaiting_parent_envelope() == Some(dropped_root)
|
||||||
|
})
|
||||||
.map(|(id, _)| *id)
|
.map(|(id, _)| *id)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
@@ -995,17 +1166,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
&'a self,
|
&'a self,
|
||||||
lookup: &'a SingleBlockLookup<T>,
|
lookup: &'a SingleBlockLookup<T>,
|
||||||
) -> Result<&'a SingleBlockLookup<T>, String> {
|
) -> Result<&'a SingleBlockLookup<T>, String> {
|
||||||
if let Some(awaiting_parent) = lookup.awaiting_parent() {
|
if let Some(parent_root) = lookup.awaiting_parent_block() {
|
||||||
if let Some(lookup) = self
|
if let Some(lookup) = self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.values()
|
.values()
|
||||||
.find(|l| l.block_root() == awaiting_parent)
|
.find(|l| l.block_root() == parent_root)
|
||||||
{
|
{
|
||||||
self.find_oldest_ancestor_lookup(lookup)
|
self.find_oldest_ancestor_lookup(lookup)
|
||||||
} else {
|
} else {
|
||||||
Err(format!(
|
Err(format!("Lookup references unknown parent {parent_root:?}"))
|
||||||
"Lookup references unknown parent {awaiting_parent:?}"
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Ok(lookup)
|
Ok(lookup)
|
||||||
@@ -1038,7 +1207,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(parent_root) = lookup.awaiting_parent() {
|
if let Some(parent_root) = lookup.awaiting_parent_block() {
|
||||||
if let Some((&child_id, _)) = self
|
if let Some((&child_id, _)) = self
|
||||||
.single_block_lookups
|
.single_block_lookups
|
||||||
.iter()
|
.iter()
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
|
|||||||
fn from(value: &SingleBlockLookup<T>) -> Self {
|
fn from(value: &SingleBlockLookup<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
block_root: value.block_root(),
|
block_root: value.block_root(),
|
||||||
parent_root: value.awaiting_parent(),
|
parent_root: value.awaiting_parent_block(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,9 @@ use store::Hash256;
|
|||||||
use strum::IntoStaticStr;
|
use strum::IntoStaticStr;
|
||||||
use tracing::{Span, debug_span};
|
use tracing::{Span, debug_span};
|
||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
|
use types::{
|
||||||
|
DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||||
|
};
|
||||||
|
|
||||||
// Dedicated enum for LookupResult to force its usage
|
// Dedicated enum for LookupResult to force its usage
|
||||||
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
||||||
@@ -56,6 +58,14 @@ pub enum LookupRequestError {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum AwaitingParent {
|
||||||
|
/// Waiting for the parent block to be imported.
|
||||||
|
Block(Hash256),
|
||||||
|
/// The parent block is imported but its execution payload envelope is missing.
|
||||||
|
Envelope(Hash256),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Educe)]
|
#[derive(Educe)]
|
||||||
#[educe(Debug(bound(T: BeaconChainTypes)))]
|
#[educe(Debug(bound(T: BeaconChainTypes)))]
|
||||||
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||||
@@ -69,7 +79,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
|||||||
#[educe(Debug(method(fmt_peer_set_as_len)))]
|
#[educe(Debug(method(fmt_peer_set_as_len)))]
|
||||||
peers: Arc<RwLock<HashSet<PeerId>>>,
|
peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
block_root: Hash256,
|
block_root: Hash256,
|
||||||
awaiting_parent: Option<Hash256>,
|
awaiting_parent: Option<AwaitingParent>,
|
||||||
created: Instant,
|
created: Instant,
|
||||||
pub(crate) span: Span,
|
pub(crate) span: Span,
|
||||||
}
|
}
|
||||||
@@ -79,6 +89,7 @@ pub(crate) enum ComponentRequests<E: EthSpec> {
|
|||||||
WaitingForBlock,
|
WaitingForBlock,
|
||||||
ActiveBlobRequest(BlobRequestState<E>, usize),
|
ActiveBlobRequest(BlobRequestState<E>, usize),
|
||||||
ActiveCustodyRequest(CustodyRequestState<E>),
|
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||||
|
ActiveEnvelopeRequest(EnvelopeRequestState<E>),
|
||||||
// When printing in debug this state display the reason why it's not needed
|
// When printing in debug this state display the reason why it's not needed
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
NotNeeded(&'static str),
|
NotNeeded(&'static str),
|
||||||
@@ -89,7 +100,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
requested_block_root: Hash256,
|
requested_block_root: Hash256,
|
||||||
peers: &[PeerId],
|
peers: &[PeerId],
|
||||||
id: Id,
|
id: Id,
|
||||||
awaiting_parent: Option<Hash256>,
|
awaiting_parent: Option<AwaitingParent>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let lookup_span = debug_span!(
|
let lookup_span = debug_span!(
|
||||||
"lh_single_block_lookup",
|
"lh_single_block_lookup",
|
||||||
@@ -109,10 +120,33 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create an envelope-only lookup. The block is already imported, we just need the envelope.
|
||||||
|
pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self {
|
||||||
|
let mut lookup = Self::new(block_root, peers, id, None);
|
||||||
|
// Block is already imported, mark as completed
|
||||||
|
lookup
|
||||||
|
.block_request_state
|
||||||
|
.state
|
||||||
|
.on_completed_request("block already imported")
|
||||||
|
.expect("block state starts as AwaitingDownload");
|
||||||
|
lookup.component_requests =
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root));
|
||||||
|
lookup
|
||||||
|
}
|
||||||
|
|
||||||
/// Reset the status of all internal requests
|
/// Reset the status of all internal requests
|
||||||
pub fn reset_requests(&mut self) {
|
pub fn reset_requests(&mut self) {
|
||||||
self.block_request_state = BlockRequestState::new(self.block_root);
|
self.block_request_state = BlockRequestState::new(self.block_root);
|
||||||
self.component_requests = ComponentRequests::WaitingForBlock;
|
match &self.component_requests {
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(_) => {
|
||||||
|
self.component_requests = ComponentRequests::ActiveEnvelopeRequest(
|
||||||
|
EnvelopeRequestState::new(self.block_root),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.component_requests = ComponentRequests::WaitingForBlock;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
|
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
|
||||||
@@ -128,18 +162,37 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
self.block_root
|
self.block_root
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn awaiting_parent(&self) -> Option<Hash256> {
|
pub fn awaiting_parent(&self) -> Option<AwaitingParent> {
|
||||||
self.awaiting_parent
|
self.awaiting_parent
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
|
/// Returns the parent root if awaiting a parent block.
|
||||||
/// components for processing.
|
pub fn awaiting_parent_block(&self) -> Option<Hash256> {
|
||||||
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
|
match self.awaiting_parent {
|
||||||
self.awaiting_parent = Some(parent_root)
|
Some(AwaitingParent::Block(root)) => Some(root),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
|
/// Returns the parent root if awaiting a parent envelope.
|
||||||
/// processing.
|
pub fn awaiting_parent_envelope(&self) -> Option<Hash256> {
|
||||||
|
match self.awaiting_parent {
|
||||||
|
Some(AwaitingParent::Envelope(root)) => Some(root),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark this lookup as awaiting a parent block to be imported before processing.
|
||||||
|
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
|
||||||
|
self.awaiting_parent = Some(AwaitingParent::Block(parent_root));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
|
||||||
|
pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) {
|
||||||
|
self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark this lookup as no longer awaiting any parent.
|
||||||
pub fn resolve_awaiting_parent(&mut self) {
|
pub fn resolve_awaiting_parent(&mut self) {
|
||||||
self.awaiting_parent = None;
|
self.awaiting_parent = None;
|
||||||
}
|
}
|
||||||
@@ -180,6 +233,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::WaitingForBlock => false,
|
ComponentRequests::WaitingForBlock => false,
|
||||||
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
||||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(),
|
||||||
ComponentRequests::NotNeeded { .. } => true,
|
ComponentRequests::NotNeeded { .. } => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -199,6 +253,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||||
request.state.is_awaiting_event()
|
request.state.is_awaiting_event()
|
||||||
}
|
}
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(request) => {
|
||||||
|
request.state.is_awaiting_event()
|
||||||
|
}
|
||||||
ComponentRequests::NotNeeded { .. } => false,
|
ComponentRequests::NotNeeded { .. } => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -268,6 +325,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||||
}
|
}
|
||||||
|
ComponentRequests::ActiveEnvelopeRequest(_) => {
|
||||||
|
self.continue_request::<EnvelopeRequestState<T::EthSpec>>(cx, 0)?
|
||||||
|
}
|
||||||
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,7 +349,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
expected_blobs: usize,
|
expected_blobs: usize,
|
||||||
) -> Result<(), LookupRequestError> {
|
) -> Result<(), LookupRequestError> {
|
||||||
let id = self.id;
|
let id = self.id;
|
||||||
let awaiting_parent = self.awaiting_parent.is_some();
|
let awaiting_event = self.awaiting_parent.is_some();
|
||||||
let request =
|
let request =
|
||||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||||
|
|
||||||
@@ -333,7 +393,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
|||||||
// Otherwise, attempt to progress awaiting processing
|
// Otherwise, attempt to progress awaiting processing
|
||||||
// If this request is awaiting a parent lookup to be processed, do not send for processing.
|
// If this request is awaiting a parent lookup to be processed, do not send for processing.
|
||||||
// The request will be rejected with unknown parent error.
|
// The request will be rejected with unknown parent error.
|
||||||
} else if !awaiting_parent {
|
} else if !awaiting_event {
|
||||||
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
|
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
|
||||||
// useful to conditionally access the result data.
|
// useful to conditionally access the result data.
|
||||||
if let Some(result) = request.get_state_mut().maybe_start_processing() {
|
if let Some(result) = request.get_state_mut().maybe_start_processing() {
|
||||||
@@ -429,6 +489,26 @@ impl<E: EthSpec> BlockRequestState<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The state of the envelope request component of a `SingleBlockLookup`.
|
||||||
|
/// Used for envelope-only lookups where the parent block is already imported
|
||||||
|
/// but its execution payload envelope is missing.
|
||||||
|
#[derive(Educe)]
|
||||||
|
#[educe(Debug)]
|
||||||
|
pub struct EnvelopeRequestState<E: EthSpec> {
|
||||||
|
#[educe(Debug(ignore))]
|
||||||
|
pub block_root: Hash256,
|
||||||
|
pub state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: EthSpec> EnvelopeRequestState<E> {
|
||||||
|
pub fn new(block_root: Hash256) -> Self {
|
||||||
|
Self {
|
||||||
|
block_root,
|
||||||
|
state: SingleLookupRequestState::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct DownloadResult<T: Clone> {
|
pub struct DownloadResult<T: Clone> {
|
||||||
pub value: T,
|
pub value: T,
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ use crate::service::NetworkMessage;
|
|||||||
use crate::status::ToStatusMessage;
|
use crate::status::ToStatusMessage;
|
||||||
use crate::sync::block_lookups::{
|
use crate::sync::block_lookups::{
|
||||||
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||||
|
EnvelopeRequestState,
|
||||||
};
|
};
|
||||||
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
||||||
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
||||||
@@ -65,7 +66,7 @@ use lighthouse_network::types::{NetworkGlobals, SyncState};
|
|||||||
use lighthouse_network::{PeerAction, PeerId};
|
use lighthouse_network::{PeerAction, PeerId};
|
||||||
use logging::crit;
|
use logging::crit;
|
||||||
use lru_cache::LRUTimeCache;
|
use lru_cache::LRUTimeCache;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::{SlotClock, timestamp_now};
|
||||||
use std::ops::Sub;
|
use std::ops::Sub;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -73,7 +74,8 @@ use strum::IntoStaticStr;
|
|||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{debug, error, info, trace};
|
use tracing::{debug, error, info, trace};
|
||||||
use types::{
|
use types::{
|
||||||
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot,
|
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
|
||||||
|
SignedExecutionPayloadEnvelope, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
|
||||||
@@ -132,6 +134,14 @@ pub enum SyncMessage<E: EthSpec> {
|
|||||||
seen_timestamp: Duration,
|
seen_timestamp: Duration,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// An execution payload envelope has been received from the RPC.
|
||||||
|
RpcPayloadEnvelope {
|
||||||
|
sync_request_id: SyncRequestId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||||
|
seen_timestamp: Duration,
|
||||||
|
},
|
||||||
|
|
||||||
/// A block with an unknown parent has been received.
|
/// A block with an unknown parent has been received.
|
||||||
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
UnknownParentBlock(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
||||||
|
|
||||||
@@ -141,6 +151,9 @@ pub enum SyncMessage<E: EthSpec> {
|
|||||||
/// A data column with an unknown parent has been received.
|
/// A data column with an unknown parent has been received.
|
||||||
UnknownParentDataColumn(PeerId, Arc<DataColumnSidecar<E>>),
|
UnknownParentDataColumn(PeerId, Arc<DataColumnSidecar<E>>),
|
||||||
|
|
||||||
|
/// A block's parent is known but its execution payload envelope has not been received yet.
|
||||||
|
UnknownParentEnvelope(PeerId, Arc<SignedBeaconBlock<E>>, Hash256),
|
||||||
|
|
||||||
/// A partial data column with an unknown parent has been received.
|
/// A partial data column with an unknown parent has been received.
|
||||||
UnknownParentPartialDataColumn {
|
UnknownParentPartialDataColumn {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
@@ -191,6 +204,7 @@ pub enum BlockProcessType {
|
|||||||
SingleBlock { id: Id },
|
SingleBlock { id: Id },
|
||||||
SingleBlob { id: Id },
|
SingleBlob { id: Id },
|
||||||
SingleCustodyColumn(Id),
|
SingleCustodyColumn(Id),
|
||||||
|
SinglePayloadEnvelope { id: Id, block_root: Hash256 },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockProcessType {
|
impl BlockProcessType {
|
||||||
@@ -198,7 +212,8 @@ impl BlockProcessType {
|
|||||||
match self {
|
match self {
|
||||||
BlockProcessType::SingleBlock { id }
|
BlockProcessType::SingleBlock { id }
|
||||||
| BlockProcessType::SingleBlob { id }
|
| BlockProcessType::SingleBlob { id }
|
||||||
| BlockProcessType::SingleCustodyColumn(id) => *id,
|
| BlockProcessType::SingleCustodyColumn(id)
|
||||||
|
| BlockProcessType::SinglePayloadEnvelope { id, .. } => *id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -512,6 +527,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
SyncRequestId::DataColumnsByRange(req_id) => {
|
SyncRequestId::DataColumnsByRange(req_id) => {
|
||||||
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
|
self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error))
|
||||||
}
|
}
|
||||||
|
SyncRequestId::SinglePayloadEnvelope { id } => {
|
||||||
|
self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -846,6 +864,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
} => {
|
} => {
|
||||||
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
|
self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp)
|
||||||
}
|
}
|
||||||
|
SyncMessage::RpcPayloadEnvelope {
|
||||||
|
sync_request_id,
|
||||||
|
peer_id,
|
||||||
|
envelope,
|
||||||
|
seen_timestamp,
|
||||||
|
} => self.rpc_payload_envelope_received(
|
||||||
|
sync_request_id,
|
||||||
|
peer_id,
|
||||||
|
envelope,
|
||||||
|
seen_timestamp,
|
||||||
|
),
|
||||||
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
|
||||||
let block_slot = block.slot();
|
let block_slot = block.slot();
|
||||||
let parent_root = block.parent_root();
|
let parent_root = block.parent_root();
|
||||||
@@ -911,6 +940,27 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
SyncMessage::UnknownParentEnvelope(peer_id, block, block_root) => {
|
||||||
|
let block_slot = block.slot();
|
||||||
|
let parent_root = block.parent_root();
|
||||||
|
debug!(
|
||||||
|
%block_root,
|
||||||
|
%parent_root,
|
||||||
|
"Parent envelope not yet available, creating envelope lookup"
|
||||||
|
);
|
||||||
|
self.handle_unknown_parent_envelope(
|
||||||
|
peer_id,
|
||||||
|
block_root,
|
||||||
|
parent_root,
|
||||||
|
block_slot,
|
||||||
|
BlockComponent::Block(DownloadResult {
|
||||||
|
value: block.block_cloned(),
|
||||||
|
block_root,
|
||||||
|
seen_timestamp: timestamp_now(),
|
||||||
|
peer_group: PeerGroup::from_single(peer_id),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
SyncMessage::UnknownParentPartialDataColumn {
|
SyncMessage::UnknownParentPartialDataColumn {
|
||||||
peer_id,
|
peer_id,
|
||||||
block_root,
|
block_root,
|
||||||
@@ -1036,6 +1086,40 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle a block whose parent block is known but parent envelope is missing.
|
||||||
|
/// Creates an envelope-only lookup for the parent and a child lookup that waits for it.
|
||||||
|
fn handle_unknown_parent_envelope(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
block_root: Hash256,
|
||||||
|
parent_root: Hash256,
|
||||||
|
slot: Slot,
|
||||||
|
block_component: BlockComponent<T::EthSpec>,
|
||||||
|
) {
|
||||||
|
match self.should_search_for_block(Some(slot), &peer_id) {
|
||||||
|
Ok(_) => {
|
||||||
|
if self.block_lookups.search_child_and_parent_envelope(
|
||||||
|
block_root,
|
||||||
|
block_component,
|
||||||
|
parent_root,
|
||||||
|
peer_id,
|
||||||
|
&mut self.network,
|
||||||
|
) {
|
||||||
|
// Lookups created
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
?block_root,
|
||||||
|
?parent_root,
|
||||||
|
"No lookup created for child and parent envelope"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(reason) => {
|
||||||
|
debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) {
|
fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) {
|
||||||
match self.should_search_for_block(None, &peer_id) {
|
match self.should_search_for_block(None, &peer_id) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@@ -1231,6 +1315,46 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn rpc_payload_envelope_received(
|
||||||
|
&mut self,
|
||||||
|
sync_request_id: SyncRequestId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
envelope: Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||||
|
seen_timestamp: Duration,
|
||||||
|
) {
|
||||||
|
match sync_request_id {
|
||||||
|
SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response(
|
||||||
|
id,
|
||||||
|
peer_id,
|
||||||
|
RpcEvent::from_chunk(envelope, seen_timestamp),
|
||||||
|
),
|
||||||
|
_ => {
|
||||||
|
crit!(%peer_id, "bad request id for payload envelope");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_single_envelope_response(
|
||||||
|
&mut self,
|
||||||
|
id: SingleLookupReqId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||||
|
) {
|
||||||
|
if let Some(resp) = self
|
||||||
|
.network
|
||||||
|
.on_single_envelope_response(id, peer_id, rpc_event)
|
||||||
|
{
|
||||||
|
self.block_lookups
|
||||||
|
.on_download_response::<EnvelopeRequestState<T::EthSpec>>(
|
||||||
|
id,
|
||||||
|
resp.map(|(value, seen_timestamp)| {
|
||||||
|
(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||||
|
}),
|
||||||
|
&mut self.network,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn on_single_blob_response(
|
fn on_single_blob_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
id: SingleLookupReqId,
|
id: SingleLookupReqId,
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ pub use requests::LookupVerifyError;
|
|||||||
use requests::{
|
use requests::{
|
||||||
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
|
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
|
||||||
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
|
||||||
|
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
|
||||||
};
|
};
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
@@ -52,7 +53,7 @@ use tracing::{Span, debug, debug_span, error, warn};
|
|||||||
use types::data::FixedBlobSidecarList;
|
use types::data::FixedBlobSidecarList;
|
||||||
use types::{
|
use types::{
|
||||||
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
|
||||||
ForkContext, Hash256, SignedBeaconBlock, Slot,
|
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod custody;
|
pub mod custody;
|
||||||
@@ -213,6 +214,9 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
|
|||||||
/// A mapping of active DataColumnsByRange requests
|
/// A mapping of active DataColumnsByRange requests
|
||||||
data_columns_by_range_requests:
|
data_columns_by_range_requests:
|
||||||
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
|
ActiveRequests<DataColumnsByRangeRequestId, DataColumnsByRangeRequestItems<T::EthSpec>>,
|
||||||
|
/// A mapping of active PayloadEnvelopesByRoot requests
|
||||||
|
payload_envelopes_by_root_requests:
|
||||||
|
ActiveRequests<SingleLookupReqId, PayloadEnvelopesByRootRequestItems<T::EthSpec>>,
|
||||||
/// Mapping of active custody column requests for a block root
|
/// Mapping of active custody column requests for a block root
|
||||||
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
|
custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,
|
||||||
|
|
||||||
@@ -298,6 +302,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
|
blocks_by_range_requests: ActiveRequests::new("blocks_by_range"),
|
||||||
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
|
blobs_by_range_requests: ActiveRequests::new("blobs_by_range"),
|
||||||
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
|
data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"),
|
||||||
|
payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"),
|
||||||
custody_by_root_requests: <_>::default(),
|
custody_by_root_requests: <_>::default(),
|
||||||
components_by_range_requests: FnvHashMap::default(),
|
components_by_range_requests: FnvHashMap::default(),
|
||||||
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
|
custody_backfill_data_column_batch_requests: FnvHashMap::default(),
|
||||||
@@ -326,6 +331,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
blocks_by_range_requests,
|
blocks_by_range_requests,
|
||||||
blobs_by_range_requests,
|
blobs_by_range_requests,
|
||||||
data_columns_by_range_requests,
|
data_columns_by_range_requests,
|
||||||
|
payload_envelopes_by_root_requests,
|
||||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||||
custody_by_root_requests: _,
|
custody_by_root_requests: _,
|
||||||
// components_by_range_requests is a meta request of various _by_range requests
|
// components_by_range_requests is a meta request of various _by_range requests
|
||||||
@@ -361,12 +367,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
.active_requests_of_peer(peer_id)
|
.active_requests_of_peer(peer_id)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
|
.map(|req_id| SyncRequestId::DataColumnsByRange(*req_id));
|
||||||
|
let envelope_by_root_ids = payload_envelopes_by_root_requests
|
||||||
|
.active_requests_of_peer(peer_id)
|
||||||
|
.into_iter()
|
||||||
|
.map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id });
|
||||||
blocks_by_root_ids
|
blocks_by_root_ids
|
||||||
.chain(blobs_by_root_ids)
|
.chain(blobs_by_root_ids)
|
||||||
.chain(data_column_by_root_ids)
|
.chain(data_column_by_root_ids)
|
||||||
.chain(blocks_by_range_ids)
|
.chain(blocks_by_range_ids)
|
||||||
.chain(blobs_by_range_ids)
|
.chain(blobs_by_range_ids)
|
||||||
.chain(data_column_by_range_ids)
|
.chain(data_column_by_range_ids)
|
||||||
|
.chain(envelope_by_root_ids)
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -423,6 +434,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
blocks_by_range_requests,
|
blocks_by_range_requests,
|
||||||
blobs_by_range_requests,
|
blobs_by_range_requests,
|
||||||
data_columns_by_range_requests,
|
data_columns_by_range_requests,
|
||||||
|
payload_envelopes_by_root_requests,
|
||||||
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
// custody_by_root_requests is a meta request of data_columns_by_root_requests
|
||||||
custody_by_root_requests: _,
|
custody_by_root_requests: _,
|
||||||
// components_by_range_requests is a meta request of various _by_range requests
|
// components_by_range_requests is a meta request of various _by_range requests
|
||||||
@@ -445,6 +457,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
.chain(blocks_by_range_requests.iter_request_peers())
|
.chain(blocks_by_range_requests.iter_request_peers())
|
||||||
.chain(blobs_by_range_requests.iter_request_peers())
|
.chain(blobs_by_range_requests.iter_request_peers())
|
||||||
.chain(data_columns_by_range_requests.iter_request_peers())
|
.chain(data_columns_by_range_requests.iter_request_peers())
|
||||||
|
.chain(payload_envelopes_by_root_requests.iter_request_peers())
|
||||||
{
|
{
|
||||||
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
|
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
|
||||||
}
|
}
|
||||||
@@ -927,6 +940,74 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
Ok(LookupRequestResult::RequestSent(id.req_id))
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Request a payload envelope for `block_root` from a peer.
|
||||||
|
pub fn envelope_lookup_request(
|
||||||
|
&mut self,
|
||||||
|
lookup_id: SingleLookupId,
|
||||||
|
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||||
|
block_root: Hash256,
|
||||||
|
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||||
|
let active_request_count_by_peer = self.active_request_count_by_peer();
|
||||||
|
let Some(peer_id) = lookup_peers
|
||||||
|
.read()
|
||||||
|
.iter()
|
||||||
|
.map(|peer| {
|
||||||
|
(
|
||||||
|
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
|
||||||
|
rand::random::<u32>(),
|
||||||
|
peer,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.min()
|
||||||
|
.map(|(_, _, peer)| *peer)
|
||||||
|
else {
|
||||||
|
return Ok(LookupRequestResult::Pending("no peers"));
|
||||||
|
};
|
||||||
|
|
||||||
|
let id = SingleLookupReqId {
|
||||||
|
lookup_id,
|
||||||
|
req_id: self.next_id(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let request = PayloadEnvelopesByRootSingleRequest(block_root);
|
||||||
|
|
||||||
|
let network_request = RequestType::PayloadEnvelopesByRoot(
|
||||||
|
request
|
||||||
|
.into_request(&self.fork_context)
|
||||||
|
.map_err(RpcRequestSendError::InternalError)?,
|
||||||
|
);
|
||||||
|
self.network_send
|
||||||
|
.send(NetworkMessage::SendRequest {
|
||||||
|
peer_id,
|
||||||
|
request: network_request,
|
||||||
|
app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }),
|
||||||
|
})
|
||||||
|
.map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
method = "PayloadEnvelopesByRoot",
|
||||||
|
?block_root,
|
||||||
|
peer = %peer_id,
|
||||||
|
%id,
|
||||||
|
"Sync RPC request sent"
|
||||||
|
);
|
||||||
|
|
||||||
|
let request_span = debug_span!(
|
||||||
|
parent: Span::current(),
|
||||||
|
"lh_outgoing_envelope_by_root_request",
|
||||||
|
%block_root,
|
||||||
|
);
|
||||||
|
self.payload_envelopes_by_root_requests.insert(
|
||||||
|
id,
|
||||||
|
peer_id,
|
||||||
|
true,
|
||||||
|
PayloadEnvelopesByRootRequestItems::new(request),
|
||||||
|
request_span,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||||
|
}
|
||||||
|
|
||||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||||
/// - If we have a downloaded but not yet processed block
|
/// - If we have a downloaded but not yet processed block
|
||||||
/// - If the da_checker has a pending block
|
/// - If the da_checker has a pending block
|
||||||
@@ -1435,6 +1516,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
self.on_rpc_response_result(resp, peer_id)
|
self.on_rpc_response_result(resp, peer_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn on_single_envelope_response(
|
||||||
|
&mut self,
|
||||||
|
id: SingleLookupReqId,
|
||||||
|
peer_id: PeerId,
|
||||||
|
rpc_event: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
|
||||||
|
) -> Option<RpcResponseResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>> {
|
||||||
|
let resp = self
|
||||||
|
.payload_envelopes_by_root_requests
|
||||||
|
.on_response(id, rpc_event);
|
||||||
|
let resp = resp.map(|res| {
|
||||||
|
res.and_then(|(mut envelopes, seen_timestamp)| {
|
||||||
|
match envelopes.pop() {
|
||||||
|
Some(envelope) => Ok((envelope, seen_timestamp)),
|
||||||
|
// Should never happen, request items enforces at least 1 chunk.
|
||||||
|
None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
self.on_rpc_response_result(resp, peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn on_single_blob_response(
|
pub(crate) fn on_single_blob_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
id: SingleLookupReqId,
|
id: SingleLookupReqId,
|
||||||
@@ -1610,6 +1712,33 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_envelope_for_processing(
|
||||||
|
&self,
|
||||||
|
id: Id,
|
||||||
|
envelope: Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>,
|
||||||
|
seen_timestamp: Duration,
|
||||||
|
block_root: Hash256,
|
||||||
|
) -> Result<(), SendErrorProcessor> {
|
||||||
|
let beacon_processor = self
|
||||||
|
.beacon_processor_if_enabled()
|
||||||
|
.ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
|
||||||
|
|
||||||
|
debug!(?block_root, ?id, "Sending payload envelope for processing");
|
||||||
|
beacon_processor
|
||||||
|
.send_rpc_payload_envelope(
|
||||||
|
envelope,
|
||||||
|
seen_timestamp,
|
||||||
|
BlockProcessType::SinglePayloadEnvelope { id, block_root },
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!(
|
||||||
|
error = ?e,
|
||||||
|
"Failed to send sync envelope to processor"
|
||||||
|
);
|
||||||
|
SendErrorProcessor::SendError
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn send_blobs_for_processing(
|
pub fn send_blobs_for_processing(
|
||||||
&self,
|
&self,
|
||||||
id: Id,
|
id: Id,
|
||||||
@@ -1788,6 +1917,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
"data_columns_by_range",
|
"data_columns_by_range",
|
||||||
self.data_columns_by_range_requests.len(),
|
self.data_columns_by_range_requests.len(),
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"payload_envelopes_by_root",
|
||||||
|
self.payload_envelopes_by_root_requests.len(),
|
||||||
|
),
|
||||||
("custody_by_root", self.custody_by_root_requests.len()),
|
("custody_by_root", self.custody_by_root_requests.len()),
|
||||||
(
|
(
|
||||||
"components_by_range",
|
"components_by_range",
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems;
|
|||||||
pub use data_columns_by_root::{
|
pub use data_columns_by_root::{
|
||||||
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
|
DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
|
||||||
};
|
};
|
||||||
|
pub use payload_envelopes_by_root::{
|
||||||
|
PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
|
|
||||||
@@ -27,6 +30,7 @@ mod blocks_by_range;
|
|||||||
mod blocks_by_root;
|
mod blocks_by_root;
|
||||||
mod data_columns_by_range;
|
mod data_columns_by_range;
|
||||||
mod data_columns_by_root;
|
mod data_columns_by_root;
|
||||||
|
mod payload_envelopes_by_root;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
|
||||||
pub enum LookupVerifyError {
|
pub enum LookupVerifyError {
|
||||||
|
|||||||
@@ -0,0 +1,53 @@
|
|||||||
|
use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope};
|
||||||
|
|
||||||
|
use super::{ActiveRequestItems, LookupVerifyError};
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256);
|
||||||
|
|
||||||
|
impl PayloadEnvelopesByRootSingleRequest {
|
||||||
|
pub fn into_request(
|
||||||
|
self,
|
||||||
|
fork_context: &ForkContext,
|
||||||
|
) -> Result<PayloadEnvelopesByRootRequest, String> {
|
||||||
|
PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PayloadEnvelopesByRootRequestItems<E: EthSpec> {
|
||||||
|
request: PayloadEnvelopesByRootSingleRequest,
|
||||||
|
items: Vec<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: EthSpec> PayloadEnvelopesByRootRequestItems<E> {
|
||||||
|
pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self {
|
||||||
|
Self {
|
||||||
|
request,
|
||||||
|
items: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: EthSpec> ActiveRequestItems for PayloadEnvelopesByRootRequestItems<E> {
|
||||||
|
type Item = Arc<SignedExecutionPayloadEnvelope<E>>;
|
||||||
|
|
||||||
|
/// Append a response to the single chunk request. If the chunk is valid, the request is
|
||||||
|
/// resolved immediately.
|
||||||
|
/// The active request SHOULD be dropped after `add_response` returns an error
|
||||||
|
fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> {
|
||||||
|
let beacon_block_root = envelope.beacon_block_root();
|
||||||
|
if self.request.0 != beacon_block_root {
|
||||||
|
return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.items.push(envelope);
|
||||||
|
// Always returns true, payload envelopes by root expects a single response
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(&mut self) -> Vec<Self::Item> {
|
||||||
|
std::mem::take(&mut self.items)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1507,6 +1507,11 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns whether the payload envelope has been received for the given block.
|
||||||
|
pub fn is_payload_received(&self, block_root: &Hash256) -> bool {
|
||||||
|
self.proto_array.is_payload_received(block_root)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns whether the proposer should extend the execution payload chain of the given block.
|
/// Returns whether the proposer should extend the execution payload chain of the given block.
|
||||||
pub fn should_extend_payload(&self, block_root: &Hash256) -> Result<bool, Error<T::Error>> {
|
pub fn should_extend_payload(&self, block_root: &Hash256) -> Result<bool, Error<T::Error>> {
|
||||||
let proposer_boost_root = self.fc_store.proposer_boost_root();
|
let proposer_boost_root = self.fc_store.proposer_boost_root();
|
||||||
|
|||||||
Reference in New Issue
Block a user