This commit is contained in:
mergify[bot]
2026-06-10 01:13:23 +00:00
committed by GitHub
19 changed files with 1075 additions and 193 deletions

View File

@@ -30,17 +30,15 @@ TEST_FEATURES ?=
# Cargo profile for regular builds.
PROFILE ?= release
# List of all hard forks up to gloas. This list is used to set env variables for several tests so that
# they run for different forks.
# TODO(EIP-7732) Remove this once we extend network tests to support gloas and use RECENT_FORKS instead
# List of recent hard forks before Gloas. Used by tests that do not support Gloas yet.
RECENT_FORKS_BEFORE_GLOAS=fulu
# List of all recent hard forks. This list is used to set env variables for http_api tests
# List of all recent hard forks. This list is used to set env variables for several tests.
# Include phase0 to test the code paths in sync that are pre blobs
RECENT_FORKS=fulu gloas
# For network tests include phase0 to cover genesis syncing (blocks without blobs or columns)
TEST_NETWORK_FORKS=phase0 $(RECENT_FORKS_BEFORE_GLOAS)
TEST_NETWORK_FORKS=phase0 $(RECENT_FORKS)
# Extra flags for Cargo
CARGO_INSTALL_EXTRA_FLAGS?=
@@ -228,7 +226,6 @@ test-op-pool-%:
-p operation_pool
# Run the tests in the `network` crate for all known forks.
# TODO(EIP-7732) Extend to support gloas by using RECENT_FORKS instead
test-network: $(patsubst %,test-network-%,$(TEST_NETWORK_FORKS))
test-network-%:

View File

@@ -3397,6 +3397,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
return Err(BlockError::ParentUnknown {
parent_root: blob.block_parent_root(),
parent_block_hash: None,
});
}
}
@@ -3523,7 +3524,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&parent_root)
{
return Err(BlockError::ParentUnknown { parent_root });
return Err(BlockError::ParentUnknown {
parent_root,
parent_block_hash: None,
});
}
self.emit_sse_data_column_sidecar_events(

View File

@@ -96,6 +96,7 @@ use store::{Error as DBError, KeyValueStore};
use strum::{AsRefStr, IntoStaticStr};
use task_executor::JoinHandle;
use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument};
use types::ExecutionBlockHash;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, FullPayload, Hash256, InconsistentFork, KzgProofs, RelativeEpoch,
@@ -125,6 +126,7 @@ pub enum BlockError {
/// its parent.
ParentUnknown {
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
},
/// The block slot is greater than the present slot.
///
@@ -1446,6 +1448,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
ParentImportStatus::UnknownBlock | ParentImportStatus::UnknownPayload => {
return Err(BlockError::ParentUnknown {
parent_root: block.parent_root(),
parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(),
});
}
}
@@ -1821,6 +1824,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant<
} else {
Err(BlockError::ParentUnknown {
parent_root: block.parent_root(),
parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(),
})
}
}
@@ -1915,6 +1919,7 @@ fn verify_parent_block_and_envelope_are_known<T: BeaconChainTypes>(
ParentImportStatus::UnknownBlock | ParentImportStatus::UnknownPayload => {
Err(BlockError::ParentUnknown {
parent_root: block.parent_root(),
parent_block_hash: block.payload_bid_parent_block_hash().ok(),
})
}
}
@@ -1947,6 +1952,7 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
{
return Err(BlockError::ParentUnknown {
parent_root: block.parent_root(),
parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(),
});
}

View File

