mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-24 00:08:27 +00:00
merge conflicts
This commit is contained in:
@@ -60,6 +60,7 @@ use crate::execution_payload::{
|
||||
};
|
||||
use crate::kzg_utils::blobs_to_data_column_sidecars;
|
||||
use crate::observed_block_producers::SeenBlock;
|
||||
use crate::payload_envelope_verification::EnvelopeError;
|
||||
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
|
||||
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
|
||||
use crate::{
|
||||
@@ -321,13 +322,18 @@ pub enum BlockError {
|
||||
bid_parent_root: Hash256,
|
||||
block_parent_root: Hash256,
|
||||
},
|
||||
/// The parent block is known but its execution payload envelope has not been received yet.
|
||||
/// The child block is known but its parent execution payload envelope has not been received yet.
|
||||
///
|
||||
/// ## Peer scoring
|
||||
///
|
||||
/// It's unclear if this block is valid, but it cannot be fully verified without the parent's
|
||||
/// execution payload envelope.
|
||||
ParentEnvelopeUnknown { parent_root: Hash256 },
|
||||
|
||||
PayloadEnvelopeError {
|
||||
e: Box<EnvelopeError>,
|
||||
penalize_peer: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Which specific signature(s) are invalid in a SignedBeaconBlock
|
||||
@@ -494,6 +500,36 @@ impl From<ArithError> for BlockError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EnvelopeError> for BlockError {
|
||||
fn from(e: EnvelopeError) -> Self {
|
||||
let penalize_peer = match &e {
|
||||
// REJECT per spec: peer sent invalid envelope data
|
||||
EnvelopeError::BadSignature
|
||||
| EnvelopeError::BuilderIndexMismatch { .. }
|
||||
| EnvelopeError::BlockHashMismatch { .. }
|
||||
| EnvelopeError::SlotMismatch { .. }
|
||||
| EnvelopeError::IncorrectBlockProposer { .. } => true,
|
||||
// IGNORE per spec: not the peer's fault
|
||||
EnvelopeError::BlockRootUnknown { .. }
|
||||
| EnvelopeError::PriorToFinalization { .. }
|
||||
| EnvelopeError::UnknownValidator { .. } => false,
|
||||
// Internal errors: not the peer's fault
|
||||
EnvelopeError::BeaconChainError(_)
|
||||
| EnvelopeError::BeaconStateError(_)
|
||||
| EnvelopeError::BlockProcessingError(_)
|
||||
| EnvelopeError::EnvelopeProcessingError(_)
|
||||
| EnvelopeError::ExecutionPayloadError(_)
|
||||
| EnvelopeError::BlockError(_)
|
||||
| EnvelopeError::InternalError(_)
|
||||
| EnvelopeError::OptimisticSyncNotSupported { .. } => false,
|
||||
};
|
||||
BlockError::PayloadEnvelopeError {
|
||||
e: Box::new(e),
|
||||
penalize_peer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores information about verifying a payload against an execution engine.
|
||||
#[derive(Debug, PartialEq, Clone, Encode, Decode)]
|
||||
pub struct PayloadVerificationOutcome {
|
||||
@@ -1978,8 +2014,9 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
|
||||
} else if let Ok(parent_bid_block_hash) = parent_block.payload_bid_block_hash()
|
||||
&& block.as_block().is_parent_block_full(parent_bid_block_hash)
|
||||
{
|
||||
// Post-Gloas Full block case.
|
||||
// TODO(gloas): loading the envelope here is not very efficient
|
||||
// If the parent's execution payload envelope hasn't arrived yet,
|
||||
// return an unknown parent error so the block gets sent to the
|
||||
// reprocess queue.
|
||||
let envelope = chain
|
||||
.store
|
||||
.get_payload_envelope(&root)?
|
||||
|
||||
@@ -90,7 +90,6 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
.boxed()
|
||||
}
|
||||
/// Publishes a signed execution payload envelope to the network.
|
||||
/// TODO(gloas): Add gossip verification (BroadcastValidation::Gossip) before import.
|
||||
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
@@ -132,18 +131,21 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
};
|
||||
|
||||
let ctx = chain.gossip_verification_context();
|
||||
let Ok(gossip_verifed_envelope) = GossipVerifiedEnvelope::new(signed_envelope, &ctx) else {
|
||||
warn!(%slot, %beacon_block_root, "Execution payload envelope rejected");
|
||||
return Err(warp_utils::reject::custom_bad_request(
|
||||
"execution payload envelope rejected, gossip verification".to_string(),
|
||||
));
|
||||
let gossip_verified_envelope = match GossipVerifiedEnvelope::new(signed_envelope, &ctx) {
|
||||
Ok(envelope) => envelope,
|
||||
Err(e) => {
|
||||
warn!(%slot, %beacon_block_root, error = ?e, "Execution payload envelope rejected");
|
||||
return Err(warp_utils::reject::custom_bad_request(format!(
|
||||
"execution payload envelope rejected: {e:?}",
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
// Import the envelope locally (runs state transition and notifies the EL).
|
||||
chain
|
||||
.process_execution_payload_envelope(
|
||||
beacon_block_root,
|
||||
gossip_verifed_envelope,
|
||||
gossip_verified_envelope,
|
||||
NotifyExecutionLayer::Yes,
|
||||
BlockImportSource::HttpApi,
|
||||
publish_fn,
|
||||
|
||||
@@ -1390,7 +1390,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
return None;
|
||||
}
|
||||
// BlobNotRequired is unreachable. Only constructed in `process_gossip_blob`
|
||||
Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => {
|
||||
Err(e @ BlockError::InternalError(_))
|
||||
| Err(e @ BlockError::BlobNotRequired(_))
|
||||
| Err(e @ BlockError::PayloadEnvelopeError { .. }) => {
|
||||
error!(error = %e, "Internal block gossip validation error");
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -109,9 +109,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
);
|
||||
self.send_sync_message(SyncMessage::BlockComponentProcessed {
|
||||
process_type,
|
||||
result: BlockProcessingResult::Err(BlockError::InternalError(format!(
|
||||
"Envelope verification failed: {e:?}"
|
||||
))),
|
||||
result: BlockProcessingResult::Err(e.into()),
|
||||
});
|
||||
return;
|
||||
}
|
||||
@@ -138,9 +136,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
?beacon_block_root,
|
||||
"RPC payload envelope processing failed"
|
||||
);
|
||||
BlockProcessingResult::Err(BlockError::InternalError(format!(
|
||||
"Envelope processing failed: {e:?}"
|
||||
)))
|
||||
BlockProcessingResult::Err(e.into())
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -336,7 +336,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
// TODO(EIP-7732): implement outgoing payload envelopes by range responses once
|
||||
// range sync requests them.
|
||||
Response::PayloadEnvelopesByRange(_) => {
|
||||
unreachable!()
|
||||
error!(%peer_id, "Unexpected PayloadEnvelopesByRange response");
|
||||
}
|
||||
// Light client responses should not be received
|
||||
Response::LightClientBootstrap(_)
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{
|
||||
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
|
||||
};
|
||||
use crate::sync::block_lookups::{
|
||||
BlobRequestState, BlockRequestState, CustodyRequestState, PeerId,
|
||||
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId,
|
||||
};
|
||||
use crate::sync::manager::BlockProcessType;
|
||||
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
|
||||
@@ -12,16 +12,17 @@ use parking_lot::RwLock;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{DataColumnSidecarList, SignedBeaconBlock};
|
||||
use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||
|
||||
use super::SingleLookupId;
|
||||
use super::single_block_lookup::{ComponentRequests, DownloadResult};
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum ResponseType {
|
||||
Block,
|
||||
Blob,
|
||||
CustodyColumn,
|
||||
Envelope,
|
||||
}
|
||||
|
||||
/// This trait unifies common single block lookup functionality across blocks and blobs. This
|
||||
@@ -151,6 +152,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
|
||||
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
|
||||
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
@@ -205,6 +207,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
ComponentRequests::WaitingForBlock => Err("waiting for block"),
|
||||
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
|
||||
ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"),
|
||||
ComponentRequests::NotNeeded { .. } => Err("not needed"),
|
||||
}
|
||||
}
|
||||
@@ -215,3 +218,52 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> RequestState<T> for EnvelopeRequestState<T::EthSpec> {
|
||||
type VerifiedResponseType = Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>;
|
||||
|
||||
fn make_request(
|
||||
&self,
|
||||
id: Id,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
_: usize,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> Result<LookupRequestResult, LookupRequestError> {
|
||||
cx.envelope_lookup_request(id, lookup_peers, self.block_root)
|
||||
.map_err(LookupRequestError::SendFailedNetwork)
|
||||
}
|
||||
|
||||
fn send_for_processing(
|
||||
id: Id,
|
||||
download_result: DownloadResult<Self::VerifiedResponseType>,
|
||||
cx: &SyncNetworkContext<T>,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let DownloadResult {
|
||||
value,
|
||||
block_root,
|
||||
seen_timestamp,
|
||||
..
|
||||
} = download_result;
|
||||
cx.send_envelope_for_processing(id, value, seen_timestamp, block_root)
|
||||
.map_err(LookupRequestError::SendFailedProcessor)
|
||||
}
|
||||
|
||||
fn response_type() -> ResponseType {
|
||||
ResponseType::Envelope
|
||||
}
|
||||
|
||||
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
|
||||
match &mut request.component_requests {
|
||||
ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request),
|
||||
_ => Err("expecting envelope request"),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&self.state
|
||||
}
|
||||
|
||||
fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
|
||||
&mut self.state
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,9 @@
|
||||
|
||||
use self::parent_chain::{NodeChain, compute_parent_chains};
|
||||
pub use self::single_block_lookup::DownloadResult;
|
||||
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
|
||||
use self::single_block_lookup::{
|
||||
AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup,
|
||||
};
|
||||
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
|
||||
use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
|
||||
use crate::metrics;
|
||||
@@ -39,7 +41,9 @@ use fnv::FnvHashMap;
|
||||
use lighthouse_network::service::api_types::SingleLookupReqId;
|
||||
use lighthouse_network::{PeerAction, PeerId};
|
||||
use lru_cache::LRUTimeCache;
|
||||
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
|
||||
pub use single_block_lookup::{
|
||||
BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState,
|
||||
};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -214,7 +218,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.new_current_lookup(
|
||||
block_root,
|
||||
Some(block_component),
|
||||
Some(parent_root),
|
||||
Some(AwaitingParent::Block(parent_root)),
|
||||
// On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required
|
||||
// to have the rest of the block components (refer to decoupled blob gossip). Create
|
||||
// the lookup with zero peers to house the block components.
|
||||
@@ -226,7 +230,37 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Seach a block whose parent root is unknown.
|
||||
/// A child block's parent envelope is missing. Create a child lookup (with the block component)
|
||||
/// that waits for the parent envelope, and an envelope-only lookup for the parent.
|
||||
///
|
||||
/// Returns true if both lookups are created or already exist.
|
||||
#[must_use = "only reference the new lookup if returns true"]
|
||||
pub fn search_child_and_parent_envelope(
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block_component: BlockComponent<T::EthSpec>,
|
||||
parent_root: Hash256,
|
||||
peer_id: PeerId,
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
let envelope_lookup_exists =
|
||||
self.search_parent_envelope_of_child(parent_root, &[peer_id], cx);
|
||||
if envelope_lookup_exists {
|
||||
// Create child lookup that waits for the parent envelope.
|
||||
// The child block itself has already been seen, so we pass it as a component.
|
||||
self.new_current_lookup(
|
||||
block_root,
|
||||
Some(block_component),
|
||||
Some(AwaitingParent::Envelope(parent_root)),
|
||||
&[],
|
||||
cx,
|
||||
)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Search a block whose parent root is unknown.
|
||||
///
|
||||
/// Returns true if the lookup is created or already exists
|
||||
#[must_use = "only reference the new lookup if returns true"]
|
||||
@@ -344,6 +378,57 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
|
||||
}
|
||||
|
||||
/// A block triggers the search of a parent envelope.
|
||||
#[must_use = "only reference the new lookup if returns true"]
|
||||
pub fn search_parent_envelope_of_child(
|
||||
&mut self,
|
||||
parent_root: Hash256,
|
||||
peers: &[PeerId],
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
// Check if there's already a lookup for this root (could be a block lookup or envelope
|
||||
// lookup). If so, add peers and let it handle the envelope.
|
||||
if let Some((&lookup_id, _lookup)) = self
|
||||
.single_block_lookups
|
||||
.iter_mut()
|
||||
.find(|(_, lookup)| lookup.is_for_block(parent_root))
|
||||
{
|
||||
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
|
||||
warn!(error = ?e, "Error adding peers to envelope lookup");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if self.single_block_lookups.len() >= MAX_LOOKUPS {
|
||||
warn!(?parent_root, "Dropping envelope lookup reached max");
|
||||
return false;
|
||||
}
|
||||
|
||||
let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id());
|
||||
let _guard = lookup.span.clone().entered();
|
||||
|
||||
let id = lookup.id;
|
||||
let lookup = match self.single_block_lookups.entry(id) {
|
||||
Entry::Vacant(entry) => entry.insert(lookup),
|
||||
Entry::Occupied(_) => {
|
||||
warn!(id, "Lookup exists with same id");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
?peers,
|
||||
?parent_root,
|
||||
id = lookup.id,
|
||||
"Created envelope-only lookup"
|
||||
);
|
||||
metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED);
|
||||
self.metrics.created_lookups += 1;
|
||||
|
||||
let result = lookup.continue_requests(cx);
|
||||
self.on_lookup_result(id, result, "new_envelope_lookup", cx)
|
||||
}
|
||||
|
||||
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
|
||||
/// constructed.
|
||||
/// Returns true if the lookup is created or already exists
|
||||
@@ -352,7 +437,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
&mut self,
|
||||
block_root: Hash256,
|
||||
block_component: Option<BlockComponent<T::EthSpec>>,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
peers: &[PeerId],
|
||||
cx: &mut SyncNetworkContext<T>,
|
||||
) -> bool {
|
||||
@@ -387,13 +472,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
|
||||
// Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress
|
||||
if let Some(awaiting_parent) = awaiting_parent
|
||||
if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) =
|
||||
awaiting_parent
|
||||
&& !self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
|
||||
.any(|(_, lookup)| lookup.is_for_block(parent_root))
|
||||
{
|
||||
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
|
||||
warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found");
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -427,9 +513,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
debug!(
|
||||
?peers,
|
||||
?block_root,
|
||||
awaiting_parent = awaiting_parent
|
||||
.map(|root| root.to_string())
|
||||
.unwrap_or("none".to_owned()),
|
||||
?awaiting_parent,
|
||||
id = lookup.id,
|
||||
"Created block lookup"
|
||||
);
|
||||
@@ -561,17 +645,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
|
||||
}
|
||||
BlockProcessType::SinglePayloadEnvelope { id, block_root } => {
|
||||
match result {
|
||||
BlockProcessingResult::Ok(_) => {
|
||||
self.continue_envelope_child_lookups(block_root, cx);
|
||||
}
|
||||
BlockProcessingResult::Err(e) => {
|
||||
debug!(%id, error = ?e, "Payload envelope processing failed");
|
||||
// TODO(EIP-7732): resolve awaiting_envelope on affected lookups so they can retry
|
||||
}
|
||||
_ => {}
|
||||
let result = self
|
||||
.on_processing_result_inner::<EnvelopeRequestState<T::EthSpec>>(id, result, cx);
|
||||
// On successful envelope import, unblock child lookups waiting for this envelope
|
||||
if matches!(&result, Ok(LookupResult::Completed)) {
|
||||
self.continue_envelope_child_lookups(block_root, cx);
|
||||
}
|
||||
return;
|
||||
result
|
||||
}
|
||||
};
|
||||
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
|
||||
@@ -687,6 +767,26 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
// We opt to drop the lookup instead.
|
||||
Action::Drop(format!("{e:?}"))
|
||||
}
|
||||
BlockError::PayloadEnvelopeError { e, penalize_peer } => {
|
||||
debug!(
|
||||
?block_root,
|
||||
error = ?e,
|
||||
"Payload envelope processing error"
|
||||
);
|
||||
if penalize_peer {
|
||||
let peer_group = request_state.on_processing_failure()?;
|
||||
for peer in peer_group.all() {
|
||||
cx.report_peer(
|
||||
*peer,
|
||||
PeerAction::MidToleranceError,
|
||||
"lookup_envelope_processing_failure",
|
||||
);
|
||||
}
|
||||
Action::Retry
|
||||
} else {
|
||||
Action::Drop(format!("{e:?}"))
|
||||
}
|
||||
}
|
||||
other => {
|
||||
debug!(
|
||||
?block_root,
|
||||
@@ -721,6 +821,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
ResponseType::CustodyColumn => {
|
||||
"lookup_custody_column_processing_failure"
|
||||
}
|
||||
ResponseType::Envelope => "lookup_envelope_processing_failure",
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -764,22 +865,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
Action::ParentEnvelopeUnknown { parent_root } => {
|
||||
let peers = lookup.all_peers();
|
||||
lookup.set_awaiting_envelope(parent_root);
|
||||
// Pick a peer to request the envelope from
|
||||
let peer_id = peers.first().copied().ok_or_else(|| {
|
||||
LookupRequestError::Failed("No peers available for envelope request".to_owned())
|
||||
})?;
|
||||
match cx.envelope_lookup_request(lookup_id, peer_id, parent_root) {
|
||||
Ok(_) => {
|
||||
debug!(
|
||||
id = lookup_id,
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"Requesting missing parent envelope"
|
||||
);
|
||||
Ok(LookupResult::Pending)
|
||||
}
|
||||
Err(e) => Err(LookupRequestError::SendFailedNetwork(e)),
|
||||
lookup.set_awaiting_parent_envelope(parent_root);
|
||||
let envelope_lookup_exists =
|
||||
self.search_parent_envelope_of_child(parent_root, &peers, cx);
|
||||
if envelope_lookup_exists {
|
||||
debug!(
|
||||
id = lookup_id,
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"Marking lookup as awaiting parent envelope"
|
||||
);
|
||||
Ok(LookupResult::Pending)
|
||||
} else {
|
||||
Err(LookupRequestError::Failed(format!(
|
||||
"Envelope lookup could not be created for {parent_root:?}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
Action::Drop(reason) => {
|
||||
@@ -831,7 +931,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
|
||||
|
||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||
if lookup.awaiting_parent() == Some(block_root) {
|
||||
if lookup.awaiting_parent_block() == Some(block_root) {
|
||||
lookup.resolve_awaiting_parent();
|
||||
debug!(
|
||||
parent_root = ?block_root,
|
||||
@@ -858,8 +958,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
let mut lookup_results = vec![];
|
||||
|
||||
for (id, lookup) in self.single_block_lookups.iter_mut() {
|
||||
if lookup.awaiting_envelope() == Some(block_root) {
|
||||
lookup.resolve_awaiting_envelope();
|
||||
if lookup.awaiting_parent_envelope() == Some(block_root) {
|
||||
lookup.resolve_awaiting_parent();
|
||||
debug!(
|
||||
envelope_root = ?block_root,
|
||||
id,
|
||||
@@ -891,10 +991,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]);
|
||||
self.metrics.dropped_lookups += 1;
|
||||
|
||||
let dropped_root = dropped_lookup.block_root();
|
||||
let child_lookups = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
|
||||
.filter(|(_, lookup)| {
|
||||
lookup.awaiting_parent_block() == Some(dropped_root)
|
||||
|| lookup.awaiting_parent_envelope() == Some(dropped_root)
|
||||
})
|
||||
.map(|(id, _)| *id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -1062,17 +1166,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
&'a self,
|
||||
lookup: &'a SingleBlockLookup<T>,
|
||||
) -> Result<&'a SingleBlockLookup<T>, String> {
|
||||
if let Some(awaiting_parent) = lookup.awaiting_parent() {
|
||||
if let Some(parent_root) = lookup.awaiting_parent_block() {
|
||||
if let Some(lookup) = self
|
||||
.single_block_lookups
|
||||
.values()
|
||||
.find(|l| l.block_root() == awaiting_parent)
|
||||
.find(|l| l.block_root() == parent_root)
|
||||
{
|
||||
self.find_oldest_ancestor_lookup(lookup)
|
||||
} else {
|
||||
Err(format!(
|
||||
"Lookup references unknown parent {awaiting_parent:?}"
|
||||
))
|
||||
Err(format!("Lookup references unknown parent {parent_root:?}"))
|
||||
}
|
||||
} else {
|
||||
Ok(lookup)
|
||||
@@ -1105,7 +1207,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(parent_root) = lookup.awaiting_parent() {
|
||||
if let Some(parent_root) = lookup.awaiting_parent_block() {
|
||||
if let Some((&child_id, _)) = self
|
||||
.single_block_lookups
|
||||
.iter()
|
||||
|
||||
@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
|
||||
fn from(value: &SingleBlockLookup<T>) -> Self {
|
||||
Self {
|
||||
block_root: value.block_root(),
|
||||
parent_root: value.awaiting_parent(),
|
||||
parent_root: value.awaiting_parent_block(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,9 @@ use store::Hash256;
|
||||
use strum::IntoStaticStr;
|
||||
use tracing::{Span, debug_span};
|
||||
use types::data::FixedBlobSidecarList;
|
||||
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
|
||||
use types::{
|
||||
DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
|
||||
};
|
||||
|
||||
// Dedicated enum for LookupResult to force its usage
|
||||
#[must_use = "LookupResult must be handled with on_lookup_result"]
|
||||
@@ -56,6 +58,14 @@ pub enum LookupRequestError {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AwaitingParent {
|
||||
/// Waiting for the parent block to be imported.
|
||||
Block(Hash256),
|
||||
/// The parent block is imported but its execution payload envelope is missing.
|
||||
Envelope(Hash256),
|
||||
}
|
||||
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug(bound(T: BeaconChainTypes)))]
|
||||
pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
@@ -69,8 +79,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
|
||||
#[educe(Debug(method(fmt_peer_set_as_len)))]
|
||||
peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_envelope: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
created: Instant,
|
||||
pub(crate) span: Span,
|
||||
}
|
||||
@@ -80,6 +89,7 @@ pub(crate) enum ComponentRequests<E: EthSpec> {
|
||||
WaitingForBlock,
|
||||
ActiveBlobRequest(BlobRequestState<E>, usize),
|
||||
ActiveCustodyRequest(CustodyRequestState<E>),
|
||||
ActiveEnvelopeRequest(EnvelopeRequestState<E>),
|
||||
// When printing in debug this state display the reason why it's not needed
|
||||
#[allow(dead_code)]
|
||||
NotNeeded(&'static str),
|
||||
@@ -90,7 +100,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
requested_block_root: Hash256,
|
||||
peers: &[PeerId],
|
||||
id: Id,
|
||||
awaiting_parent: Option<Hash256>,
|
||||
awaiting_parent: Option<AwaitingParent>,
|
||||
) -> Self {
|
||||
let lookup_span = debug_span!(
|
||||
"lh_single_block_lookup",
|
||||
@@ -105,16 +115,38 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))),
|
||||
block_root: requested_block_root,
|
||||
awaiting_parent,
|
||||
awaiting_envelope: None,
|
||||
created: Instant::now(),
|
||||
span: lookup_span,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create an envelope-only lookup. The block is already imported, we just need the envelope.
|
||||
pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self {
|
||||
let mut lookup = Self::new(block_root, peers, id, None);
|
||||
// Block is already imported, mark as completed
|
||||
lookup
|
||||
.block_request_state
|
||||
.state
|
||||
.on_completed_request("block already imported")
|
||||
.expect("block state starts as AwaitingDownload");
|
||||
lookup.component_requests =
|
||||
ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root));
|
||||
lookup
|
||||
}
|
||||
|
||||
/// Reset the status of all internal requests
|
||||
pub fn reset_requests(&mut self) {
|
||||
self.block_request_state = BlockRequestState::new(self.block_root);
|
||||
self.component_requests = ComponentRequests::WaitingForBlock;
|
||||
match &self.component_requests {
|
||||
ComponentRequests::ActiveEnvelopeRequest(_) => {
|
||||
self.component_requests = ComponentRequests::ActiveEnvelopeRequest(
|
||||
EnvelopeRequestState::new(self.block_root),
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
self.component_requests = ComponentRequests::WaitingForBlock;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
|
||||
@@ -130,34 +162,39 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
self.block_root
|
||||
}
|
||||
|
||||
pub fn awaiting_parent(&self) -> Option<Hash256> {
|
||||
pub fn awaiting_parent(&self) -> Option<AwaitingParent> {
|
||||
self.awaiting_parent
|
||||
}
|
||||
|
||||
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
|
||||
/// components for processing.
|
||||
/// Returns the parent root if awaiting a parent block.
|
||||
pub fn awaiting_parent_block(&self) -> Option<Hash256> {
|
||||
match self.awaiting_parent {
|
||||
Some(AwaitingParent::Block(root)) => Some(root),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the parent root if awaiting a parent envelope.
|
||||
pub fn awaiting_parent_envelope(&self) -> Option<Hash256> {
|
||||
match self.awaiting_parent {
|
||||
Some(AwaitingParent::Envelope(root)) => Some(root),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark this lookup as awaiting a parent block to be imported before processing.
|
||||
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
|
||||
self.awaiting_parent = Some(parent_root)
|
||||
}
|
||||
|
||||
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
|
||||
/// processing.
|
||||
pub fn resolve_awaiting_parent(&mut self) {
|
||||
self.awaiting_parent = None;
|
||||
}
|
||||
|
||||
pub fn awaiting_envelope(&self) -> Option<Hash256> {
|
||||
self.awaiting_envelope
|
||||
self.awaiting_parent = Some(AwaitingParent::Block(parent_root));
|
||||
}
|
||||
|
||||
/// Mark this lookup as awaiting a parent envelope to be imported before processing.
|
||||
pub fn set_awaiting_envelope(&mut self, parent_root: Hash256) {
|
||||
self.awaiting_envelope = Some(parent_root);
|
||||
pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) {
|
||||
self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root));
|
||||
}
|
||||
|
||||
/// Mark this lookup as no longer awaiting a parent envelope.
|
||||
pub fn resolve_awaiting_envelope(&mut self) {
|
||||
self.awaiting_envelope = None;
|
||||
/// Mark this lookup as no longer awaiting any parent.
|
||||
pub fn resolve_awaiting_parent(&mut self) {
|
||||
self.awaiting_parent = None;
|
||||
}
|
||||
|
||||
/// Returns the time elapsed since this lookup was created
|
||||
@@ -194,6 +231,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
ComponentRequests::WaitingForBlock => false,
|
||||
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
|
||||
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
|
||||
ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(),
|
||||
ComponentRequests::NotNeeded { .. } => true,
|
||||
}
|
||||
}
|
||||
@@ -201,7 +239,6 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
/// Returns true if this request is expecting some event to make progress
|
||||
pub fn is_awaiting_event(&self) -> bool {
|
||||
self.awaiting_parent.is_some()
|
||||
|| self.awaiting_envelope.is_some()
|
||||
|| self.block_request_state.state.is_awaiting_event()
|
||||
|| match &self.component_requests {
|
||||
// If components are waiting for the block request to complete, here we should
|
||||
@@ -214,6 +251,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
ComponentRequests::ActiveCustodyRequest(request) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::ActiveEnvelopeRequest(request) => {
|
||||
request.state.is_awaiting_event()
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => false,
|
||||
}
|
||||
}
|
||||
@@ -283,6 +323,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
ComponentRequests::ActiveCustodyRequest(_) => {
|
||||
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
|
||||
}
|
||||
ComponentRequests::ActiveEnvelopeRequest(_) => {
|
||||
self.continue_request::<EnvelopeRequestState<T::EthSpec>>(cx, 0)?
|
||||
}
|
||||
ComponentRequests::NotNeeded { .. } => {} // do nothing
|
||||
}
|
||||
|
||||
@@ -304,7 +347,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
|
||||
expected_blobs: usize,
|
||||
) -> Result<(), LookupRequestError> {
|
||||
let id = self.id;
|
||||
let awaiting_event = self.awaiting_parent.is_some() || self.awaiting_envelope.is_some();
|
||||
let awaiting_event = self.awaiting_parent.is_some();
|
||||
let request =
|
||||
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;
|
||||
|
||||
@@ -444,6 +487,26 @@ impl<E: EthSpec> BlockRequestState<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the envelope request component of a `SingleBlockLookup`.
|
||||
/// Used for envelope-only lookups where the parent block is already imported
|
||||
/// but its execution payload envelope is missing.
|
||||
#[derive(Educe)]
|
||||
#[educe(Debug)]
|
||||
pub struct EnvelopeRequestState<E: EthSpec> {
|
||||
#[educe(Debug(ignore))]
|
||||
pub block_root: Hash256,
|
||||
pub state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> EnvelopeRequestState<E> {
|
||||
pub fn new(block_root: Hash256) -> Self {
|
||||
Self {
|
||||
block_root,
|
||||
state: SingleLookupRequestState::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DownloadResult<T: Clone> {
|
||||
pub value: T,
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::service::NetworkMessage;
|
||||
use crate::status::ToStatusMessage;
|
||||
use crate::sync::block_lookups::{
|
||||
BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
|
||||
EnvelopeRequestState,
|
||||
};
|
||||
use crate::sync::custody_backfill_sync::CustodyBackFillSync;
|
||||
use crate::sync::network_context::{PeerGroup, RpcResponseResult};
|
||||
@@ -934,9 +935,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
debug!(
|
||||
%block_root,
|
||||
%parent_root,
|
||||
"Parent envelope not yet available, creating lookup"
|
||||
"Parent envelope not yet available, creating envelope lookup"
|
||||
);
|
||||
self.handle_unknown_parent(
|
||||
self.handle_unknown_parent_envelope(
|
||||
peer_id,
|
||||
block_root,
|
||||
parent_root,
|
||||
@@ -1054,6 +1055,40 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a block whose parent block is known but parent envelope is missing.
|
||||
/// Creates an envelope-only lookup for the parent and a child lookup that waits for it.
|
||||
fn handle_unknown_parent_envelope(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
block_root: Hash256,
|
||||
parent_root: Hash256,
|
||||
slot: Slot,
|
||||
block_component: BlockComponent<T::EthSpec>,
|
||||
) {
|
||||
match self.should_search_for_block(Some(slot), &peer_id) {
|
||||
Ok(_) => {
|
||||
if self.block_lookups.search_child_and_parent_envelope(
|
||||
block_root,
|
||||
block_component,
|
||||
parent_root,
|
||||
peer_id,
|
||||
&mut self.network,
|
||||
) {
|
||||
// Lookups created
|
||||
} else {
|
||||
debug!(
|
||||
?block_root,
|
||||
?parent_root,
|
||||
"No lookup created for child and parent envelope"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(reason) => {
|
||||
debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) {
|
||||
match self.should_search_for_block(None, &peer_id) {
|
||||
Ok(_) => {
|
||||
@@ -1278,27 +1313,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|
||||
.network
|
||||
.on_single_envelope_response(id, peer_id, rpc_event)
|
||||
{
|
||||
match resp {
|
||||
Ok((envelope, seen_timestamp)) => {
|
||||
let block_root = envelope.beacon_block_root();
|
||||
debug!(
|
||||
?block_root,
|
||||
%id,
|
||||
"Downloaded payload envelope, sending for processing"
|
||||
);
|
||||
if let Err(e) = self.network.send_envelope_for_processing(
|
||||
id.req_id,
|
||||
envelope,
|
||||
seen_timestamp,
|
||||
block_root,
|
||||
) {
|
||||
error!(error = ?e, "Failed to send envelope for processing");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(error = ?e, %id, "Payload envelope download failed");
|
||||
}
|
||||
}
|
||||
self.block_lookups
|
||||
.on_download_response::<EnvelopeRequestState<T::EthSpec>>(
|
||||
id,
|
||||
resp.map(|(value, seen_timestamp)| {
|
||||
(value, PeerGroup::from_single(peer_id), seen_timestamp)
|
||||
}),
|
||||
&mut self.network,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -944,9 +944,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
pub fn envelope_lookup_request(
|
||||
&mut self,
|
||||
lookup_id: SingleLookupId,
|
||||
peer_id: PeerId,
|
||||
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
|
||||
block_root: Hash256,
|
||||
) -> Result<Id, RpcRequestSendError> {
|
||||
) -> Result<LookupRequestResult, RpcRequestSendError> {
|
||||
let active_request_count_by_peer = self.active_request_count_by_peer();
|
||||
let Some(peer_id) = lookup_peers
|
||||
.read()
|
||||
.iter()
|
||||
.map(|peer| {
|
||||
(
|
||||
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
|
||||
rand::random::<u32>(),
|
||||
peer,
|
||||
)
|
||||
})
|
||||
.min()
|
||||
.map(|(_, _, peer)| *peer)
|
||||
else {
|
||||
return Ok(LookupRequestResult::Pending("no peers"));
|
||||
};
|
||||
|
||||
let id = SingleLookupReqId {
|
||||
lookup_id,
|
||||
req_id: self.next_id(),
|
||||
@@ -988,7 +1005,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
request_span,
|
||||
);
|
||||
|
||||
Ok(id.req_id)
|
||||
Ok(LookupRequestResult::RequestSent(id.req_id))
|
||||
}
|
||||
|
||||
/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
|
||||
@@ -1900,6 +1917,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
||||
"data_columns_by_range",
|
||||
self.data_columns_by_range_requests.len(),
|
||||
),
|
||||
(
|
||||
"payload_envelopes_by_root",
|
||||
self.payload_envelopes_by_root_requests.len(),
|
||||
),
|
||||
("custody_by_root", self.custody_by_root_requests.len()),
|
||||
(
|
||||
"components_by_range",
|
||||
|
||||
Reference in New Issue
Block a user