@@ -1358,7 +1358,7 @@ async fn block_gossip_verification() {
assert!(
matches!(
unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await),
BlockError::ParentUnknown {parent_root: p}
BlockError::ParentUnknown {parent_root: p, ..}
if p == parent_root
),
"should not import a block for an unknown parent"

View File

@@ -730,7 +730,7 @@ pub fn rpc_data_column_limits<E: EthSpec>(
if fork_name.gloas_enabled() {
RpcLimits::new(
DataColumnSidecarGloas::<E>::min_size(),
DataColumnSidecarGloas::<E>::max_size(
DataColumnSidecarFulu::<E>::max_size(
spec.max_blobs_per_block(current_digest_epoch) as usize
),
)

View File

@@ -732,6 +732,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
%unknown_block_root,
"Unknown block root for column"
);
// Data columns are only propagated once the block has been seen for both Fulu
// and Gloas. `UnknownBlockHashFromAttestation` declares that `peer_id` has
// imported `unknown_block_root`.
self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
unknown_block_root,
));
self.propagate_validation_result(
message_id.clone(),
peer_id,
@@ -1076,10 +1083,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
%unknown_block_root,
"Unknown block root for partial column"
);
// TODO(gloas): wire this into proper lookup sync. Sending
// `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that
// mixes column processing with the attestation lookup path and is not
// the right primitive for Gloas column lookups.
// Data columns are only propagated once the block has been seen for both Fulu
// and Gloas. `UnknownBlockHashFromAttestation` declares that `peer_id` has
// imported `unknown_block_root`.
self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
unknown_block_root,
@@ -2714,14 +2720,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if allow_reprocess {
// We don't know the block, get the sync manager to handle the block lookup, and
// send the attestation to be scheduled for re-processing.
self.sync_tx
.send(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
))
.unwrap_or_else(|_| {
warn!(msg = "UnknownBlockHash", "Failed to send to sync service")
});
self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
));
let msg = match failed_att {
FailedAtt::Aggregate {
attestation,
@@ -3994,13 +3996,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
}
PayloadAttestationError::UnknownHeadBlock { .. } => {
PayloadAttestationError::UnknownHeadBlock { beacon_block_root } => {
debug!(
%peer_id,
%message_slot,
"Payload attestation references unknown block"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation(
peer_id,
*beacon_block_root,
))
}
PayloadAttestationError::NotInPTC { .. } => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);

View File

@@ -28,7 +28,7 @@ use logging::crit;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, debug_span, error, info, instrument, warn};
use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256};
use types::{BlockImportSource, DataColumnSidecarList, Epoch, ExecutionBlockHash, Hash256};
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
@@ -962,13 +962,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// The classified outcome of submitting a block / blob / column for processing, ready for the
/// lookup state machine to act on without re-inspecting `BlockError`.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BlockProcessingResult {
/// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the
/// lookup must keep fetching). `info` is a stable label for logs / metrics.
Imported(bool, &'static str),
ParentUnknown {
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
},
/// Processing failed. `penalty` is `Some` when an attributable peer should be downscored;
/// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only.
@@ -1000,9 +1001,13 @@ impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingR
return Self::Imported(true, "duplicate");
}
BlockError::GenesisBlock => return Self::Imported(true, "genesis"),
BlockError::ParentUnknown { parent_root, .. } => {
BlockError::ParentUnknown {
parent_root,
parent_block_hash,
} => {
return Self::ParentUnknown {
parent_root: *parent_root,
parent_block_hash: *parent_block_hash,
};
}
BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None,

View File

@@ -907,7 +907,10 @@ async fn data_column_reconstruction_at_slot_start() {
// reconstruction deadline.
#[tokio::test]
async fn data_column_reconstruction_at_deadline() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
let spec = test_spec::<E>();
// Pre-Gloas data-column path: a Gloas block carries its columns in the payload envelope, so the
// harness produces no block-level data columns and this gossip/reconstruction flow doesn't apply.
if spec.fulu_fork_epoch.is_none() || spec.gloas_fork_epoch.is_some() {
return;
};
@@ -1094,7 +1097,11 @@ async fn import_gossip_block_unacceptably_early() {
/// Data columns that have already been processed but unobserved should be propagated without re-importing.
#[tokio::test]
async fn accept_processed_gossip_data_columns_without_import() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
let spec = test_spec::<E>();
// Pre-Gloas data-column path: a Gloas block carries its columns in the payload envelope, so the
// harness produces no block-level data columns and this gossip flow doesn't apply.
// TODO(gloas): re-enable this test
if spec.fulu_fork_epoch.is_none() || spec.gloas_fork_epoch.is_some() {
return;
};
@@ -1983,6 +1990,11 @@ async fn test_payload_envelopes_by_range() {
// Manually store payload envelopes for each block in the range
let mut expected_roots = Vec::new();
for slot in start_slot..slot_count {
// Genesis (slot 0) has no canonical execution payload, so the by-range handler filters it
// out via `block_has_canonical_payload` even if an envelope is stored for it.
if slot == 0 {
continue;
}
if let Some(root) = rig
.chain
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
@@ -2076,14 +2088,10 @@ async fn test_payload_envelopes_by_root_unknown_root_returns_empty() {
let mut rig = TestRig::new(64).await;
// Request envelope for a root that has no stored envelope
let block_root = rig
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();
// Use a root with no block: the harness persists an envelope for every block it produces, so a
// real block root would already have one. An unknown root has no stored envelope.
let block_root = Hash256::repeat_byte(0xaa);
// Don't store any envelope — the handler should return 0 envelopes
let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap();
rig.enqueue_payload_envelopes_by_root_request(roots);

View File

@@ -1229,7 +1229,7 @@ mod tests {
fn request_batches_should_not_loop_infinitely() {
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.deterministic_keypairs(4)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.build();

View File

@@ -22,13 +22,16 @@
use self::parent_chain::{NodeChain, compute_parent_chains};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use self::single_block_lookup::{LookupRequestError, PeerType, SingleBlockLookup};
use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE};
use super::network_context::{RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::SyncMessage;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::block_lookups::single_block_lookup::{
AwaitingParent, ImportedParent, LookupResult,
};
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
@@ -39,7 +42,10 @@ use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use tracing::{debug, error, warn};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
use types::{
DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock,
SignedExecutionPayloadEnvelope,
};
pub mod parent_chain;
mod single_block_lookup;
@@ -73,6 +79,8 @@ const MAX_LOOKUPS: usize = 200;
type BlockDownloadResponse<E> = Result<DownloadResult<Arc<SignedBeaconBlock<E>>>, RpcResponseError>;
type CustodyDownloadResponse<E> =
Result<DownloadResult<DataColumnSidecarList<E>>, RpcResponseError>;
type PayloadDownloadResponse<E> =
Result<DownloadResult<Arc<SignedExecutionPayloadEnvelope<E>>>, RpcResponseError>;
pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
@@ -169,22 +177,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
block_root: Hash256,
block_component: BlockComponent<T::EthSpec>,
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
peer_id: PeerId,
cx: &mut SyncNetworkContext<T>,
) -> bool {
let parent_lookup_exists =
self.search_parent_of_child(parent_root, block_root, &[peer_id], cx);
let parent_lookup_exists = self.search_parent_of_child(
parent_root,
&PeerType::new(parent_block_hash),
block_root,
&[peer_id],
cx,
);
// Only create the child lookup if the parent exists
if parent_lookup_exists {
// `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it
self.new_current_lookup(
block_root,
Some(block_component),
Some(parent_root),
Some(AwaitingParent::new(parent_root, parent_block_hash)),
// On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not
// required to have the rest of the block components. Create the lookup with zero
// peers to house the block components.
&[],
&PeerType::Block,
cx,
)
} else {
@@ -202,7 +217,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
peer_source: &[PeerId],
cx: &mut SyncNetworkContext<T>,
) -> bool {
self.new_current_lookup(block_root, None, None, peer_source, cx)
self.new_current_lookup(block_root, None, None, peer_source, &PeerType::Block, cx)
}
/// A block or blob triggers the search of a parent.
@@ -215,6 +230,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_parent_of_child(
&mut self,
block_root_to_search: Hash256,
peer_type: &PeerType,
child_block_root_trigger: Hash256,
peers: &[PeerId],
cx: &mut SyncNetworkContext<T>,
@@ -307,7 +323,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
// `block_root_to_search` is a failed chain check happens inside new_current_lookup
self.new_current_lookup(block_root_to_search, None, None, peers, cx)
self.new_current_lookup(block_root_to_search, None, None, peers, peer_type, cx)
}
/// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is
@@ -318,8 +334,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
block_root: Hash256,
block_component: Option<BlockComponent<T::EthSpec>>,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
peers: &[PeerId],
peer_type: &PeerType,
cx: &mut SyncNetworkContext<T>,
) -> bool {
// If this block or it's parent is part of a known ignored chain, ignore it.
@@ -341,7 +358,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) {
if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, peer_type, cx)
{
warn!(error = ?e, "Error adding peers to ancestor lookup");
}
@@ -353,7 +371,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&& !self
.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_for_block(awaiting_parent))
.any(|(_, lookup)| lookup.block_root() == awaiting_parent.parent_root())
{
warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found");
return false;
@@ -368,7 +386,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
let mut lookup =
SingleBlockLookup::new(block_root, peers, peer_type, cx.next_id(), awaiting_parent);
let _guard = lookup.span.clone().entered();
// Add block components to the new request
@@ -389,9 +408,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(
?peers,
?block_root,
awaiting_parent = awaiting_parent
.map(|root| root.to_string())
.unwrap_or("none".to_owned()),
?awaiting_parent,
id = lookup.id,
"Created block lookup"
);
@@ -438,6 +455,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx);
}
pub fn on_payload_download_response(
&mut self,
id: SingleLookupReqId,
response: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
debug!(
?id,
"Payload envelope returned for a lookup id that doesn't exist"
);
return;
};
let result = lookup.on_payload_download_response(id.req_id, response, cx);
self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx);
}
/* Error responses */
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
@@ -472,25 +506,62 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
);
let lookup_result = match process_type {
BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx),
BlockProcessType::SingleBlock { .. } => {
// Update the result of the lookup first, here we may start the download of Gloas
// payload, which may error.
let lookup_result = lookup.on_block_processing_result(result.clone(), cx);
let lookup_is_awaiting_event = lookup.is_awaiting_event();
let block_root = lookup.block_root();
// Then, as a side-effect continue the EMPTY children of this lookup. Only if the
// block just imported which ensures we just do it once per lookup.
if let BlockProcessingResult::Imported(..) = result
&& let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash()
{
self.continue_child_lookups(
block_root,
ImportedParent::OnlyGloasBlock(bid_block_hash),
cx,
);
}
// Then if this lookup happens to have only empty children we can remove it now. We
// must make sure that no other lookup is awaiting this one, and that no requests
// are on-going.
if !lookup_is_awaiting_event && !self.has_any_awaiting_children(block_root) {
Ok(LookupResult::Completed)
} else {
lookup_result
}
}
BlockProcessType::SingleCustodyColumn(_) => {
lookup.on_data_processing_result(result, cx)
}
// TODO(gloas): route into the payload envelope lookup state machine.
BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending),
BlockProcessType::SinglePayloadEnvelope(_) => {
lookup.on_payload_processing_result(result, cx)
}
};
self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx);
}
pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool {
self.single_block_lookups
.iter()
.any(|(_, lookup)| lookup.is_awaiting_block(block_root))
}
/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
pub fn continue_child_lookups(
&mut self,
parent_root: Hash256,
imported_parent: ImportedParent,
cx: &mut SyncNetworkContext<T>,
) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
for (id, lookup) in self.single_block_lookups.iter_mut() {
if lookup.awaiting_parent() == Some(block_root) {
if lookup.is_awaiting_parent(parent_root, imported_parent) {
lookup.resolve_awaiting_parent();
debug!(
parent_root = ?block_root,
?imported_parent,
id,
block_root = ?lookup.block_root(),
"Continuing child lookup"
@@ -523,7 +594,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let child_lookups = self
.single_block_lookups
.iter()
.filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root()))
.filter(|(_, lookup)| lookup.is_awaiting_block(dropped_lookup.block_root()))
.map(|(id, _)| *id)
.collect::<Vec<_>>();
@@ -546,10 +617,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok(LookupResult::Pending) => true,
Ok(LookupResult::ParentUnknown {
parent_root,
parent_block_hash,
block_root,
peers,
}) => {
if self.search_parent_of_child(parent_root, block_root, &peers, cx) {
if self.search_parent_of_child(
parent_root,
&PeerType::new(parent_block_hash),
block_root,
&peers,
cx,
) {
true
} else {
self.drop_lookup_and_children(id, "Failed");
@@ -567,16 +645,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED);
self.metrics.completed_lookups += 1;
// Block imported, continue the requests of pending child blocks
self.continue_child_lookups(lookup.block_root(), cx);
self.continue_child_lookups(
lookup.block_root(),
ImportedParent::LookupComplete,
cx,
);
self.update_metrics();
} else {
debug!(id, "Attempting to drop non-existent lookup");
}
false
}
// If UnknownLookup do not log the request error. No need to drop child lookups nor
// update metrics because the lookup does not exist.
Err(LookupRequestError::UnknownLookup) => false,
Err(error) => {
debug!(id, source, ?error, "Dropping lookup on request error");
self.drop_lookup_and_children(id, error.into());
@@ -708,7 +787,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(lookup) = self
.single_block_lookups
.values()
.find(|l| l.block_root() == awaiting_parent)
.find(|l| l.block_root() == awaiting_parent.parent_root())
{
self.find_oldest_ancestor_lookup(lookup)
} else {
@@ -729,6 +808,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
lookup_id: SingleLookupId,
peers: &[PeerId],
peer_type: &PeerType,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), String> {
let lookup = self
@@ -738,7 +818,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let mut added_some_peer = false;
for peer in peers {
if lookup.add_peer(*peer) {
if lookup.add_peer(*peer, peer_type) {
added_some_peer = true;
debug!(
block_root = ?lookup.block_root(),
@@ -748,15 +828,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}
if let Some(parent_root) = lookup.awaiting_parent() {
if let Some(&awaiting_parent) = lookup.awaiting_parent() {
// Regardless of gloas full/empty the lookup to add peers to is keyed by block_root
if let Some((&parent_id, _)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == parent_root)
.find(|(_, l)| l.block_root() == awaiting_parent.parent_root())
{
self.add_peers_to_lookup_and_ancestors(parent_id, peers, cx)
self.add_peers_to_lookup_and_ancestors(
parent_id,
peers,
&awaiting_parent.into_peer_type(),
cx,
)
} else {
Err(format!("Lookup references unknown parent {parent_root:?}"))
Err(format!("Lookup references unknown {awaiting_parent:?}"))
}
} else if added_some_peer {
// If this lookup is not awaiting a parent and we added at least one peer, attempt to

View File

@@ -13,7 +13,7 @@ impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node {
fn from(value: &SingleBlockLookup<T>) -> Self {
Self {
block_root: value.block_root(),
parent_root: value.awaiting_parent(),
parent_root: value.awaiting_parent().map(|a| a.parent_root()),
}
}
}

View File

@@ -1,6 +1,8 @@
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::network_beacon_processor::BlockProcessingResult;
use crate::sync::block_lookups::{BlockDownloadResponse, CustodyDownloadResponse};
use crate::sync::block_lookups::{
BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse,
};
use crate::sync::manager::BlockProcessType;
use crate::sync::network_context::{
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError,
@@ -11,13 +13,16 @@ use beacon_chain::block_verification_types::AsBlock;
use educe::Educe;
use lighthouse_network::service::api_types::Id;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use tracing::{Span, debug_span};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
use types::{
DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
};
// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
@@ -29,6 +34,7 @@ pub enum LookupResult {
/// Block's parent is not known to fork-choice, a parent lookup is needed
ParentUnknown {
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
block_root: Hash256,
peers: Vec<PeerId>,
},
@@ -46,8 +52,6 @@ pub enum LookupRequestError {
BadState(String),
/// Lookup failed for some other reason and should be dropped
Failed(/* reason: */ String),
/// Attempted to retrieve a not known lookup id
UnknownLookup,
/// Received a download result for a different request id than the in-flight request.
/// There should only exist a single request at a time. Having multiple requests is a bug and
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
@@ -57,6 +61,31 @@ pub enum LookupRequestError {
},
}
#[derive(Debug, Clone, Copy)]
pub struct AwaitingParent {
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
}
impl AwaitingParent {
pub fn new(parent_root: Hash256, parent_block_hash: Option<ExecutionBlockHash>) -> Self {
Self {
parent_root,
parent_block_hash,
}
}
pub fn parent_root(&self) -> Hash256 {
self.parent_root
}
pub fn into_peer_type(self) -> PeerType {
PeerType::new(self.parent_block_hash)
}
}
type PeerSet = Arc<RwLock<HashSet<PeerId>>>;
#[derive(Debug)]
struct BlockRequest<E: EthSpec> {
state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
@@ -79,6 +108,9 @@ enum DataRequest<E: EthSpec> {
WaitingForBlock,
Request {
slot: Slot,
/// Peers to fetch the data columns from. Pre-Gloas this is the lookup's `peers`; for FULL
/// Gloas blocks this is the `gloas_child_peers` set proven to hold the columns.
peers: PeerSet,
state: SingleLookupRequestState<DataColumnSidecarList<E>>,
},
NoData,
@@ -94,7 +126,62 @@ impl<E: EthSpec> DataRequest<E> {
}
}
type PeerSet = Arc<RwLock<HashSet<PeerId>>>;
/// Tracks the download + processing of a Gloas execution payload envelope. For FULL Gloas blocks the
/// execution payload arrives as a separate `SignedExecutionPayloadEnvelope`, mirroring the way data
/// columns are fetched and processed by `DataRequest`.
#[derive(Debug)]
enum PayloadRequest<E: EthSpec> {
/// Block not yet downloaded, can't tell if a payload is needed.
WaitingForBlock,
/// Post-Gloas block: an execution payload envelope must be fetched and processed *if* the block
/// is FULL. We can't tell FULL from EMPTY from the block alone: only a FULL child of this block
/// proves a payload was published, which is signalled by `peers` becoming non-empty.
Request {
peers: PeerSet,
state: SingleLookupRequestState<Arc<SignedExecutionPayloadEnvelope<E>>>,
},
/// Pre-Gloas block: no payload envelope exists, nothing to fetch.
PreGloas,
}
impl<E: EthSpec> PayloadRequest<E> {
fn is_complete(&self) -> bool {
match &self {
PayloadRequest::WaitingForBlock => false,
PayloadRequest::Request { state, .. } => state.is_processed(),
PayloadRequest::PreGloas => true,
}
}
}
/// Classifies how a peer relates to a lookup, controlling which peer set it is added to.
pub enum PeerType {
/// The peer can serve the looked-up block and (pre-Gloas) its data columns.
Block,
/// The peer claims to have imported a FULL child of this block whose bid references
/// `ExecutionBlockHash` as its parent. Such peers can serve this block's payload envelope and
/// data columns.
PayloadEnvelope(ExecutionBlockHash),
}
impl PeerType {
/// `PayloadEnvelope` when the block's bid `parent_block_hash` is known (post-Gloas), else `Block`.
pub fn new(parent_block_hash: Option<ExecutionBlockHash>) -> Self {
match parent_block_hash {
Some(execution_hash) => PeerType::PayloadEnvelope(execution_hash),
None => PeerType::Block,
}
}
}
/// Used by `is_awaiting_parent` to decide if it can resolve its awaiting parent status
#[derive(Debug, Clone, Copy)]
pub enum ImportedParent {
/// All requests of a lookup are complete, both for pre and post Gloas
LookupComplete,
/// Only post-Gloas, the block request has just been completed. Includes the bid block hash
OnlyGloasBlock(ExecutionBlockHash),
}
#[derive(Educe)]
#[educe(Debug(bound(T: BeaconChainTypes)))]
@@ -103,13 +190,19 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
block_root: Hash256,
block_request: BlockRequest<T::EthSpec>,
data_request: DataRequest<T::EthSpec>,
payload_request: PayloadRequest<T::EthSpec>,
/// Peers that claim to have imported this set of block components. This state is shared with
/// the custody request to have an updated view of the peers that claim to have imported the
/// block associated with this lookup. The peer set of a lookup can change rapidly, and faster
/// than the lifetime of a custody request.
#[educe(Debug(method(fmt_peer_set_as_len)))]
peers: PeerSet,
awaiting_parent: Option<Hash256>,
/// Post-Gloas only: peers that claim to have imported a FULL child of this block, keyed by the
/// child's bid `parent_block_hash`. These (not `peers`) are the peers proven to hold this
/// block's payload envelope and data columns.
#[educe(Debug(method(fmt_peer_map_as_len)))]
gloas_child_peers: HashMap<ExecutionBlockHash, PeerSet>,
awaiting_parent: Option<AwaitingParent>,
created: Instant,
pub(crate) span: Span,
}
@@ -118,8 +211,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn new(
requested_block_root: Hash256,
peers: &[PeerId],
peer_type: &PeerType,
id: Id,
awaiting_parent: Option<Hash256>,
awaiting_parent: Option<AwaitingParent>,
) -> Self {
let lookup_span = debug_span!(
"lh_single_block_lookup",
@@ -127,12 +221,23 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id = id,
);
let block_peers: PeerSet = Arc::new(RwLock::new(peers.iter().copied().collect()));
let mut gloas_child_peers = HashMap::new();
match peer_type {
PeerType::Block => {}
PeerType::PayloadEnvelope(execution_hash) => {
gloas_child_peers.insert(*execution_hash, block_peers.clone());
}
}
Self {
id,
block_root: requested_block_root,
block_request: BlockRequest::new(),
data_request: DataRequest::WaitingForBlock,
peers: Arc::new(RwLock::new(peers.iter().copied().collect())),
payload_request: PayloadRequest::WaitingForBlock,
peers: block_peers,
gloas_child_peers,
awaiting_parent,
created: Instant::now(),
span: lookup_span,
@@ -147,19 +252,41 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.map(|block| block.slot())
}
pub fn peek_downloaded_bid_block_hash(&self) -> Option<ExecutionBlockHash> {
self.block_request
.state
.peek_downloaded_data()
.and_then(|block| {
block
.message()
.body()
.signed_execution_payload_bid()
.ok()
.map(|bid| bid.message.block_hash)
})
}
/// Get the block root that is being requested.
pub fn block_root(&self) -> Hash256 {
self.block_root
}
pub fn awaiting_parent(&self) -> Option<Hash256> {
self.awaiting_parent
pub fn awaiting_parent(&self) -> Option<&AwaitingParent> {
self.awaiting_parent.as_ref()
}
pub fn is_awaiting_block(&self, block_root: Hash256) -> bool {
if let Some(awaiting_parent) = &self.awaiting_parent {
awaiting_parent.parent_root() == block_root
} else {
false
}
}
/// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send
/// components for processing.
pub fn set_awaiting_parent(&mut self, parent_root: Hash256) {
self.awaiting_parent = Some(parent_root)
pub fn set_awaiting_parent(&mut self, parent: AwaitingParent) {
self.awaiting_parent = Some(parent);
}
/// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for
@@ -168,6 +295,37 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.awaiting_parent = None;
}
/// Check if this lookup awaiting_parent status can be resolved given that `parent_root` and
/// `imported_parent` have just been imported
pub fn is_awaiting_parent(
&mut self,
parent_root: Hash256,
imported_parent: ImportedParent,
) -> bool {
let Some(awaiting_parent) = self.awaiting_parent else {
return false;
};
if awaiting_parent.parent_root() != parent_root {
return false;
}
match imported_parent {
ImportedParent::LookupComplete => true,
ImportedParent::OnlyGloasBlock(bid_block_hash) => {
if let Some(parent_block_hash) = awaiting_parent.parent_block_hash {
// This lookup is the execution child of `parent_execution_hash`. If the
// parent hash the same `bid_block_hash` this is FULL child and we must wait
// for the entire parent lookup to be imported. Otherwise it's a EMPTY child
// and we can import now.
parent_block_hash != bid_block_hash
} else {
// A parent that's gloas imported and this lookup claims to be before gloas.
debug_assert!(false, "Received post-gloas action for pre-gloas lookup");
false
}
}
}
}
/// Returns the time elapsed since this lookup was created
pub fn elapsed_since_created(&self) -> Duration {
self.created.elapsed()
@@ -201,6 +359,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
DataRequest::Request { state, .. } => state.is_awaiting_event(),
DataRequest::NoData => false,
}
|| match &self.payload_request {
PayloadRequest::WaitingForBlock => true,
PayloadRequest::Request { state, .. } => state.is_awaiting_event(),
PayloadRequest::PreGloas => false,
}
}
/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
@@ -235,6 +398,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
DataRequest::Request {
slot: block.slot(),
peers: self.get_data_peers(block.payload_bid_block_hash().ok()),
state: SingleLookupRequestState::new(),
}
} else {
@@ -244,18 +408,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
break;
}
}
DataRequest::Request { slot, state } => {
DataRequest::Request { slot, peers, state } => {
state.maybe_start_downloading(|| {
cx.custody_lookup_request(
self.id,
self.block_root,
*slot,
self.peers.clone(),
)
cx.custody_lookup_request(self.id, self.block_root, *slot, peers.clone())
})?;
// Wait for the parent to be imported, data column processing result handle does
// Wait for the current block and parent to be imported, data column processing result handle does
// not support `ParentUnknown`.
if self.awaiting_parent.is_none()
if self.block_request.state.is_processed()
&& self.awaiting_parent.is_none()
&& let Some(data) = state.maybe_start_processing()
{
cx.send_custody_columns_for_processing(
@@ -273,16 +433,78 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}
// === Payload request (Gloas only) ===
loop {
match &mut self.payload_request {
PayloadRequest::WaitingForBlock => {
if let Some(block) = self.block_request.state.peek_downloaded_data() {
self.payload_request = if block.fork_name_unchecked().gloas_enabled() {
PayloadRequest::Request {
peers: self.get_data_peers(block.payload_bid_block_hash().ok()),
state: SingleLookupRequestState::new(),
}
} else {
PayloadRequest::PreGloas
};
} else {
break;
}
}
PayloadRequest::Request { peers, state } => {
state.maybe_start_downloading(|| {
cx.payload_lookup_request(self.id, peers.clone(), self.block_root)
})?;
// The envelope can only be verified once the block itself is imported;
// otherwise processing returns `BlockRootUnknown` and the lookup burns retries
// until `TooManyAttempts` while the block is parked awaiting its parent.
if self.block_request.state.is_processed()
&& let Some(data) = state.maybe_start_processing()
{
cx.send_payload_for_processing(
self.block_root,
data.value,
data.seen_timestamp,
BlockProcessType::SinglePayloadEnvelope(self.id),
)
.map_err(LookupRequestError::SendFailedProcessor)?;
}
break;
}
PayloadRequest::PreGloas => break,
}
}
// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
// This case can happen if we receive the components from gossip during a retry.
if self.block_request.is_complete() && self.data_request.is_complete() {
if self.block_request.is_complete()
&& self.data_request.is_complete()
&& self.payload_request.is_complete()
{
return Ok(LookupResult::Completed);
}
Ok(LookupResult::Pending)
}
/// Returns the peers that should serve this block's data columns and payload envelope. For FULL
/// Gloas blocks these are the peers that claimed to have imported a FULL child of this block
/// (keyed by this block's bid `block_hash`). Pre-Gloas blocks carry no bid, so this returns the
/// lookup's `peers` unchanged.
fn get_data_peers(&mut self, bid_block_hash: Option<ExecutionBlockHash>) -> PeerSet {
if let Some(bid_block_hash) = bid_block_hash {
// Gloas: the child-attested peer set for this bid is the canonical peer set. DO NOT
// default to `self.peers`: post-Gloas `self.peers` have not claimed to import this
// block's data nor its payload. This set may remain empty until a FULL child arrives.
self.gloas_child_peers
.entry(bid_block_hash)
.or_default()
.clone()
} else {
self.peers.clone()
}
}
/// Handle block processing result. Advances the lookup state machine.
pub fn on_block_processing_result(
&mut self,
@@ -293,15 +515,22 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
BlockProcessingResult::Imported(_fully_imported, _info) => {
self.block_request.state.on_processing_success()?;
}
BlockProcessingResult::ParentUnknown { parent_root } => {
BlockProcessingResult::ParentUnknown {
parent_root,
parent_block_hash,
} => {
// `BlockError::ParentUnknown` is only returned when processing blocks. Revert the
// block request to `Downloaded` and park this lookup until the parent resolves; a
// future call to `continue_requests` will re-submit the block for processing once
// the parent lookup completes.
self.block_request.state.revert_to_awaiting_processing()?;
self.set_awaiting_parent(parent_root);
self.set_awaiting_parent(AwaitingParent {
parent_root,
parent_block_hash,
});
return Ok(LookupResult::ParentUnknown {
parent_root,
parent_block_hash,
block_root: self.block_root,
peers: self.all_peers(),
});
@@ -345,6 +574,37 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.continue_requests(cx)
}
/// Handle payload envelope processing result (Gloas only).
pub fn on_payload_processing_result(
&mut self,
result: BlockProcessingResult,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
return Err(LookupRequestError::BadState(
"no payload_request".to_owned(),
));
};
match result {
BlockProcessingResult::Imported(_fully_imported, _info) => {
state.on_processing_success()?;
}
BlockProcessingResult::ParentUnknown { .. } => {
return Err(LookupRequestError::BadState(
"payload processing returned ParentUnknown".to_owned(),
));
}
BlockProcessingResult::Error { penalty, .. } => {
let peers = state.on_processing_failure()?;
if let Some((action, whom, msg)) = penalty {
whom.apply(action, &peers, msg, cx);
}
}
}
self.continue_requests(cx)
}
/// Handle a block download response. Updates download state and advances the lookup.
pub fn on_block_download_response(
&mut self,
@@ -373,6 +633,23 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.continue_requests(cx)
}
/// Handle a payload envelope download response. Updates download state and advances the lookup.
pub fn on_payload_download_response(
&mut self,
req_id: ReqId,
result: PayloadDownloadResponse<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let PayloadRequest::Request { state, .. } = &mut self.payload_request else {
return Err(LookupRequestError::BadState(
"no payload_request".to_owned(),
));
};
state.on_download_response(req_id, result)?;
self.continue_requests(cx)
}
/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> Vec<PeerId> {
self.peers.read().iter().copied().collect()
@@ -380,18 +657,54 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into any peer set.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
self.peers.write().insert(peer_id)
pub fn add_peer(&mut self, peer_id: PeerId, peer_type: &PeerType) -> bool {
let mut added = false;
match peer_type {
PeerType::PayloadEnvelope(execution_hash) => {
// This peer claims to have imported a FULL child of this block whose bid references
// `execution_hash` as its parent. It is therefore proven to hold this block's
// payload envelope and data columns.
added |= self
.gloas_child_peers
.entry(*execution_hash)
.or_default()
.write()
.insert(peer_id);
}
PeerType::Block => {}
}
// Always add to the main block peers, they can at least serve the block.
added |= self.peers.write().insert(peer_id);
added
}
/// Remove peer from available peers.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.write().remove(peer_id);
for set in self.gloas_child_peers.values() {
set.write().remove(peer_id);
}
}
/// Returns true if this lookup has zero peers
pub fn has_no_peers(&self) -> bool {
self.peers.read().is_empty()
if self.block_request.is_complete()
&& let Some(block) = self.block_request.state.peek_downloaded_data()
&& let Ok(bid_block_hash) = block.payload_bid_block_hash()
{
// Gloas block request complete, the main peer set is irrelevant. Check only the gloas
// child peers
match self.gloas_child_peers.get(&bid_block_hash) {
Some(set) => set.read().is_empty(),
None => false,
}
} else {
self.peers.read().is_empty()
&& self
.gloas_child_peers
.values()
.all(|set| set.read().is_empty())
}
}
}
@@ -702,3 +1015,11 @@ fn fmt_peer_set_as_len(
) -> Result<(), std::fmt::Error> {
write!(f, "{}", peer_set.read().len())
}
fn fmt_peer_map_as_len(
peer_map: &HashMap<ExecutionBlockHash, PeerSet>,
f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
let total = peer_map.values().map(|set| set.read().len()).sum::<usize>();
write!(f, "{}", total)
}

View File

@@ -515,6 +515,15 @@ mod tests {
}
}
/// The custody-column coupling tests below build Fulu data-column sidecars directly, which is
/// incompatible with a Gloas genesis (Gloas columns have a different structure). Skip them when
/// `FORK_NAME` schedules Gloas at genesis. TODO(gloas): port the harness to build Gloas columns.
fn skip_under_gloas() -> bool {
test_spec::<E>()
.fork_name_at_epoch(Epoch::new(0))
.gloas_enabled()
}
fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId {
BlocksByRangeRequestId {
id: 1,
@@ -619,6 +628,9 @@ mod tests {
#[test]
fn rpc_block_with_custody_columns() {
if skip_under_gloas() {
return;
}
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
@@ -697,6 +709,9 @@ mod tests {
#[test]
fn rpc_block_with_custody_columns_batched() {
if skip_under_gloas() {
return;
}
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));
@@ -791,6 +806,9 @@ mod tests {
#[test]
fn missing_custody_columns_from_faulty_peers() {
if skip_under_gloas() {
return;
}
// GIVEN: A request expecting sampling columns from multiple peers
let spec = Arc::new(test_spec::<E>());
let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode));
@@ -886,6 +904,9 @@ mod tests {
#[test]
fn retry_logic_after_peer_failures() {
if skip_under_gloas() {
return;
}
// GIVEN: A request expecting sampling columns where some peers initially fail
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));
@@ -1002,6 +1023,9 @@ mod tests {
#[test]
fn max_retries_exceeded_behavior() {
if skip_under_gloas() {
return;
}
// GIVEN: A request where peers consistently fail to provide required columns
let mut spec = test_spec::<E>();
spec.deneb_fork_epoch = Some(Epoch::new(0));

View File

@@ -71,8 +71,8 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
BlobSidecar, DataColumnSidecar, EthSpec, ExecutionBlockHash, ForkContext, Hash256,
SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -852,11 +852,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncMessage::UnknownParentBlock(peer_id, block, block_root) => {
let block_slot = block.slot();
let parent_root = block.parent_root();
let parent_block_hash = block.payload_bid_parent_block_hash().ok();
debug!(%block_root, %parent_root, "Received unknown parent block message");
self.handle_unknown_parent(
peer_id,
block_root,
parent_root,
parent_block_hash,
block_slot,
BlockComponent::Block(DownloadResult {
value: block.block_cloned(),
@@ -876,6 +878,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id,
block_root,
parent_root,
// The event `UnknownParentSidecarHeader` only fires for pre-Gloas data
// structues, so the bid parent hash is None.
None,
slot,
BlockComponent::Sidecar,
);
@@ -951,6 +956,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block_root: Hash256,
parent_root: Hash256,
parent_block_hash: Option<ExecutionBlockHash>,
slot: Slot,
block_component: BlockComponent<T::EthSpec>,
) {
@@ -960,6 +966,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
block_root,
block_component,
parent_root,
parent_block_hash,
peer_id,
&mut self.network,
) {
@@ -1139,7 +1146,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
// TODO(gloas): dispatch into block_lookups once the envelope lookup state machine lands.
fn rpc_payload_envelope_received(
&mut self,
sync_request_id: SyncRequestId,
@@ -1194,13 +1200,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
envelope: RpcEvent<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
) {
if let Some(_resp) = self
if let Some(resp) = self
.network
.on_single_payload_envelope_response(id, peer_id, envelope)
{
// TODO(gloas): dispatch into
// `block_lookups.on_download_response::<PayloadEnvelopeRequestState<_>>(...)` once
// the envelope lookup state machine lands.
self.block_lookups.on_payload_download_response(
id,
resp.map(|(value, seen_timestamp)| {
DownloadResult::new(value, PeerGroup::from_single(peer_id), seen_timestamp)
}),
&mut self.network,
)
}
}

View File

@@ -53,8 +53,8 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot,
};
pub mod custody;
@@ -98,6 +98,7 @@ pub type CustodyByRootResult<T> =
Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>;
#[derive(Debug)]
#[allow(private_interfaces)]
pub enum RpcResponseError {
RpcError(#[allow(dead_code)] RPCError),
VerifyError(LookupVerifyError),
@@ -310,6 +311,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
pub fn spec(&self) -> &ChainSpec {
&self.chain.spec
}
pub fn send_sync_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
self.network_beacon_processor
.send_sync_message(sync_message);
@@ -921,19 +926,23 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
/// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC.
#[allow(dead_code)]
pub fn payload_lookup_request(
&mut self,
lookup_id: SingleLookupId,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult<()>, RpcRequestSendError> {
) -> Result<
LookupRequestResult<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>,
RpcRequestSendError,
> {
// Skip the download if fork-choice already saw this envelope (e.g. imported via gossip
// before the lookup got here).
if self.chain.envelope_is_known_to_fork_choice(&block_root) {
// before the lookup got here). Return the cached envelope so the request completes.
if self.chain.envelope_is_known_to_fork_choice(&block_root)
&& let Ok(Some(envelope)) = self.chain.get_payload_envelope(&block_root)
{
return Ok(LookupRequestResult::NoRequestNeeded(
"envelope already known to fork-choice",
(),
Arc::new(envelope),
));
}
@@ -1052,6 +1061,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block_slot: Slot,
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Result<LookupRequestResult<DataColumnSidecarList<T::EthSpec>>, RpcRequestSendError> {
// Code below will issue column requests even if `lookup_peers` is empty. This is not okay,
// as we want to have at least one signal that some of our peers has already seen the
// block's data.
if lookup_peers.read().is_empty() {
return Ok(LookupRequestResult::Pending("no peers"));
}
let custody_indexes_imported = self
.chain
.cached_data_column_indexes(&block_root, block_slot)
@@ -1567,7 +1583,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
})
}
#[allow(dead_code)]
pub fn send_payload_for_processing(
&self,
block_root: Hash256,

View File

@@ -310,11 +310,10 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
// and downscore if data_columns_by_root does not return the expected custody
// columns. For the rest of peers, don't downscore if columns are missing.
//
// Post-Gloas, blocks and payload envelopes are decoupled. A peer may
// have the block but not yet imported the envelope and data columns.
// Don't enforce max_responses in this case.
lookup_peers.contains(&peer_id)
&& !cx.fork_context.current_fork_name().gloas_enabled(),
// Post-Gloas the lookup peer set is the `gloas_child_peers`: peers that imported
// a FULL child, which requires the parent's columns. They provably custody the
// columns, so withholding is penalizable just like pre-Gloas.
lookup_peers.contains(&peer_id),
)
.map_err(Error::SendFailed)?;

View File

@@ -38,11 +38,15 @@ use tokio::sync::mpsc;
use tracing::info;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSubnetId,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
SignedExecutionPayloadEnvelope, Slot,
};
const D: Duration = Duration::new(0, 0);
/// Gloas genesis needs enough validators to populate `proposer_lookahead`.
const TEST_RIG_VALIDATOR_COUNT: usize = 8;
/// Configuration for how the test rig should respond to sync requests.
///
/// Controls simulated peer behavior during lookup tests, including RPC errors,
@@ -59,6 +63,10 @@ pub struct SimulateConfig {
return_too_few_data_n_times: usize,
return_no_columns_on_indices_n_times: usize,
return_no_columns_on_indices: Vec<ColumnIndex>,
/// Only omit columns for this block root, if set.
return_no_columns_for_block: Option<Hash256>,
/// Leave matching envelope requests unanswered.
hold_envelope_for_block: Option<Hash256>,
skip_by_range_routes: bool,
// Use a callable fn because BlockProcessingResult does not implement Clone
#[educe(Debug(ignore))]
@@ -132,6 +140,16 @@ impl SimulateConfig {
self
}
fn return_no_columns_for_block(mut self, block_root: Hash256) -> Self {
self.return_no_columns_for_block = Some(block_root);
self
}
fn hold_envelope_for_block(mut self, block_root: Hash256) -> Self {
self.hold_envelope_for_block = Some(block_root);
self
}
pub(super) fn return_rpc_error(mut self, error: RPCError) -> Self {
self.return_rpc_error = Some(error);
self
@@ -211,6 +229,14 @@ pub(crate) struct TestRigConfig {
node_custody_type_override: Option<NodeCustodyType>,
}
struct FullEmptyFork {
a: Hash256,
b: Hash256,
c: Hash256,
b_block: Arc<SignedBeaconBlock<E>>,
c_block: Arc<SignedBeaconBlock<E>>,
}
impl TestRig {
pub(crate) fn new(test_rig_config: TestRigConfig) -> Self {
// Use `fork_from_env` logic to set correct fork epochs
@@ -221,10 +247,10 @@ impl TestRig {
Duration::from_secs(12),
);
// Initialise a new beacon chain
// Gloas genesis needs enough validators for proposer lookahead.
let harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
.spec(spec.clone())
.deterministic_keypairs(1)
.deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.testing_slot_clock(clock.clone())
@@ -304,6 +330,7 @@ impl TestRig {
fork_name,
network_blocks_by_root: <_>::default(),
network_blocks_by_slot: <_>::default(),
network_envelopes_by_root: <_>::default(),
penalties: <_>::default(),
seen_lookups: <_>::default(),
requests: <_>::default(),
@@ -428,9 +455,9 @@ impl TestRig {
process_fn.await
}
}
Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => {
process_fn.await
}
Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcEnvelope(process_fn) => process_fn.await,
Work::ChainSegment {
process_fn,
process_id: (chain_id, batch_epoch),
@@ -557,11 +584,14 @@ impl TestRig {
}
let will_omit_columns = req.data_column_ids.iter().any(|id| {
id.columns.iter().any(|c| {
self.complete_strategy
.return_no_columns_on_indices
.contains(c)
})
self.complete_strategy
.return_no_columns_for_block
.is_none_or(|root| id.block_root == root)
&& id.columns.iter().any(|c| {
self.complete_strategy
.return_no_columns_on_indices
.contains(c)
})
});
let columns_to_omit = if will_omit_columns
&& self.complete_strategy.return_no_columns_on_indices_n_times > 0
@@ -615,15 +645,34 @@ impl TestRig {
.return_wrong_sidecar_for_block_n_times -= 1;
let first = columns.first_mut().expect("empty columns");
let column = Arc::make_mut(first);
column
.signed_block_header_mut()
.expect("not fulu")
.message
.body_root = Hash256::ZERO;
// Corrupt the claimed block root.
match column {
DataColumnSidecar::Fulu(col) => {
col.signed_block_header.message.body_root = Hash256::ZERO;
}
DataColumnSidecar::Gloas(col) => {
col.beacon_block_root = Hash256::ZERO;
}
}
}
self.send_rpc_columns_response(req_id, peer_id, &columns);
}
(RequestType::PayloadEnvelopesByRoot(req), AppRequestId::Sync(req_id)) => {
// Lookup sync requests one envelope root at a time.
let block_root = req
.beacon_block_roots
.as_slice()
.first()
.copied()
.unwrap_or_else(|| panic!("empty envelope request: {req:?}"));
if self.complete_strategy.hold_envelope_for_block == Some(block_root) {
return;
}
let envelope = self.network_envelopes_by_root.get(&block_root).cloned();
self.send_rpc_envelope_response(req_id, peer_id, envelope);
}
(RequestType::BlocksByRange(req), AppRequestId::Sync(req_id)) => {
if self.complete_strategy.skip_by_range_routes {
return;
@@ -883,16 +932,44 @@ impl TestRig {
});
}
fn send_rpc_envelope_response(
&mut self,
sync_request_id: SyncRequestId,
peer_id: PeerId,
envelope: Option<Arc<SignedExecutionPayloadEnvelope<E>>>,
) {
self.log(&format!(
"Completing request {sync_request_id:?} to {peer_id} with envelope {:?}",
envelope.as_ref().map(|e| e.slot())
));
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope: envelope.clone(),
seen_timestamp: D,
});
// Stream termination
self.push_sync_message(SyncMessage::RpcPayloadEnvelope {
sync_request_id,
peer_id,
envelope: None,
seen_timestamp: D,
});
}
#[allow(dead_code)]
fn is_after_gloas(&self) -> bool {
self.fork_name.gloas_enabled()
}
// Preparation steps
/// Returns the block root of the tip of the built chain
pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 {
let mut blocks = vec![];
fn get_external_harness_with_genesis(&mut self) -> BeaconChainHarness<EphemeralHarnessType<E>> {
// Initialise a new beacon chain
let external_harness = BeaconChainHarness::<EphemeralHarnessType<E>>::builder(E)
.spec(self.harness.spec.clone())
.deterministic_keypairs(1)
.deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.testing_slot_clock(self.harness.chain.slot_clock.clone())
@@ -912,7 +989,17 @@ impl TestRig {
self.network_blocks_by_slot
.insert(genesis_block.slot(), genesis_block);
for i in 0..block_count {
external_harness
}
/// Returns the block root of the tip of the built chain
pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 {
let mut blocks = vec![];
// Initialise a new beacon chain
let external_harness = self.get_external_harness_with_genesis();
for _ in 0..block_count {
external_harness.advance_slot();
let block_root = external_harness
.extend_chain(
@@ -922,23 +1009,17 @@ impl TestRig {
)
.await;
let block = external_harness.get_full_block(&block_root);
let block_root = block.canonical_root();
let block_slot = block.slot();
self.network_blocks_by_root
.insert(block_root, block.clone());
self.network_blocks_by_slot.insert(block_slot, block);
self.log(&format!(
"Produced block {} index {i} in external harness",
block_slot,
));
self.insert_external_block(
block,
external_harness
.chain
.get_payload_envelope(&block_root)
.unwrap(),
);
blocks.push((block_slot, block_root));
}
// Re-log to have a nice list of block roots at the end
for block in &blocks {
self.log(&format!("Build chain {block:?}"));
}
// Auto-update the clock on the main harness to accept the blocks
self.harness
.set_current_slot(external_harness.get_current_slot());
@@ -946,6 +1027,152 @@ impl TestRig {
blocks.last().expect("empty blocks").1
}
/// Builds:
///
/// ```text
/// G (full) -> A (full) -> B (FULL: bid.parent_block_hash == A.block_hash)
/// A -> C (EMPTY: bid.parent_block_hash == G.block_hash)
/// ```
pub(super) async fn build_full_empty_fork(&mut self) -> (Hash256, Hash256, Hash256) {
// Initialise a new beacon chain (mirrors `build_chain`).
let external_harness = self.get_external_harness_with_genesis();
// G: full canonical block on genesis.
external_harness.advance_slot();
let g_root = external_harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let g_block_hash = external_harness
.get_full_block(&g_root)
.as_block()
.payload_bid_block_hash()
.unwrap();
// A: full block on G, imported with its envelope so the FULL child below sees A as full.
external_harness.advance_slot();
let a_slot = external_harness.get_current_slot();
let (a_contents, a_envelope, a_state) = external_harness
.make_block_with_envelope(external_harness.get_current_state(), a_slot)
.await;
let a_block = a_contents.0.clone();
let a_root = a_block.canonical_root();
let a_block_hash = a_block.as_block().payload_bid_block_hash().unwrap();
external_harness
.process_block(a_slot, a_root, a_contents)
.await
.unwrap();
external_harness.advance_slot();
let child_slot = external_harness.get_current_slot();
// C: EMPTY child of A. Built before A's envelope is imported, so its bid points at G.
let (c_contents, c_envelope, c_state) = external_harness
.make_block_with_envelope(a_state.clone(), child_slot)
.await;
let c_block = c_contents.0.clone();
let c_root = c_block.canonical_root();
// Import A's envelope so the next child sees A as full.
let a_envelope = a_envelope.expect("A should have envelope");
external_harness
.process_envelope(a_root, a_envelope, &a_state, a_block.state_root())
.await;
// B: FULL child of A. Built after A's envelope is imported, so its bid points at A.
let (b_contents, b_envelope, b_state) = external_harness
.make_block_with_envelope(a_state.clone(), child_slot)
.await;
let b_block = b_contents.0.clone();
let b_root = b_block.canonical_root();
assert_eq!(
(
b_block.parent_root(),
c_block.parent_root(),
b_block.is_parent_block_full(a_block_hash),
c_block.is_parent_block_full(a_block_hash),
c_block.is_parent_block_full(g_block_hash),
),
(a_root, a_root, true, false, true)
);
// Import both children (and their envelopes) so every block is served through the same
// `get_full_block` path as the rest of the chain.
external_harness
.process_block(child_slot, c_root, c_contents)
.await
.unwrap();
if let Some(c_envelope) = c_envelope {
external_harness
.process_envelope(c_root, c_envelope, &c_state, c_block.state_root())
.await;
}
external_harness
.process_block(child_slot, b_root, b_contents)
.await
.unwrap();
if let Some(b_envelope) = b_envelope {
external_harness
.process_envelope(b_root, b_envelope, &b_state, b_block.state_root())
.await;
}
// Cache every block through the single `get_full_block` + `insert_external_block2` path.
for root in [g_root, a_root, c_root, b_root] {
let block = external_harness.get_full_block(&root);
let envelope = external_harness.chain.get_payload_envelope(&root).unwrap();
self.insert_external_block(block, envelope);
}
self.harness.set_current_slot(child_slot);
(a_root, b_root, c_root)
}
async fn new_gloas_full_empty_fork() -> Option<(Self, FullEmptyFork)> {
let Some(mut r) = Self::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {
return None;
};
if !r.is_after_gloas() {
return None;
}
let (a, b, c) = r.build_full_empty_fork().await;
let fork = FullEmptyFork {
a,
b,
c,
b_block: r.network_blocks_by_root.get(&b).unwrap().block_cloned(),
c_block: r.network_blocks_by_root.get(&c).unwrap().block_cloned(),
};
Some((r, fork))
}
fn insert_external_block(
&mut self,
block: RangeSyncBlock<E>,
envelope: Option<SignedExecutionPayloadEnvelope<E>>,
) {
let block_root = block.canonical_root();
let block_slot = block.slot();
self.network_blocks_by_root
.insert(block_root, block.clone());
self.network_blocks_by_slot.insert(block_slot, block);
// Cache Gloas envelopes for lookup RPCs.
if let Some(envelope) = envelope {
self.network_envelopes_by_root
.insert(block_root, envelope.into());
}
self.log(&format!(
"Produced block {block_root:?} slot {block_slot} in external harness",
));
}
fn corrupt_last_block_signature(&mut self) {
let range_sync_block = self.get_last_block().clone();
let mut block = (*range_sync_block.block_cloned()).clone();
@@ -978,7 +1205,16 @@ impl TestRig {
}
fn corrupt_last_column_kzg_proof(&mut self) {
let range_sync_block = self.get_last_block().clone();
let block_root = self.get_last_block().canonical_root();
self.corrupt_column_kzg_proof(block_root);
}
fn corrupt_column_kzg_proof(&mut self, block_root: Hash256) {
let range_sync_block = self
.network_blocks_by_root
.get(&block_root)
.unwrap_or_else(|| panic!("No block for root {block_root}"))
.clone();
let block = range_sync_block.block_cloned();
let blobs = range_sync_block.block_data().blobs();
let mut columns = range_sync_block
@@ -989,7 +1225,7 @@ impl TestRig {
let column = Arc::make_mut(first);
let proof = column.kzg_proofs_mut().first_mut().expect("no kzg proofs");
*proof = kzg::KzgProof::empty();
self.re_insert_block(block, blobs, Some(columns));
self.upsert_block(block, blobs, Some(columns));
}
fn get_last_block(&self) -> &RangeSyncBlock<E> {
@@ -1009,6 +1245,15 @@ impl TestRig {
) {
self.network_blocks_by_slot.clear();
self.network_blocks_by_root.clear();
self.upsert_block(block, blobs, columns);
}
fn upsert_block(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<types::BlobSidecarList<E>>,
columns: Option<types::DataColumnSidecarList<E>>,
) {
let block_root = block.canonical_root();
let block_slot = block.slot();
let block_data = if let Some(columns) = columns {
@@ -1135,6 +1380,10 @@ impl TestRig {
self.harness.chain.head().head_slot()
}
pub(super) fn head_root(&self) -> Hash256 {
self.harness.chain.head().head_block_root()
}
pub(super) fn assert_head_slot(&self, slot: u64) {
assert_eq!(self.head_slot(), Slot::new(slot), "Unexpected head slot");
}
@@ -1341,6 +1590,40 @@ impl TestRig {
self.fork_name.fulu_enabled()
}
fn trigger_unknown_parent_blocks_from_all_peers(
&mut self,
blocks: &[Arc<SignedBeaconBlock<E>>],
) {
for peer in self.new_connected_peers_for_peerdas() {
for block in blocks {
self.trigger_unknown_parent_block(peer, block.clone());
}
}
}
fn trigger_full_empty_fork(&mut self, fork: &FullEmptyFork) {
self.trigger_unknown_parent_blocks_from_all_peers(&[
fork.b_block.clone(),
fork.c_block.clone(),
]);
}
async fn trigger_custody_lookup_from_all_peers(&mut self) -> Option<Hash256> {
if self.is_after_gloas() {
self.build_chain(2).await;
let child = self.get_last_block().block_cloned();
let parent_root = child.parent_root();
self.trigger_unknown_parent_blocks_from_all_peers(&[child]);
Some(parent_root)
} else {
let block_root = self.build_chain(1).await;
for peer in self.new_connected_peers_for_peerdas() {
self.trigger_unknown_block_from_attestation(block_root, peer);
}
None
}
}
fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc<SignedBeaconBlock<E>>) {
let block_root = block.canonical_root();
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root))
@@ -1351,17 +1634,17 @@ impl TestRig {
peer_id: PeerId,
data_column: Arc<DataColumnSidecar<E>>,
) {
let block_root = data_column.block_root();
let slot = data_column.slot();
let parent_root = match data_column.as_ref() {
DataColumnSidecar::Fulu(column) => column.block_parent_root(),
DataColumnSidecar::Gloas(_) => panic!("Gloas data column not supported in this test"),
let DataColumnSidecar::Fulu(col) = data_column.as_ref() else {
self.log(&format!(
"trigger_unknown_parent_data_column noop for Gloas peer {peer_id:?}"
));
return;
};
self.send_sync_message(SyncMessage::UnknownParentSidecarHeader {
peer_id,
block_root,
parent_root,
slot,
block_root: col.block_root(),
parent_root: col.block_parent_root(),
slot: col.slot(),
});
}
@@ -1395,6 +1678,13 @@ impl TestRig {
self.sync_manager.block_lookups().active_single_lookups()
}
fn active_lookup_roots(&self) -> Vec<Hash256> {
self.active_single_lookups()
.iter()
.map(|l| l.block_root)
.collect()
}
fn active_single_lookups_count(&self) -> usize {
self.active_single_lookups().len()
}
@@ -1789,6 +2079,10 @@ async fn happy_path_unknown_data_parent(depth: usize) {
let Some(mut r) = TestRig::new_after_fulu() else {
return;
};
// No unknown-parent data-column trigger post-Gloas.
if r.is_after_gloas() {
return;
}
r.build_chain(depth).await;
r.trigger_with_last_unknown_data_column_parent();
r.simulate(SimulateConfig::happy_path()).await;
@@ -1806,7 +2100,9 @@ async fn happy_path_multiple_triggers(depth: usize) {
r.trigger_with_last_block();
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_data_column_parent();
if !r.is_after_gloas() {
r.trigger_with_last_unknown_data_column_parent();
}
r.simulate(SimulateConfig::happy_path()).await;
assert_eq!(r.created_lookups(), depth + 1, "Don't create extra lookups");
r.assert_successful_lookup_sync();
@@ -1838,7 +2134,10 @@ async fn bad_peer_empty_data_response(depth: usize) {
r.simulate(SimulateConfig::new().return_no_data_once())
.await;
// We register a penalty, retry and complete sync successfully
r.assert_penalties(&["NotEnoughResponsesReturned"]);
if !r.is_after_gloas() {
// TODO(gloas): tip columns have no attributable FULL-child peer here.
r.assert_penalties(&["NotEnoughResponsesReturned"]);
}
r.assert_successful_lookup_sync();
// TODO(tree-sync) Assert that a single lookup is created (no drops)
}
@@ -1853,7 +2152,10 @@ async fn bad_peer_too_few_data_response(depth: usize) {
r.simulate(SimulateConfig::new().return_too_few_data_once())
.await;
// We register a penalty, retry and complete sync successfully
r.assert_penalties(&["NotEnoughResponsesReturned"]);
if !r.is_after_gloas() {
// TODO(gloas): tip columns have no attributable FULL-child peer here.
r.assert_penalties(&["NotEnoughResponsesReturned"]);
}
r.assert_successful_lookup_sync();
// TODO(tree-sync) Assert that a single lookup is created (no drops)
}
@@ -1878,8 +2180,13 @@ async fn bad_peer_wrong_data_response(depth: usize) {
r.build_chain_and_trigger_last_block(depth).await;
r.simulate(SimulateConfig::new().return_wrong_sidecar_for_block_once())
.await;
// We register a penalty, retry and complete sync successfully
r.assert_penalties(&["UnrequestedBlockRoot"]);
// We register a penalty, retry and complete sync successfully. Under Gloas the tip block
// (depth 1) has no attributable FULL-child peer so no custody request is made and no penalty
// is possible; at depth >= 2 the parent's columns are served by the tip (its FULL child), so
// the wrong-sidecar penalty is attributable.
if !r.is_after_gloas() || depth >= 2 {
r.assert_penalties(&["UnrequestedBlockRoot"]);
}
r.assert_successful_lookup_sync();
// TODO(tree-sync) Assert that a single lookup is created (no drops)
}
@@ -1953,10 +2260,16 @@ async fn unknown_parent_does_not_add_peers_to_itself() {
r.build_chain(2).await;
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_block_parent();
r.trigger_with_last_unknown_data_column_parent();
// No data-column parent trigger post-Gloas.
let parent_lookup_peers = if r.is_after_gloas() {
2
} else {
r.trigger_with_last_unknown_data_column_parent();
3
};
r.simulate(SimulateConfig::happy_path()).await;
r.assert_peers_at_lookup_of_slot(2, 0);
r.assert_peers_at_lookup_of_slot(1, 3);
r.assert_peers_at_lookup_of_slot(1, parent_lookup_peers);
assert_eq!(r.created_lookups(), 2, "Don't create extra lookups");
// All lookups should NOT complete on this test, however note the following for the tip lookup,
// it's the lookup for the tip block which has 0 peers and a block cached:
@@ -1996,6 +2309,10 @@ async fn test_single_block_lookup_ignored_response() {
/// Assert that if the beacon processor returns DuplicateFullyImported, the lookup completes successfully
async fn test_single_block_lookup_duplicate_response() {
let mut r = TestRig::default();
// The mock only covers block processing; Gloas also needs real envelope/column results.
if r.is_after_gloas() {
return;
}
r.build_chain_and_trigger_last_block(1).await;
// Send a DuplicateFullyImported response, the lookup should complete successfully
r.simulate(
@@ -2060,6 +2377,10 @@ async fn lookups_form_chain() {
/// Assert that if a lookup chain (by appending ancestors) is too long we drop it
async fn test_parent_lookup_too_deep_grow_ancestor_one() {
let mut r = TestRig::default();
// TODO(gloas): range sync does not fetch payload envelopes yet.
if r.is_after_gloas() {
return;
}
r.build_chain(PARENT_DEPTH_TOLERANCE + 1).await;
r.trigger_with_last_block();
r.simulate(SimulateConfig::happy_path()).await;
@@ -2210,6 +2531,10 @@ async fn block_in_da_checker_skips_download() {
let Some(mut r) = TestRig::new_after_fulu() else {
return;
};
// TODO(gloas): the helper does not populate the envelope missing-component path yet.
if r.is_after_gloas() {
return;
}
// Add block to da_checker
// Complete test with happy path
// Assert that there were no requests for blocks
@@ -2279,14 +2604,13 @@ async fn custody_lookup_some_custody_failures(test_type: FuluTestType) {
let Some(mut r) = TestRig::new_fulu_peer_test(test_type) else {
return;
};
let block_root = r.build_chain(1).await;
// Send the same trigger from all peers, so that the lookup has all peers
for peer in r.new_connected_peers_for_peerdas() {
r.trigger_unknown_block_from_attestation(block_root, peer);
}
let block_under_test = r.trigger_custody_lookup_from_all_peers().await;
let custody_columns = r.custody_columns();
r.simulate(SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..4], 3))
.await;
let mut config = SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..4], 3);
if let Some(block_root) = block_under_test {
config = config.return_no_columns_for_block(block_root);
}
r.simulate(config).await;
r.assert_penalties_of_type("NotEnoughResponsesReturned");
r.assert_successful_lookup_sync();
}
@@ -2295,20 +2619,15 @@ async fn custody_lookup_permanent_custody_failures(test_type: FuluTestType) {
let Some(mut r) = TestRig::new_fulu_peer_test(test_type) else {
return;
};
let block_root = r.build_chain(1).await;
// Send the same trigger from all peers, so that the lookup has all peers
for peer in r.new_connected_peers_for_peerdas() {
r.trigger_unknown_block_from_attestation(block_root, peer);
}
let block_under_test = r.trigger_custody_lookup_from_all_peers().await;
let custody_columns = r.custody_columns();
r.simulate(
SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..2], usize::MAX),
)
.await;
// Every peer that does not return a column is part of the lookup because it claimed to have
// imported the lookup, so we will penalize.
let mut config =
SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..2], usize::MAX);
if let Some(block_root) = block_under_test {
config = config.return_no_columns_for_block(block_root);
}
r.simulate(config).await;
r.assert_penalties_of_type("NotEnoughResponsesReturned");
r.assert_failed_lookup_sync();
}
@@ -2346,6 +2665,10 @@ async fn crypto_on_fail_with_bad_column_proposer_signature() {
let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {
return;
};
// Gloas columns have no per-column proposer signature.
if r.is_after_gloas() {
return;
}
r.build_chain(1).await;
r.corrupt_last_column_proposer_signature();
r.trigger_with_last_block();
@@ -2364,9 +2687,16 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() {
let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else {
return;
};
r.build_chain(1).await;
r.corrupt_last_column_kzg_proof();
r.trigger_with_last_block();
if r.is_after_gloas() {
r.build_chain(2).await;
let child = r.get_last_block().block_cloned();
r.corrupt_column_kzg_proof(child.parent_root());
r.trigger_unknown_parent_blocks_from_all_peers(&[child]);
} else {
r.build_chain(1).await;
r.corrupt_last_column_kzg_proof();
r.trigger_with_last_block();
}
r.simulate(SimulateConfig::happy_path()).await;
if cfg!(feature = "fake_crypto") {
r.assert_successful_lookup_sync();
@@ -2376,3 +2706,36 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() {
r.assert_penalties_of_type("AvailabilityCheck");
}
}
#[tokio::test]
async fn gloas_full_empty_children_retain_parent_for_payload() {
let Some((mut r, fork)) = TestRig::new_gloas_full_empty_fork().await else {
return;
};
r.trigger_full_empty_fork(&fork);
r.simulate(SimulateConfig::happy_path()).await;
r.assert_successful_lookup_sync();
}
#[tokio::test]
async fn gloas_empty_child_continues_while_parent_payload_withheld() {
let Some((mut r, fork)) = TestRig::new_gloas_full_empty_fork().await else {
return;
};
r.trigger_full_empty_fork(&fork);
r.simulate(SimulateConfig::happy_path().hold_envelope_for_block(fork.a))
.await;
assert_eq!(r.head_root(), fork.c);
assert_eq!(r.created_lookups(), 4);
assert_eq!(r.completed_lookups(), 2);
assert_eq!(r.dropped_lookups(), 0);
assert_eq!(r.active_lookup_roots(), vec![fork.a, fork.b]);
r.assert_no_penalties();
r.assert_empty_network();
r.assert_empty_processor();
}

View File

@@ -21,7 +21,7 @@ use tokio::sync::mpsc;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use types::{ForkName, Hash256, MinimalEthSpec as E, Slot};
use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelope, Slot};
mod lookups;
mod range;
@@ -77,6 +77,10 @@ struct TestRig {
/// Blocks that will be used in the test but may not be known to `harness` yet.
network_blocks_by_root: HashMap<Hash256, RangeSyncBlock<E>>,
network_blocks_by_slot: HashMap<Slot, RangeSyncBlock<E>>,
/// Gloas execution payload envelopes keyed by block root, populated during `build_chain`
/// from the external harness store. The rig serves these when a lookup issues a
/// `PayloadEnvelopesByRoot` request.
network_envelopes_by_root: HashMap<Hash256, Arc<SignedExecutionPayloadEnvelope<E>>>,
penalties: Vec<ReportedPenalty>,
/// All seen lookups through the test run
seen_lookups: HashMap<Id, SeenLookup>,

View File

@@ -34,6 +34,13 @@ use types::{Epoch, EthSpec, Hash256, MinimalEthSpec as E, Slot};
const SLOTS_PER_EPOCH: usize = 8;
impl TestRig {
/// Range sync doesn't yet ingest Gloas blocks in these tests: the range harness doesn't serve
/// payload envelopes, so a Gloas block never becomes fully available and sync can't complete.
/// Skip the affected completion tests under a Gloas genesis. TODO(gloas): support range sync.
fn skip_range_sync_under_gloas(&self) -> bool {
self.fork_name.gloas_enabled()
}
fn add_head_peer(&mut self) -> PeerId {
let local_info = self.local_info();
self.add_supernode_peer(SyncInfo {
@@ -260,6 +267,9 @@ impl TestRig {
#[tokio::test]
async fn head_sync_completes() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_head_sync().await;
r.simulate(SimulateConfig::happy_path()).await;
r.assert_head_sync_completed();
@@ -271,6 +281,9 @@ async fn head_sync_completes() {
#[tokio::test]
async fn finalized_to_head_transition() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_and_head_sync().await;
r.simulate(SimulateConfig::happy_path()).await;
r.assert_range_sync_completed();
@@ -282,6 +295,9 @@ async fn finalized_to_head_transition() {
#[tokio::test]
async fn finalized_sync_completes() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync().await;
r.simulate(SimulateConfig::happy_path()).await;
r.assert_range_sync_completed();
@@ -293,6 +309,9 @@ async fn finalized_sync_completes() {
#[tokio::test]
async fn batch_rpc_error_retries() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync().await;
r.simulate(SimulateConfig::happy_path().return_rpc_error(RPCError::UnsupportedProtocol))
.await;
@@ -361,6 +380,9 @@ async fn batch_peer_returns_partial_columns_then_succeeds() {
#[tokio::test]
async fn batch_non_faulty_failure_retries() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync().await;
r.simulate(SimulateConfig::happy_path().with_range_non_faulty_failures(1))
.await;
@@ -372,6 +394,9 @@ async fn batch_non_faulty_failure_retries() {
#[tokio::test]
async fn batch_faulty_failure_redownloads() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync().await;
r.simulate(SimulateConfig::happy_path().with_range_faulty_failures(1))
.await;
@@ -428,6 +453,9 @@ async fn late_response_for_removed_chain() {
#[tokio::test]
async fn ee_offline_then_online_resumes_sync() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync().await;
r.simulate(SimulateConfig::happy_path().with_ee_offline_for_n_range_responses(2))
.await;
@@ -440,6 +468,9 @@ async fn ee_offline_then_online_resumes_sync() {
#[tokio::test]
async fn finalized_sync_with_local_head_partial() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
r.setup_finalized_sync_with_local_head(3).await;
r.simulate(SimulateConfig::happy_path()).await;
r.assert_range_sync_completed();
@@ -450,6 +481,9 @@ async fn finalized_sync_with_local_head_partial() {
#[tokio::test]
async fn finalized_sync_with_local_head_near_target() {
let mut r = TestRig::default();
if r.skip_range_sync_under_gloas() {
return;
}
let target_epochs = 5;
let local_slots = (target_epochs * SLOTS_PER_EPOCH) - 1; // all blocks except last
r.build_chain(target_epochs * SLOTS_PER_EPOCH).await;
@@ -468,7 +502,7 @@ async fn finalized_sync_with_local_head_near_target() {
#[tokio::test]
async fn not_enough_custody_peers_then_peers_arrive() {
let mut r = TestRig::default();
if !r.fork_name.fulu_enabled() {
if !r.fork_name.fulu_enabled() || r.skip_range_sync_under_gloas() {
return;
}
let remote_info = r.setup_finalized_sync_insufficient_peers().await;
@@ -495,7 +529,7 @@ async fn not_enough_custody_peers_then_peers_arrive() {
#[tokio::test]
async fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() {
let mut r = TestRig::default();
if !r.fork_name.fulu_enabled() {
if !r.fork_name.fulu_enabled() || r.skip_range_sync_under_gloas() {
return;
}