diff --git a/Makefile b/Makefile index 3c00883ce9..94ad55bf6b 100644 --- a/Makefile +++ b/Makefile @@ -30,17 +30,15 @@ TEST_FEATURES ?= # Cargo profile for regular builds. PROFILE ?= release -# List of all hard forks up to gloas. This list is used to set env variables for several tests so that -# they run for different forks. -# TODO(EIP-7732) Remove this once we extend network tests to support gloas and use RECENT_FORKS instead +# List of recent hard forks before Gloas. Used by tests that do not support Gloas yet. RECENT_FORKS_BEFORE_GLOAS=fulu -# List of all recent hard forks. This list is used to set env variables for http_api tests +# List of all recent hard forks. This list is used to set env variables for several tests. # Include phase0 to test the code paths in sync that are pre blobs RECENT_FORKS=fulu gloas # For network tests include phase0 to cover genesis syncing (blocks without blobs or columns) -TEST_NETWORK_FORKS=phase0 $(RECENT_FORKS_BEFORE_GLOAS) +TEST_NETWORK_FORKS=phase0 $(RECENT_FORKS) # Extra flags for Cargo CARGO_INSTALL_EXTRA_FLAGS?= @@ -228,7 +226,6 @@ test-op-pool-%: -p operation_pool # Run the tests in the `network` crate for all known forks. -# TODO(EIP-7732) Extend to support gloas by using RECENT_FORKS instead test-network: $(patsubst %,test-network-%,$(TEST_NETWORK_FORKS)) test-network-%: diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dc786fb7fb..8b50f4b18f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3397,6 +3397,7 @@ impl BeaconChain { { return Err(BlockError::ParentUnknown { parent_root: blob.block_parent_root(), + parent_block_hash: None, }); } } @@ -3523,7 +3524,10 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { - return Err(BlockError::ParentUnknown { parent_root }); + return Err(BlockError::ParentUnknown { + parent_root, + parent_block_hash: None, + }); } self.emit_sse_data_column_sidecar_events( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 49ab4a06d2..7b9b7e8218 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -96,6 +96,7 @@ use store::{Error as DBError, KeyValueStore}; use strum::{AsRefStr, IntoStaticStr}; use task_executor::JoinHandle; use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument}; +use types::ExecutionBlockHash; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, FullPayload, Hash256, InconsistentFork, KzgProofs, RelativeEpoch, @@ -125,6 +126,7 @@ pub enum BlockError { /// its parent. ParentUnknown { parent_root: Hash256, + parent_block_hash: Option, }, /// The block slot is greater than the present slot. /// @@ -1446,6 +1448,7 @@ impl ExecutionPendingBlock { ParentImportStatus::UnknownBlock | ParentImportStatus::UnknownPayload => { return Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(), }); } } @@ -1821,6 +1824,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< } else { Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(), }) } } @@ -1915,6 +1919,7 @@ fn verify_parent_block_and_envelope_are_known( ParentImportStatus::UnknownBlock | ParentImportStatus::UnknownPayload => { Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.payload_bid_parent_block_hash().ok(), }) } } @@ -1947,6 +1952,7 @@ fn load_parent>( { return Err(BlockError::ParentUnknown { parent_root: block.parent_root(), + parent_block_hash: block.as_block().payload_bid_parent_block_hash().ok(), }); } diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 74a521a79c..d7bba13eb0 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1358,7 +1358,7 @@ async fn block_gossip_verification() { assert!( matches!( unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), - BlockError::ParentUnknown {parent_root: p} + BlockError::ParentUnknown {parent_root: p, ..} if p == parent_root ), "should not import a block for an unknown parent" diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 056ffc03b8..ed9dc5666a 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -730,7 +730,7 @@ pub fn rpc_data_column_limits( if fork_name.gloas_enabled() { RpcLimits::new( DataColumnSidecarGloas::::min_size(), - DataColumnSidecarGloas::::max_size( + DataColumnSidecarFulu::::max_size( spec.max_blobs_per_block(current_digest_epoch) as usize ), ) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 29e43b18c2..2668a14dc5 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -732,6 +732,13 @@ impl NetworkBeaconProcessor { %unknown_block_root, "Unknown block root for column" ); + // Data columns are only propagated once the block has been seen for both Fulu + // and Gloas. `UnknownBlockHashFromAttestation` declares that `peer_id` has + // imported `unknown_block_root`. + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + unknown_block_root, + )); self.propagate_validation_result( message_id.clone(), peer_id, @@ -1076,10 +1083,9 @@ impl NetworkBeaconProcessor { %unknown_block_root, "Unknown block root for partial column" ); - // TODO(gloas): wire this into proper lookup sync. Sending - // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that - // mixes column processing with the attestation lookup path and is not - // the right primitive for Gloas column lookups. + // Data columns are only propagated once the block has been seen for both Fulu + // and Gloas. `UnknownBlockHashFromAttestation` declares that `peer_id` has + // imported `unknown_block_root`. self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( peer_id, unknown_block_root, @@ -2714,14 +2720,10 @@ impl NetworkBeaconProcessor { if allow_reprocess { // We don't know the block, get the sync manager to handle the block lookup, and // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, - *beacon_block_root, - )) - .unwrap_or_else(|_| { - warn!(msg = "UnknownBlockHash", "Failed to send to sync service") - }); + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + *beacon_block_root, + )); let msg = match failed_att { FailedAtt::Aggregate { attestation, @@ -3994,13 +3996,17 @@ impl NetworkBeaconProcessor { | PayloadAttestationError::PriorPayloadAttestationMessageKnown { .. } => { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } - PayloadAttestationError::UnknownHeadBlock { .. } => { + PayloadAttestationError::UnknownHeadBlock { beacon_block_root } => { debug!( %peer_id, %message_slot, "Payload attestation references unknown block" ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + *beacon_block_root, + )) } PayloadAttestationError::NotInPTC { .. } => { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8245b5dc0c..528a261bb8 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -28,7 +28,7 @@ use logging::crit; use std::sync::Arc; use std::time::Duration; use tracing::{debug, debug_span, error, info, instrument, warn}; -use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; +use types::{BlockImportSource, DataColumnSidecarList, Epoch, ExecutionBlockHash, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -962,13 +962,14 @@ impl NetworkBeaconProcessor { /// The classified outcome of submitting a block / blob / column for processing, ready for the /// lookup state machine to act on without re-inspecting `BlockError`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BlockProcessingResult { /// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the /// lookup must keep fetching). `info` is a stable label for logs / metrics. Imported(bool, &'static str), ParentUnknown { parent_root: Hash256, + parent_block_hash: Option, }, /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored; /// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only. @@ -1000,9 +1001,13 @@ impl From> for BlockProcessingR return Self::Imported(true, "duplicate"); } BlockError::GenesisBlock => return Self::Imported(true, "genesis"), - BlockError::ParentUnknown { parent_root, .. } => { + BlockError::ParentUnknown { + parent_root, + parent_block_hash, + } => { return Self::ParentUnknown { parent_root: *parent_root, + parent_block_hash: *parent_block_hash, }; } BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 8ccfe38fa3..6b7c623230 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -907,7 +907,10 @@ async fn data_column_reconstruction_at_slot_start() { // reconstruction deadline. #[tokio::test] async fn data_column_reconstruction_at_deadline() { - if test_spec::().fulu_fork_epoch.is_none() { + let spec = test_spec::(); + // Pre-Gloas data-column path: a Gloas block carries its columns in the payload envelope, so the + // harness produces no block-level data columns and this gossip/reconstruction flow doesn't apply. + if spec.fulu_fork_epoch.is_none() || spec.gloas_fork_epoch.is_some() { return; }; @@ -1094,7 +1097,11 @@ async fn import_gossip_block_unacceptably_early() { /// Data columns that have already been processed but unobserved should be propagated without re-importing. #[tokio::test] async fn accept_processed_gossip_data_columns_without_import() { - if test_spec::().fulu_fork_epoch.is_none() { + let spec = test_spec::(); + // Pre-Gloas data-column path: a Gloas block carries its columns in the payload envelope, so the + // harness produces no block-level data columns and this gossip flow doesn't apply. + // TODO(gloas): re-enable this test + if spec.fulu_fork_epoch.is_none() || spec.gloas_fork_epoch.is_some() { return; }; @@ -1983,6 +1990,11 @@ async fn test_payload_envelopes_by_range() { // Manually store payload envelopes for each block in the range let mut expected_roots = Vec::new(); for slot in start_slot..slot_count { + // Genesis (slot 0) has no canonical execution payload, so the by-range handler filters it + // out via `block_has_canonical_payload` even if an envelope is stored for it. + if slot == 0 { + continue; + } if let Some(root) = rig .chain .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) @@ -2076,14 +2088,10 @@ async fn test_payload_envelopes_by_root_unknown_root_returns_empty() { let mut rig = TestRig::new(64).await; - // Request envelope for a root that has no stored envelope - let block_root = rig - .chain - .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) - .unwrap() - .unwrap(); + // Use a root with no block: the harness persists an envelope for every block it produces, so a + // real block root would already have one. An unknown root has no stored envelope. + let block_root = Hash256::repeat_byte(0xaa); - // Don't store any envelope — the handler should return 0 envelopes let roots = RuntimeVariableList::new(vec![block_root], 1).unwrap(); rig.enqueue_payload_envelopes_by_root_request(roots); diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index f3dab7f395..36816fb5d6 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1229,7 +1229,7 @@ mod tests { fn request_batches_should_not_loop_infinitely() { let harness = BeaconChainHarness::builder(MinimalEthSpec) .default_spec() - .deterministic_keypairs(4) + .deterministic_keypairs(8) .fresh_ephemeral_store() .build(); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 0cbeb5ee4e..d403382e9e 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,13 +22,16 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{LookupRequestError, PeerType, SingleBlockLookup}; use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE}; use super::network_context::{RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::network_beacon_processor::BlockProcessingResult; use crate::sync::SyncMessage; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; +use crate::sync::block_lookups::single_block_lookup::{ + AwaitingParent, ImportedParent, LookupResult, +}; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; use lighthouse_network::PeerId; @@ -39,7 +42,10 @@ use std::sync::Arc; use std::time::Duration; use store::Hash256; use tracing::{debug, error, warn}; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock}; +use types::{ + DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, +}; pub mod parent_chain; mod single_block_lookup; @@ -73,6 +79,8 @@ const MAX_LOOKUPS: usize = 200; type BlockDownloadResponse = Result>>, RpcResponseError>; type CustodyDownloadResponse = Result>, RpcResponseError>; +type PayloadDownloadResponse = + Result>>, RpcResponseError>; pub enum BlockComponent { Block(DownloadResult>>), @@ -169,22 +177,29 @@ impl BlockLookups { block_root: Hash256, block_component: BlockComponent, parent_root: Hash256, + parent_block_hash: Option, peer_id: PeerId, cx: &mut SyncNetworkContext, ) -> bool { - let parent_lookup_exists = - self.search_parent_of_child(parent_root, block_root, &[peer_id], cx); + let parent_lookup_exists = self.search_parent_of_child( + parent_root, + &PeerType::new(parent_block_hash), + block_root, + &[peer_id], + cx, + ); // Only create the child lookup if the parent exists if parent_lookup_exists { // `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it self.new_current_lookup( block_root, Some(block_component), - Some(parent_root), + Some(AwaitingParent::new(parent_root, parent_block_hash)), // On a `UnknownParentBlock` or `UnknownParentSidecarHeader` event the peer is not // required to have the rest of the block components. Create the lookup with zero // peers to house the block components. &[], + &PeerType::Block, cx, ) } else { @@ -202,7 +217,7 @@ impl BlockLookups { peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { - self.new_current_lookup(block_root, None, None, peer_source, cx) + self.new_current_lookup(block_root, None, None, peer_source, &PeerType::Block, cx) } /// A block or blob triggers the search of a parent. @@ -215,6 +230,7 @@ impl BlockLookups { pub fn search_parent_of_child( &mut self, block_root_to_search: Hash256, + peer_type: &PeerType, child_block_root_trigger: Hash256, peers: &[PeerId], cx: &mut SyncNetworkContext, @@ -307,7 +323,7 @@ impl BlockLookups { } // `block_root_to_search` is a failed chain check happens inside new_current_lookup - self.new_current_lookup(block_root_to_search, None, None, peers, cx) + self.new_current_lookup(block_root_to_search, None, None, peers, peer_type, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -318,8 +334,9 @@ impl BlockLookups { &mut self, block_root: Hash256, block_component: Option>, - awaiting_parent: Option, + awaiting_parent: Option, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> bool { // If this block or it's parent is part of a known ignored chain, ignore it. @@ -341,7 +358,8 @@ impl BlockLookups { } } - if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, peer_type, cx) + { warn!(error = ?e, "Error adding peers to ancestor lookup"); } @@ -353,7 +371,7 @@ impl BlockLookups { && !self .single_block_lookups .iter() - .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) + .any(|(_, lookup)| lookup.block_root() == awaiting_parent.parent_root()) { warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found"); return false; @@ -368,7 +386,8 @@ impl BlockLookups { // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let mut lookup = + SingleBlockLookup::new(block_root, peers, peer_type, cx.next_id(), awaiting_parent); let _guard = lookup.span.clone().entered(); // Add block components to the new request @@ -389,9 +408,7 @@ impl BlockLookups { debug!( ?peers, ?block_root, - awaiting_parent = awaiting_parent - .map(|root| root.to_string()) - .unwrap_or("none".to_owned()), + ?awaiting_parent, id = lookup.id, "Created block lookup" ); @@ -438,6 +455,23 @@ impl BlockLookups { self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx); } + pub fn on_payload_download_response( + &mut self, + id: SingleLookupReqId, + response: PayloadDownloadResponse, + cx: &mut SyncNetworkContext, + ) { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!( + ?id, + "Payload envelope returned for a lookup id that doesn't exist" + ); + return; + }; + let result = lookup.on_payload_download_response(id.req_id, response, cx); + self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx); + } + /* Error responses */ pub fn peer_disconnected(&mut self, peer_id: &PeerId) { @@ -472,25 +506,62 @@ impl BlockLookups { ); let lookup_result = match process_type { - BlockProcessType::SingleBlock { .. } => lookup.on_block_processing_result(result, cx), + BlockProcessType::SingleBlock { .. } => { + // Update the result of the lookup first, here we may start the download of Gloas + // payload, which may error. + let lookup_result = lookup.on_block_processing_result(result.clone(), cx); + let lookup_is_awaiting_event = lookup.is_awaiting_event(); + let block_root = lookup.block_root(); + // Then, as a side-effect continue the EMPTY children of this lookup. Only if the + // block just imported which ensures we just do it once per lookup. + if let BlockProcessingResult::Imported(..) = result + && let Some(bid_block_hash) = lookup.peek_downloaded_bid_block_hash() + { + self.continue_child_lookups( + block_root, + ImportedParent::OnlyGloasBlock(bid_block_hash), + cx, + ); + } + // Then if this lookup happens to have only empty children we can remove it now. We + // must make sure that no other lookup is awaiting this one, and that no requests + // are on-going. + if !lookup_is_awaiting_event && !self.has_any_awaiting_children(block_root) { + Ok(LookupResult::Completed) + } else { + lookup_result + } + } BlockProcessType::SingleCustodyColumn(_) => { lookup.on_data_processing_result(result, cx) } - // TODO(gloas): route into the payload envelope lookup state machine. - BlockProcessType::SinglePayloadEnvelope(_) => Ok(LookupResult::Pending), + BlockProcessType::SinglePayloadEnvelope(_) => { + lookup.on_payload_processing_result(result, cx) + } }; self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx); } + pub fn has_any_awaiting_children(&self, block_root: Hash256) -> bool { + self.single_block_lookups + .iter() + .any(|(_, lookup)| lookup.is_awaiting_block(block_root)) + } + /// Makes progress on the immediate children of `block_root` - pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext) { + pub fn continue_child_lookups( + &mut self, + parent_root: Hash256, + imported_parent: ImportedParent, + cx: &mut SyncNetworkContext, + ) { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.is_awaiting_parent(parent_root, imported_parent) { lookup.resolve_awaiting_parent(); debug!( - parent_root = ?block_root, + ?imported_parent, id, block_root = ?lookup.block_root(), "Continuing child lookup" @@ -523,7 +594,7 @@ impl BlockLookups { let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| lookup.is_awaiting_block(dropped_lookup.block_root())) .map(|(id, _)| *id) .collect::>(); @@ -546,10 +617,17 @@ impl BlockLookups { Ok(LookupResult::Pending) => true, Ok(LookupResult::ParentUnknown { parent_root, + parent_block_hash, block_root, peers, }) => { - if self.search_parent_of_child(parent_root, block_root, &peers, cx) { + if self.search_parent_of_child( + parent_root, + &PeerType::new(parent_block_hash), + block_root, + &peers, + cx, + ) { true } else { self.drop_lookup_and_children(id, "Failed"); @@ -567,16 +645,17 @@ impl BlockLookups { metrics::inc_counter(&metrics::SYNC_LOOKUP_COMPLETED); self.metrics.completed_lookups += 1; // Block imported, continue the requests of pending child blocks - self.continue_child_lookups(lookup.block_root(), cx); + self.continue_child_lookups( + lookup.block_root(), + ImportedParent::LookupComplete, + cx, + ); self.update_metrics(); } else { debug!(id, "Attempting to drop non-existent lookup"); } false } - // If UnknownLookup do not log the request error. No need to drop child lookups nor - // update metrics because the lookup does not exist. - Err(LookupRequestError::UnknownLookup) => false, Err(error) => { debug!(id, source, ?error, "Dropping lookup on request error"); self.drop_lookup_and_children(id, error.into()); @@ -708,7 +787,7 @@ impl BlockLookups { if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == awaiting_parent.parent_root()) { self.find_oldest_ancestor_lookup(lookup) } else { @@ -729,6 +808,7 @@ impl BlockLookups { &mut self, lookup_id: SingleLookupId, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> Result<(), String> { let lookup = self @@ -738,7 +818,7 @@ impl BlockLookups { let mut added_some_peer = false; for peer in peers { - if lookup.add_peer(*peer) { + if lookup.add_peer(*peer, peer_type) { added_some_peer = true; debug!( block_root = ?lookup.block_root(), @@ -748,15 +828,21 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { + if let Some(&awaiting_parent) = lookup.awaiting_parent() { + // Regardless of gloas full/empty the lookup to add peers to is keyed by block_root if let Some((&parent_id, _)) = self .single_block_lookups .iter() - .find(|(_, l)| l.block_root() == parent_root) + .find(|(_, l)| l.block_root() == awaiting_parent.parent_root()) { - self.add_peers_to_lookup_and_ancestors(parent_id, peers, cx) + self.add_peers_to_lookup_and_ancestors( + parent_id, + peers, + &awaiting_parent.into_peer_type(), + cx, + ) } else { - Err(format!("Lookup references unknown parent {parent_root:?}")) + Err(format!("Lookup references unknown {awaiting_parent:?}")) } } else if added_some_peer { // If this lookup is not awaiting a parent and we added at least one peer, attempt to diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94..120ce5b1cc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent().map(|a| a.parent_root()), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 157da5d806..f03eed1638 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,6 +1,8 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::network_beacon_processor::BlockProcessingResult; -use crate::sync::block_lookups::{BlockDownloadResponse, CustodyDownloadResponse}; +use crate::sync::block_lookups::{ + BlockDownloadResponse, CustodyDownloadResponse, PayloadDownloadResponse, +}; use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, RpcResponseError, @@ -11,13 +13,16 @@ use beacon_chain::block_verification_types::AsBlock; use educe::Educe; use lighthouse_network::service::api_types::Id; use parking_lot::RwLock; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, ExecutionBlockHash, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, +}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -29,6 +34,7 @@ pub enum LookupResult { /// Block's parent is not known to fork-choice, a parent lookup is needed ParentUnknown { parent_root: Hash256, + parent_block_hash: Option, block_root: Hash256, peers: Vec, }, @@ -46,8 +52,6 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed(/* reason: */ String), - /// Attempted to retrieve a not known lookup id - UnknownLookup, /// Received a download result for a different request id than the in-flight request. /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. @@ -57,6 +61,31 @@ pub enum LookupRequestError { }, } +#[derive(Debug, Clone, Copy)] +pub struct AwaitingParent { + parent_root: Hash256, + parent_block_hash: Option, +} + +impl AwaitingParent { + pub fn new(parent_root: Hash256, parent_block_hash: Option) -> Self { + Self { + parent_root, + parent_block_hash, + } + } + + pub fn parent_root(&self) -> Hash256 { + self.parent_root + } + + pub fn into_peer_type(self) -> PeerType { + PeerType::new(self.parent_block_hash) + } +} + +type PeerSet = Arc>>; + #[derive(Debug)] struct BlockRequest { state: SingleLookupRequestState>>, @@ -79,6 +108,9 @@ enum DataRequest { WaitingForBlock, Request { slot: Slot, + /// Peers to fetch the data columns from. Pre-Gloas this is the lookup's `peers`; for FULL + /// Gloas blocks this is the `gloas_child_peers` set proven to hold the columns. + peers: PeerSet, state: SingleLookupRequestState>, }, NoData, @@ -94,7 +126,62 @@ impl DataRequest { } } -type PeerSet = Arc>>; +/// Tracks the download + processing of a Gloas execution payload envelope. For FULL Gloas blocks the +/// execution payload arrives as a separate `SignedExecutionPayloadEnvelope`, mirroring the way data +/// columns are fetched and processed by `DataRequest`. +#[derive(Debug)] +enum PayloadRequest { + /// Block not yet downloaded, can't tell if a payload is needed. + WaitingForBlock, + /// Post-Gloas block: an execution payload envelope must be fetched and processed *if* the block + /// is FULL. We can't tell FULL from EMPTY from the block alone: only a FULL child of this block + /// proves a payload was published, which is signalled by `peers` becoming non-empty. + Request { + peers: PeerSet, + state: SingleLookupRequestState>>, + }, + /// Pre-Gloas block: no payload envelope exists, nothing to fetch. + PreGloas, +} + +impl PayloadRequest { + fn is_complete(&self) -> bool { + match &self { + PayloadRequest::WaitingForBlock => false, + PayloadRequest::Request { state, .. } => state.is_processed(), + PayloadRequest::PreGloas => true, + } + } +} + +/// Classifies how a peer relates to a lookup, controlling which peer set it is added to. +pub enum PeerType { + /// The peer can serve the looked-up block and (pre-Gloas) its data columns. + Block, + /// The peer claims to have imported a FULL child of this block whose bid references + /// `ExecutionBlockHash` as its parent. Such peers can serve this block's payload envelope and + /// data columns. + PayloadEnvelope(ExecutionBlockHash), +} + +impl PeerType { + /// `PayloadEnvelope` when the block's bid `parent_block_hash` is known (post-Gloas), else `Block`. + pub fn new(parent_block_hash: Option) -> Self { + match parent_block_hash { + Some(execution_hash) => PeerType::PayloadEnvelope(execution_hash), + None => PeerType::Block, + } + } +} + +/// Used by `is_awaiting_parent` to decide if it can resolve its awaiting parent status +#[derive(Debug, Clone, Copy)] +pub enum ImportedParent { + /// All requests of a lookup are complete, both for pre and post Gloas + LookupComplete, + /// Only post-Gloas, the block request has just been completed. Includes the bid block hash + OnlyGloasBlock(ExecutionBlockHash), +} #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] @@ -103,13 +190,19 @@ pub struct SingleBlockLookup { block_root: Hash256, block_request: BlockRequest, data_request: DataRequest, + payload_request: PayloadRequest, /// Peers that claim to have imported this set of block components. This state is shared with /// the custody request to have an updated view of the peers that claim to have imported the /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster /// than the lifetime of a custody request. #[educe(Debug(method(fmt_peer_set_as_len)))] peers: PeerSet, - awaiting_parent: Option, + /// Post-Gloas only: peers that claim to have imported a FULL child of this block, keyed by the + /// child's bid `parent_block_hash`. These (not `peers`) are the peers proven to hold this + /// block's payload envelope and data columns. + #[educe(Debug(method(fmt_peer_map_as_len)))] + gloas_child_peers: HashMap, + awaiting_parent: Option, created: Instant, pub(crate) span: Span, } @@ -118,8 +211,9 @@ impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, peers: &[PeerId], + peer_type: &PeerType, id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -127,12 +221,23 @@ impl SingleBlockLookup { id = id, ); + let block_peers: PeerSet = Arc::new(RwLock::new(peers.iter().copied().collect())); + let mut gloas_child_peers = HashMap::new(); + match peer_type { + PeerType::Block => {} + PeerType::PayloadEnvelope(execution_hash) => { + gloas_child_peers.insert(*execution_hash, block_peers.clone()); + } + } + Self { id, block_root: requested_block_root, block_request: BlockRequest::new(), data_request: DataRequest::WaitingForBlock, - peers: Arc::new(RwLock::new(peers.iter().copied().collect())), + payload_request: PayloadRequest::WaitingForBlock, + peers: block_peers, + gloas_child_peers, awaiting_parent, created: Instant::now(), span: lookup_span, @@ -147,19 +252,41 @@ impl SingleBlockLookup { .map(|block| block.slot()) } + pub fn peek_downloaded_bid_block_hash(&self) -> Option { + self.block_request + .state + .peek_downloaded_data() + .and_then(|block| { + block + .message() + .body() + .signed_execution_payload_bid() + .ok() + .map(|bid| bid.message.block_hash) + }) + } + /// Get the block root that is being requested. pub fn block_root(&self) -> Hash256 { self.block_root } - pub fn awaiting_parent(&self) -> Option { - self.awaiting_parent + pub fn awaiting_parent(&self) -> Option<&AwaitingParent> { + self.awaiting_parent.as_ref() + } + + pub fn is_awaiting_block(&self, block_root: Hash256) -> bool { + if let Some(awaiting_parent) = &self.awaiting_parent { + awaiting_parent.parent_root() == block_root + } else { + false + } } /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send /// components for processing. - pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) + pub fn set_awaiting_parent(&mut self, parent: AwaitingParent) { + self.awaiting_parent = Some(parent); } /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for @@ -168,6 +295,37 @@ impl SingleBlockLookup { self.awaiting_parent = None; } + /// Check if this lookup awaiting_parent status can be resolved given that `parent_root` and + /// `imported_parent` have just been imported + pub fn is_awaiting_parent( + &mut self, + parent_root: Hash256, + imported_parent: ImportedParent, + ) -> bool { + let Some(awaiting_parent) = self.awaiting_parent else { + return false; + }; + if awaiting_parent.parent_root() != parent_root { + return false; + } + match imported_parent { + ImportedParent::LookupComplete => true, + ImportedParent::OnlyGloasBlock(bid_block_hash) => { + if let Some(parent_block_hash) = awaiting_parent.parent_block_hash { + // This lookup is the execution child of `parent_execution_hash`. If the + // parent hash the same `bid_block_hash` this is FULL child and we must wait + // for the entire parent lookup to be imported. Otherwise it's a EMPTY child + // and we can import now. + parent_block_hash != bid_block_hash + } else { + // A parent that's gloas imported and this lookup claims to be before gloas. + debug_assert!(false, "Received post-gloas action for pre-gloas lookup"); + false + } + } + } + } + /// Returns the time elapsed since this lookup was created pub fn elapsed_since_created(&self) -> Duration { self.created.elapsed() @@ -201,6 +359,11 @@ impl SingleBlockLookup { DataRequest::Request { state, .. } => state.is_awaiting_event(), DataRequest::NoData => false, } + || match &self.payload_request { + PayloadRequest::WaitingForBlock => true, + PayloadRequest::Request { state, .. } => state.is_awaiting_event(), + PayloadRequest::PreGloas => false, + } } /// Makes progress on all requests of this lookup. Any error is not recoverable and must result @@ -235,6 +398,7 @@ impl SingleBlockLookup { } else if cx.chain.should_fetch_custody_columns(block_epoch) { DataRequest::Request { slot: block.slot(), + peers: self.get_data_peers(block.payload_bid_block_hash().ok()), state: SingleLookupRequestState::new(), } } else { @@ -244,18 +408,14 @@ impl SingleBlockLookup { break; } } - DataRequest::Request { slot, state } => { + DataRequest::Request { slot, peers, state } => { state.maybe_start_downloading(|| { - cx.custody_lookup_request( - self.id, - self.block_root, - *slot, - self.peers.clone(), - ) + cx.custody_lookup_request(self.id, self.block_root, *slot, peers.clone()) })?; - // Wait for the parent to be imported, data column processing result handle does + // Wait for the current block and parent to be imported, data column processing result handle does // not support `ParentUnknown`. - if self.awaiting_parent.is_none() + if self.block_request.state.is_processed() + && self.awaiting_parent.is_none() && let Some(data) = state.maybe_start_processing() { cx.send_custody_columns_for_processing( @@ -273,16 +433,78 @@ impl SingleBlockLookup { } } + // === Payload request (Gloas only) === + loop { + match &mut self.payload_request { + PayloadRequest::WaitingForBlock => { + if let Some(block) = self.block_request.state.peek_downloaded_data() { + self.payload_request = if block.fork_name_unchecked().gloas_enabled() { + PayloadRequest::Request { + peers: self.get_data_peers(block.payload_bid_block_hash().ok()), + state: SingleLookupRequestState::new(), + } + } else { + PayloadRequest::PreGloas + }; + } else { + break; + } + } + PayloadRequest::Request { peers, state } => { + state.maybe_start_downloading(|| { + cx.payload_lookup_request(self.id, peers.clone(), self.block_root) + })?; + // The envelope can only be verified once the block itself is imported; + // otherwise processing returns `BlockRootUnknown` and the lookup burns retries + // until `TooManyAttempts` while the block is parked awaiting its parent. + if self.block_request.state.is_processed() + && let Some(data) = state.maybe_start_processing() + { + cx.send_payload_for_processing( + self.block_root, + data.value, + data.seen_timestamp, + BlockProcessType::SinglePayloadEnvelope(self.id), + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + break; + } + PayloadRequest::PreGloas => break, + } + } + // If all components of this lookup are already processed, there will be no future events // that can make progress so it must be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. - if self.block_request.is_complete() && self.data_request.is_complete() { + if self.block_request.is_complete() + && self.data_request.is_complete() + && self.payload_request.is_complete() + { return Ok(LookupResult::Completed); } Ok(LookupResult::Pending) } + /// Returns the peers that should serve this block's data columns and payload envelope. For FULL + /// Gloas blocks these are the peers that claimed to have imported a FULL child of this block + /// (keyed by this block's bid `block_hash`). Pre-Gloas blocks carry no bid, so this returns the + /// lookup's `peers` unchanged. + fn get_data_peers(&mut self, bid_block_hash: Option) -> PeerSet { + if let Some(bid_block_hash) = bid_block_hash { + // Gloas: the child-attested peer set for this bid is the canonical peer set. DO NOT + // default to `self.peers`: post-Gloas `self.peers` have not claimed to import this + // block's data nor its payload. This set may remain empty until a FULL child arrives. + self.gloas_child_peers + .entry(bid_block_hash) + .or_default() + .clone() + } else { + self.peers.clone() + } + } + /// Handle block processing result. Advances the lookup state machine. pub fn on_block_processing_result( &mut self, @@ -293,15 +515,22 @@ impl SingleBlockLookup { BlockProcessingResult::Imported(_fully_imported, _info) => { self.block_request.state.on_processing_success()?; } - BlockProcessingResult::ParentUnknown { parent_root } => { + BlockProcessingResult::ParentUnknown { + parent_root, + parent_block_hash, + } => { // `BlockError::ParentUnknown` is only returned when processing blocks. Revert the // block request to `Downloaded` and park this lookup until the parent resolves; a // future call to `continue_requests` will re-submit the block for processing once // the parent lookup completes. self.block_request.state.revert_to_awaiting_processing()?; - self.set_awaiting_parent(parent_root); + self.set_awaiting_parent(AwaitingParent { + parent_root, + parent_block_hash, + }); return Ok(LookupResult::ParentUnknown { parent_root, + parent_block_hash, block_root: self.block_root, peers: self.all_peers(), }); @@ -345,6 +574,37 @@ impl SingleBlockLookup { self.continue_requests(cx) } + /// Handle payload envelope processing result (Gloas only). + pub fn on_payload_processing_result( + &mut self, + result: BlockProcessingResult, + cx: &mut SyncNetworkContext, + ) -> Result { + let PayloadRequest::Request { state, .. } = &mut self.payload_request else { + return Err(LookupRequestError::BadState( + "no payload_request".to_owned(), + )); + }; + + match result { + BlockProcessingResult::Imported(_fully_imported, _info) => { + state.on_processing_success()?; + } + BlockProcessingResult::ParentUnknown { .. } => { + return Err(LookupRequestError::BadState( + "payload processing returned ParentUnknown".to_owned(), + )); + } + BlockProcessingResult::Error { penalty, .. } => { + let peers = state.on_processing_failure()?; + if let Some((action, whom, msg)) = penalty { + whom.apply(action, &peers, msg, cx); + } + } + } + self.continue_requests(cx) + } + /// Handle a block download response. Updates download state and advances the lookup. pub fn on_block_download_response( &mut self, @@ -373,6 +633,23 @@ impl SingleBlockLookup { self.continue_requests(cx) } + /// Handle a payload envelope download response. Updates download state and advances the lookup. + pub fn on_payload_download_response( + &mut self, + req_id: ReqId, + result: PayloadDownloadResponse, + cx: &mut SyncNetworkContext, + ) -> Result { + let PayloadRequest::Request { state, .. } = &mut self.payload_request else { + return Err(LookupRequestError::BadState( + "no payload_request".to_owned(), + )); + }; + + state.on_download_response(req_id, result)?; + self.continue_requests(cx) + } + /// Get all unique peers that claim to have imported this set of block components pub fn all_peers(&self) -> Vec { self.peers.read().iter().copied().collect() @@ -380,18 +657,54 @@ impl SingleBlockLookup { /// Add peer to all request states. The peer must be able to serve this request. /// Returns true if the peer was newly inserted into any peer set. - pub fn add_peer(&mut self, peer_id: PeerId) -> bool { - self.peers.write().insert(peer_id) + pub fn add_peer(&mut self, peer_id: PeerId, peer_type: &PeerType) -> bool { + let mut added = false; + match peer_type { + PeerType::PayloadEnvelope(execution_hash) => { + // This peer claims to have imported a FULL child of this block whose bid references + // `execution_hash` as its parent. It is therefore proven to hold this block's + // payload envelope and data columns. + added |= self + .gloas_child_peers + .entry(*execution_hash) + .or_default() + .write() + .insert(peer_id); + } + PeerType::Block => {} + } + // Always add to the main block peers, they can at least serve the block. + added |= self.peers.write().insert(peer_id); + added } /// Remove peer from available peers. pub fn remove_peer(&mut self, peer_id: &PeerId) { self.peers.write().remove(peer_id); + for set in self.gloas_child_peers.values() { + set.write().remove(peer_id); + } } /// Returns true if this lookup has zero peers pub fn has_no_peers(&self) -> bool { - self.peers.read().is_empty() + if self.block_request.is_complete() + && let Some(block) = self.block_request.state.peek_downloaded_data() + && let Ok(bid_block_hash) = block.payload_bid_block_hash() + { + // Gloas block request complete, the main peer set is irrelevant. Check only the gloas + // child peers + match self.gloas_child_peers.get(&bid_block_hash) { + Some(set) => set.read().is_empty(), + None => false, + } + } else { + self.peers.read().is_empty() + && self + .gloas_child_peers + .values() + .all(|set| set.read().is_empty()) + } } } @@ -702,3 +1015,11 @@ fn fmt_peer_set_as_len( ) -> Result<(), std::fmt::Error> { write!(f, "{}", peer_set.read().len()) } + +fn fmt_peer_map_as_len( + peer_map: &HashMap, + f: &mut std::fmt::Formatter, +) -> Result<(), std::fmt::Error> { + let total = peer_map.values().map(|set| set.read().len()).sum::(); + write!(f, "{}", total) +} diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 5ec45c8fea..999b3dd30e 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -515,6 +515,15 @@ mod tests { } } + /// The custody-column coupling tests below build Fulu data-column sidecars directly, which is + /// incompatible with a Gloas genesis (Gloas columns have a different structure). Skip them when + /// `FORK_NAME` schedules Gloas at genesis. TODO(gloas): port the harness to build Gloas columns. + fn skip_under_gloas() -> bool { + test_spec::() + .fork_name_at_epoch(Epoch::new(0)) + .gloas_enabled() + } + fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId { BlocksByRangeRequestId { id: 1, @@ -619,6 +628,9 @@ mod tests { #[test] fn rpc_block_with_custody_columns() { + if skip_under_gloas() { + return; + } let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); @@ -697,6 +709,9 @@ mod tests { #[test] fn rpc_block_with_custody_columns_batched() { + if skip_under_gloas() { + return; + } let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); @@ -791,6 +806,9 @@ mod tests { #[test] fn missing_custody_columns_from_faulty_peers() { + if skip_under_gloas() { + return; + } // GIVEN: A request expecting sampling columns from multiple peers let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); @@ -886,6 +904,9 @@ mod tests { #[test] fn retry_logic_after_peer_failures() { + if skip_under_gloas() { + return; + } // GIVEN: A request expecting sampling columns where some peers initially fail let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); @@ -1002,6 +1023,9 @@ mod tests { #[test] fn max_retries_exceeded_behavior() { + if skip_under_gloas() { + return; + } // GIVEN: A request where peers consistently fail to provide required columns let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 66bb13ae98..7a90163852 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -71,8 +71,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, - SignedExecutionPayloadEnvelope, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ExecutionBlockHash, ForkContext, Hash256, + SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -852,11 +852,13 @@ impl SyncManager { SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); + let parent_block_hash = block.payload_bid_parent_block_hash().ok(); debug!(%block_root, %parent_root, "Received unknown parent block message"); self.handle_unknown_parent( peer_id, block_root, parent_root, + parent_block_hash, block_slot, BlockComponent::Block(DownloadResult { value: block.block_cloned(), @@ -876,6 +878,9 @@ impl SyncManager { peer_id, block_root, parent_root, + // The event `UnknownParentSidecarHeader` only fires for pre-Gloas data + // structues, so the bid parent hash is None. + None, slot, BlockComponent::Sidecar, ); @@ -951,6 +956,7 @@ impl SyncManager { peer_id: PeerId, block_root: Hash256, parent_root: Hash256, + parent_block_hash: Option, slot: Slot, block_component: BlockComponent, ) { @@ -960,6 +966,7 @@ impl SyncManager { block_root, block_component, parent_root, + parent_block_hash, peer_id, &mut self.network, ) { @@ -1139,7 +1146,6 @@ impl SyncManager { } } - // TODO(gloas): dispatch into block_lookups once the envelope lookup state machine lands. fn rpc_payload_envelope_received( &mut self, sync_request_id: SyncRequestId, @@ -1194,13 +1200,17 @@ impl SyncManager { peer_id: PeerId, envelope: RpcEvent>>, ) { - if let Some(_resp) = self + if let Some(resp) = self .network .on_single_payload_envelope_response(id, peer_id, envelope) { - // TODO(gloas): dispatch into - // `block_lookups.on_download_response::>(...)` once - // the envelope lookup state machine lands. + self.block_lookups.on_payload_download_response( + id, + resp.map(|(value, seen_timestamp)| { + DownloadResult::new(value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index dfeb8d8f12..c9a48c9d5e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -53,8 +53,8 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::{ - BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext, - Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -98,6 +98,7 @@ pub type CustodyByRootResult = Result>, RpcResponseError>; #[derive(Debug)] +#[allow(private_interfaces)] pub enum RpcResponseError { RpcError(#[allow(dead_code)] RPCError), VerifyError(LookupVerifyError), @@ -310,6 +311,10 @@ impl SyncNetworkContext { } } + pub fn spec(&self) -> &ChainSpec { + &self.chain.spec + } + pub fn send_sync_message(&mut self, sync_message: SyncMessage) { self.network_beacon_processor .send_sync_message(sync_message); @@ -921,19 +926,23 @@ impl SyncNetworkContext { } /// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC. - #[allow(dead_code)] pub fn payload_lookup_request( &mut self, lookup_id: SingleLookupId, lookup_peers: Arc>>, block_root: Hash256, - ) -> Result, RpcRequestSendError> { + ) -> Result< + LookupRequestResult>>, + RpcRequestSendError, + > { // Skip the download if fork-choice already saw this envelope (e.g. imported via gossip - // before the lookup got here). - if self.chain.envelope_is_known_to_fork_choice(&block_root) { + // before the lookup got here). Return the cached envelope so the request completes. + if self.chain.envelope_is_known_to_fork_choice(&block_root) + && let Ok(Some(envelope)) = self.chain.get_payload_envelope(&block_root) + { return Ok(LookupRequestResult::NoRequestNeeded( "envelope already known to fork-choice", - (), + Arc::new(envelope), )); } @@ -1052,6 +1061,13 @@ impl SyncNetworkContext { block_slot: Slot, lookup_peers: Arc>>, ) -> Result>, RpcRequestSendError> { + // Code below will issue column requests even if `lookup_peers` is empty. This is not okay, + // as we want to have at least one signal that some of our peers has already seen the + // block's data. + if lookup_peers.read().is_empty() { + return Ok(LookupRequestResult::Pending("no peers")); + } + let custody_indexes_imported = self .chain .cached_data_column_indexes(&block_root, block_slot) @@ -1567,7 +1583,6 @@ impl SyncNetworkContext { }) } - #[allow(dead_code)] pub fn send_payload_for_processing( &self, block_root: Hash256, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index e74b74ec08..b1a4b52867 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -310,11 +310,10 @@ impl ActiveCustodyRequest { // and downscore if data_columns_by_root does not return the expected custody // columns. For the rest of peers, don't downscore if columns are missing. // - // Post-Gloas, blocks and payload envelopes are decoupled. A peer may - // have the block but not yet imported the envelope and data columns. - // Don't enforce max_responses in this case. - lookup_peers.contains(&peer_id) - && !cx.fork_context.current_fork_name().gloas_enabled(), + // Post-Gloas the lookup peer set is the `gloas_child_peers`: peers that imported + // a FULL child, which requires the parent's columns. They provably custody the + // columns, so withholding is penalizable just like pre-Gloas. + lookup_peers.contains(&peer_id), ) .map_err(Error::SendFailed)?; diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 91227d77f8..fb0956cf50 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -38,11 +38,15 @@ use tokio::sync::mpsc; use tracing::info; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, - ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; const D: Duration = Duration::new(0, 0); +/// Gloas genesis needs enough validators to populate `proposer_lookahead`. +const TEST_RIG_VALIDATOR_COUNT: usize = 8; + /// Configuration for how the test rig should respond to sync requests. /// /// Controls simulated peer behavior during lookup tests, including RPC errors, @@ -59,6 +63,10 @@ pub struct SimulateConfig { return_too_few_data_n_times: usize, return_no_columns_on_indices_n_times: usize, return_no_columns_on_indices: Vec, + /// Only omit columns for this block root, if set. + return_no_columns_for_block: Option, + /// Leave matching envelope requests unanswered. + hold_envelope_for_block: Option, skip_by_range_routes: bool, // Use a callable fn because BlockProcessingResult does not implement Clone #[educe(Debug(ignore))] @@ -132,6 +140,16 @@ impl SimulateConfig { self } + fn return_no_columns_for_block(mut self, block_root: Hash256) -> Self { + self.return_no_columns_for_block = Some(block_root); + self + } + + fn hold_envelope_for_block(mut self, block_root: Hash256) -> Self { + self.hold_envelope_for_block = Some(block_root); + self + } + pub(super) fn return_rpc_error(mut self, error: RPCError) -> Self { self.return_rpc_error = Some(error); self @@ -211,6 +229,14 @@ pub(crate) struct TestRigConfig { node_custody_type_override: Option, } +struct FullEmptyFork { + a: Hash256, + b: Hash256, + c: Hash256, + b_block: Arc>, + c_block: Arc>, +} + impl TestRig { pub(crate) fn new(test_rig_config: TestRigConfig) -> Self { // Use `fork_from_env` logic to set correct fork epochs @@ -221,10 +247,10 @@ impl TestRig { Duration::from_secs(12), ); - // Initialise a new beacon chain + // Gloas genesis needs enough validators for proposer lookahead. let harness = BeaconChainHarness::>::builder(E) .spec(spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(clock.clone()) @@ -304,6 +330,7 @@ impl TestRig { fork_name, network_blocks_by_root: <_>::default(), network_blocks_by_slot: <_>::default(), + network_envelopes_by_root: <_>::default(), penalties: <_>::default(), seen_lookups: <_>::default(), requests: <_>::default(), @@ -428,9 +455,9 @@ impl TestRig { process_fn.await } } - Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => { - process_fn.await - } + Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) + | Work::RpcEnvelope(process_fn) => process_fn.await, Work::ChainSegment { process_fn, process_id: (chain_id, batch_epoch), @@ -557,11 +584,14 @@ impl TestRig { } let will_omit_columns = req.data_column_ids.iter().any(|id| { - id.columns.iter().any(|c| { - self.complete_strategy - .return_no_columns_on_indices - .contains(c) - }) + self.complete_strategy + .return_no_columns_for_block + .is_none_or(|root| id.block_root == root) + && id.columns.iter().any(|c| { + self.complete_strategy + .return_no_columns_on_indices + .contains(c) + }) }); let columns_to_omit = if will_omit_columns && self.complete_strategy.return_no_columns_on_indices_n_times > 0 @@ -615,15 +645,34 @@ impl TestRig { .return_wrong_sidecar_for_block_n_times -= 1; let first = columns.first_mut().expect("empty columns"); let column = Arc::make_mut(first); - column - .signed_block_header_mut() - .expect("not fulu") - .message - .body_root = Hash256::ZERO; + // Corrupt the claimed block root. + match column { + DataColumnSidecar::Fulu(col) => { + col.signed_block_header.message.body_root = Hash256::ZERO; + } + DataColumnSidecar::Gloas(col) => { + col.beacon_block_root = Hash256::ZERO; + } + } } self.send_rpc_columns_response(req_id, peer_id, &columns); } + (RequestType::PayloadEnvelopesByRoot(req), AppRequestId::Sync(req_id)) => { + // Lookup sync requests one envelope root at a time. + let block_root = req + .beacon_block_roots + .as_slice() + .first() + .copied() + .unwrap_or_else(|| panic!("empty envelope request: {req:?}")); + if self.complete_strategy.hold_envelope_for_block == Some(block_root) { + return; + } + let envelope = self.network_envelopes_by_root.get(&block_root).cloned(); + self.send_rpc_envelope_response(req_id, peer_id, envelope); + } + (RequestType::BlocksByRange(req), AppRequestId::Sync(req_id)) => { if self.complete_strategy.skip_by_range_routes { return; @@ -883,16 +932,44 @@ impl TestRig { }); } + fn send_rpc_envelope_response( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + ) { + self.log(&format!( + "Completing request {sync_request_id:?} to {peer_id} with envelope {:?}", + envelope.as_ref().map(|e| e.slot()) + )); + + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: envelope.clone(), + seen_timestamp: D, + }); + // Stream termination + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: None, + seen_timestamp: D, + }); + } + + #[allow(dead_code)] + fn is_after_gloas(&self) -> bool { + self.fork_name.gloas_enabled() + } + // Preparation steps - /// Returns the block root of the tip of the built chain - pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 { - let mut blocks = vec![]; - + fn get_external_harness_with_genesis(&mut self) -> BeaconChainHarness> { // Initialise a new beacon chain let external_harness = BeaconChainHarness::>::builder(E) .spec(self.harness.spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(self.harness.chain.slot_clock.clone()) @@ -912,7 +989,17 @@ impl TestRig { self.network_blocks_by_slot .insert(genesis_block.slot(), genesis_block); - for i in 0..block_count { + external_harness + } + + /// Returns the block root of the tip of the built chain + pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 { + let mut blocks = vec![]; + + // Initialise a new beacon chain + let external_harness = self.get_external_harness_with_genesis(); + + for _ in 0..block_count { external_harness.advance_slot(); let block_root = external_harness .extend_chain( @@ -922,23 +1009,17 @@ impl TestRig { ) .await; let block = external_harness.get_full_block(&block_root); - let block_root = block.canonical_root(); let block_slot = block.slot(); - self.network_blocks_by_root - .insert(block_root, block.clone()); - self.network_blocks_by_slot.insert(block_slot, block); - self.log(&format!( - "Produced block {} index {i} in external harness", - block_slot, - )); + self.insert_external_block( + block, + external_harness + .chain + .get_payload_envelope(&block_root) + .unwrap(), + ); blocks.push((block_slot, block_root)); } - // Re-log to have a nice list of block roots at the end - for block in &blocks { - self.log(&format!("Build chain {block:?}")); - } - // Auto-update the clock on the main harness to accept the blocks self.harness .set_current_slot(external_harness.get_current_slot()); @@ -946,6 +1027,152 @@ impl TestRig { blocks.last().expect("empty blocks").1 } + /// Builds: + /// + /// ```text + /// G (full) -> A (full) -> B (FULL: bid.parent_block_hash == A.block_hash) + /// A -> C (EMPTY: bid.parent_block_hash == G.block_hash) + /// ``` + pub(super) async fn build_full_empty_fork(&mut self) -> (Hash256, Hash256, Hash256) { + // Initialise a new beacon chain (mirrors `build_chain`). + let external_harness = self.get_external_harness_with_genesis(); + + // G: full canonical block on genesis. + external_harness.advance_slot(); + let g_root = external_harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + let g_block_hash = external_harness + .get_full_block(&g_root) + .as_block() + .payload_bid_block_hash() + .unwrap(); + + // A: full block on G, imported with its envelope so the FULL child below sees A as full. + external_harness.advance_slot(); + let a_slot = external_harness.get_current_slot(); + let (a_contents, a_envelope, a_state) = external_harness + .make_block_with_envelope(external_harness.get_current_state(), a_slot) + .await; + let a_block = a_contents.0.clone(); + let a_root = a_block.canonical_root(); + let a_block_hash = a_block.as_block().payload_bid_block_hash().unwrap(); + external_harness + .process_block(a_slot, a_root, a_contents) + .await + .unwrap(); + + external_harness.advance_slot(); + let child_slot = external_harness.get_current_slot(); + + // C: EMPTY child of A. Built before A's envelope is imported, so its bid points at G. + let (c_contents, c_envelope, c_state) = external_harness + .make_block_with_envelope(a_state.clone(), child_slot) + .await; + let c_block = c_contents.0.clone(); + let c_root = c_block.canonical_root(); + + // Import A's envelope so the next child sees A as full. + let a_envelope = a_envelope.expect("A should have envelope"); + external_harness + .process_envelope(a_root, a_envelope, &a_state, a_block.state_root()) + .await; + + // B: FULL child of A. Built after A's envelope is imported, so its bid points at A. + let (b_contents, b_envelope, b_state) = external_harness + .make_block_with_envelope(a_state.clone(), child_slot) + .await; + let b_block = b_contents.0.clone(); + let b_root = b_block.canonical_root(); + + assert_eq!( + ( + b_block.parent_root(), + c_block.parent_root(), + b_block.is_parent_block_full(a_block_hash), + c_block.is_parent_block_full(a_block_hash), + c_block.is_parent_block_full(g_block_hash), + ), + (a_root, a_root, true, false, true) + ); + + // Import both children (and their envelopes) so every block is served through the same + // `get_full_block` path as the rest of the chain. + external_harness + .process_block(child_slot, c_root, c_contents) + .await + .unwrap(); + if let Some(c_envelope) = c_envelope { + external_harness + .process_envelope(c_root, c_envelope, &c_state, c_block.state_root()) + .await; + } + external_harness + .process_block(child_slot, b_root, b_contents) + .await + .unwrap(); + if let Some(b_envelope) = b_envelope { + external_harness + .process_envelope(b_root, b_envelope, &b_state, b_block.state_root()) + .await; + } + + // Cache every block through the single `get_full_block` + `insert_external_block2` path. + for root in [g_root, a_root, c_root, b_root] { + let block = external_harness.get_full_block(&root); + let envelope = external_harness.chain.get_payload_envelope(&root).unwrap(); + self.insert_external_block(block, envelope); + } + + self.harness.set_current_slot(child_slot); + + (a_root, b_root, c_root) + } + + async fn new_gloas_full_empty_fork() -> Option<(Self, FullEmptyFork)> { + let Some(mut r) = Self::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else { + return None; + }; + if !r.is_after_gloas() { + return None; + } + + let (a, b, c) = r.build_full_empty_fork().await; + let fork = FullEmptyFork { + a, + b, + c, + b_block: r.network_blocks_by_root.get(&b).unwrap().block_cloned(), + c_block: r.network_blocks_by_root.get(&c).unwrap().block_cloned(), + }; + + Some((r, fork)) + } + + fn insert_external_block( + &mut self, + block: RangeSyncBlock, + envelope: Option>, + ) { + let block_root = block.canonical_root(); + let block_slot = block.slot(); + self.network_blocks_by_root + .insert(block_root, block.clone()); + self.network_blocks_by_slot.insert(block_slot, block); + // Cache Gloas envelopes for lookup RPCs. + if let Some(envelope) = envelope { + self.network_envelopes_by_root + .insert(block_root, envelope.into()); + } + self.log(&format!( + "Produced block {block_root:?} slot {block_slot} in external harness", + )); + } + fn corrupt_last_block_signature(&mut self) { let range_sync_block = self.get_last_block().clone(); let mut block = (*range_sync_block.block_cloned()).clone(); @@ -978,7 +1205,16 @@ impl TestRig { } fn corrupt_last_column_kzg_proof(&mut self) { - let range_sync_block = self.get_last_block().clone(); + let block_root = self.get_last_block().canonical_root(); + self.corrupt_column_kzg_proof(block_root); + } + + fn corrupt_column_kzg_proof(&mut self, block_root: Hash256) { + let range_sync_block = self + .network_blocks_by_root + .get(&block_root) + .unwrap_or_else(|| panic!("No block for root {block_root}")) + .clone(); let block = range_sync_block.block_cloned(); let blobs = range_sync_block.block_data().blobs(); let mut columns = range_sync_block @@ -989,7 +1225,7 @@ impl TestRig { let column = Arc::make_mut(first); let proof = column.kzg_proofs_mut().first_mut().expect("no kzg proofs"); *proof = kzg::KzgProof::empty(); - self.re_insert_block(block, blobs, Some(columns)); + self.upsert_block(block, blobs, Some(columns)); } fn get_last_block(&self) -> &RangeSyncBlock { @@ -1009,6 +1245,15 @@ impl TestRig { ) { self.network_blocks_by_slot.clear(); self.network_blocks_by_root.clear(); + self.upsert_block(block, blobs, columns); + } + + fn upsert_block( + &mut self, + block: Arc>, + blobs: Option>, + columns: Option>, + ) { let block_root = block.canonical_root(); let block_slot = block.slot(); let block_data = if let Some(columns) = columns { @@ -1135,6 +1380,10 @@ impl TestRig { self.harness.chain.head().head_slot() } + pub(super) fn head_root(&self) -> Hash256 { + self.harness.chain.head().head_block_root() + } + pub(super) fn assert_head_slot(&self, slot: u64) { assert_eq!(self.head_slot(), Slot::new(slot), "Unexpected head slot"); } @@ -1341,6 +1590,40 @@ impl TestRig { self.fork_name.fulu_enabled() } + fn trigger_unknown_parent_blocks_from_all_peers( + &mut self, + blocks: &[Arc>], + ) { + for peer in self.new_connected_peers_for_peerdas() { + for block in blocks { + self.trigger_unknown_parent_block(peer, block.clone()); + } + } + } + + fn trigger_full_empty_fork(&mut self, fork: &FullEmptyFork) { + self.trigger_unknown_parent_blocks_from_all_peers(&[ + fork.b_block.clone(), + fork.c_block.clone(), + ]); + } + + async fn trigger_custody_lookup_from_all_peers(&mut self) -> Option { + if self.is_after_gloas() { + self.build_chain(2).await; + let child = self.get_last_block().block_cloned(); + let parent_root = child.parent_root(); + self.trigger_unknown_parent_blocks_from_all_peers(&[child]); + Some(parent_root) + } else { + let block_root = self.build_chain(1).await; + for peer in self.new_connected_peers_for_peerdas() { + self.trigger_unknown_block_from_attestation(block_root, peer); + } + None + } + } + fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc>) { let block_root = block.canonical_root(); self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root)) @@ -1351,17 +1634,17 @@ impl TestRig { peer_id: PeerId, data_column: Arc>, ) { - let block_root = data_column.block_root(); - let slot = data_column.slot(); - let parent_root = match data_column.as_ref() { - DataColumnSidecar::Fulu(column) => column.block_parent_root(), - DataColumnSidecar::Gloas(_) => panic!("Gloas data column not supported in this test"), + let DataColumnSidecar::Fulu(col) = data_column.as_ref() else { + self.log(&format!( + "trigger_unknown_parent_data_column noop for Gloas peer {peer_id:?}" + )); + return; }; self.send_sync_message(SyncMessage::UnknownParentSidecarHeader { peer_id, - block_root, - parent_root, - slot, + block_root: col.block_root(), + parent_root: col.block_parent_root(), + slot: col.slot(), }); } @@ -1395,6 +1678,13 @@ impl TestRig { self.sync_manager.block_lookups().active_single_lookups() } + fn active_lookup_roots(&self) -> Vec { + self.active_single_lookups() + .iter() + .map(|l| l.block_root) + .collect() + } + fn active_single_lookups_count(&self) -> usize { self.active_single_lookups().len() } @@ -1789,6 +2079,10 @@ async fn happy_path_unknown_data_parent(depth: usize) { let Some(mut r) = TestRig::new_after_fulu() else { return; }; + // No unknown-parent data-column trigger post-Gloas. + if r.is_after_gloas() { + return; + } r.build_chain(depth).await; r.trigger_with_last_unknown_data_column_parent(); r.simulate(SimulateConfig::happy_path()).await; @@ -1806,7 +2100,9 @@ async fn happy_path_multiple_triggers(depth: usize) { r.trigger_with_last_block(); r.trigger_with_last_unknown_block_parent(); r.trigger_with_last_unknown_block_parent(); - r.trigger_with_last_unknown_data_column_parent(); + if !r.is_after_gloas() { + r.trigger_with_last_unknown_data_column_parent(); + } r.simulate(SimulateConfig::happy_path()).await; assert_eq!(r.created_lookups(), depth + 1, "Don't create extra lookups"); r.assert_successful_lookup_sync(); @@ -1838,7 +2134,10 @@ async fn bad_peer_empty_data_response(depth: usize) { r.simulate(SimulateConfig::new().return_no_data_once()) .await; // We register a penalty, retry and complete sync successfully - r.assert_penalties(&["NotEnoughResponsesReturned"]); + if !r.is_after_gloas() { + // TODO(gloas): tip columns have no attributable FULL-child peer here. + r.assert_penalties(&["NotEnoughResponsesReturned"]); + } r.assert_successful_lookup_sync(); // TODO(tree-sync) Assert that a single lookup is created (no drops) } @@ -1853,7 +2152,10 @@ async fn bad_peer_too_few_data_response(depth: usize) { r.simulate(SimulateConfig::new().return_too_few_data_once()) .await; // We register a penalty, retry and complete sync successfully - r.assert_penalties(&["NotEnoughResponsesReturned"]); + if !r.is_after_gloas() { + // TODO(gloas): tip columns have no attributable FULL-child peer here. + r.assert_penalties(&["NotEnoughResponsesReturned"]); + } r.assert_successful_lookup_sync(); // TODO(tree-sync) Assert that a single lookup is created (no drops) } @@ -1878,8 +2180,13 @@ async fn bad_peer_wrong_data_response(depth: usize) { r.build_chain_and_trigger_last_block(depth).await; r.simulate(SimulateConfig::new().return_wrong_sidecar_for_block_once()) .await; - // We register a penalty, retry and complete sync successfully - r.assert_penalties(&["UnrequestedBlockRoot"]); + // We register a penalty, retry and complete sync successfully. Under Gloas the tip block + // (depth 1) has no attributable FULL-child peer so no custody request is made and no penalty + // is possible; at depth >= 2 the parent's columns are served by the tip (its FULL child), so + // the wrong-sidecar penalty is attributable. + if !r.is_after_gloas() || depth >= 2 { + r.assert_penalties(&["UnrequestedBlockRoot"]); + } r.assert_successful_lookup_sync(); // TODO(tree-sync) Assert that a single lookup is created (no drops) } @@ -1953,10 +2260,16 @@ async fn unknown_parent_does_not_add_peers_to_itself() { r.build_chain(2).await; r.trigger_with_last_unknown_block_parent(); r.trigger_with_last_unknown_block_parent(); - r.trigger_with_last_unknown_data_column_parent(); + // No data-column parent trigger post-Gloas. + let parent_lookup_peers = if r.is_after_gloas() { + 2 + } else { + r.trigger_with_last_unknown_data_column_parent(); + 3 + }; r.simulate(SimulateConfig::happy_path()).await; r.assert_peers_at_lookup_of_slot(2, 0); - r.assert_peers_at_lookup_of_slot(1, 3); + r.assert_peers_at_lookup_of_slot(1, parent_lookup_peers); assert_eq!(r.created_lookups(), 2, "Don't create extra lookups"); // All lookups should NOT complete on this test, however note the following for the tip lookup, // it's the lookup for the tip block which has 0 peers and a block cached: @@ -1996,6 +2309,10 @@ async fn test_single_block_lookup_ignored_response() { /// Assert that if the beacon processor returns DuplicateFullyImported, the lookup completes successfully async fn test_single_block_lookup_duplicate_response() { let mut r = TestRig::default(); + // The mock only covers block processing; Gloas also needs real envelope/column results. + if r.is_after_gloas() { + return; + } r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully r.simulate( @@ -2060,6 +2377,10 @@ async fn lookups_form_chain() { /// Assert that if a lookup chain (by appending ancestors) is too long we drop it async fn test_parent_lookup_too_deep_grow_ancestor_one() { let mut r = TestRig::default(); + // TODO(gloas): range sync does not fetch payload envelopes yet. + if r.is_after_gloas() { + return; + } r.build_chain(PARENT_DEPTH_TOLERANCE + 1).await; r.trigger_with_last_block(); r.simulate(SimulateConfig::happy_path()).await; @@ -2210,6 +2531,10 @@ async fn block_in_da_checker_skips_download() { let Some(mut r) = TestRig::new_after_fulu() else { return; }; + // TODO(gloas): the helper does not populate the envelope missing-component path yet. + if r.is_after_gloas() { + return; + } // Add block to da_checker // Complete test with happy path // Assert that there were no requests for blocks @@ -2279,14 +2604,13 @@ async fn custody_lookup_some_custody_failures(test_type: FuluTestType) { let Some(mut r) = TestRig::new_fulu_peer_test(test_type) else { return; }; - let block_root = r.build_chain(1).await; - // Send the same trigger from all peers, so that the lookup has all peers - for peer in r.new_connected_peers_for_peerdas() { - r.trigger_unknown_block_from_attestation(block_root, peer); - } + let block_under_test = r.trigger_custody_lookup_from_all_peers().await; let custody_columns = r.custody_columns(); - r.simulate(SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..4], 3)) - .await; + let mut config = SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..4], 3); + if let Some(block_root) = block_under_test { + config = config.return_no_columns_for_block(block_root); + } + r.simulate(config).await; r.assert_penalties_of_type("NotEnoughResponsesReturned"); r.assert_successful_lookup_sync(); } @@ -2295,20 +2619,15 @@ async fn custody_lookup_permanent_custody_failures(test_type: FuluTestType) { let Some(mut r) = TestRig::new_fulu_peer_test(test_type) else { return; }; - let block_root = r.build_chain(1).await; - - // Send the same trigger from all peers, so that the lookup has all peers - for peer in r.new_connected_peers_for_peerdas() { - r.trigger_unknown_block_from_attestation(block_root, peer); - } + let block_under_test = r.trigger_custody_lookup_from_all_peers().await; let custody_columns = r.custody_columns(); - r.simulate( - SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..2], usize::MAX), - ) - .await; - // Every peer that does not return a column is part of the lookup because it claimed to have - // imported the lookup, so we will penalize. + let mut config = + SimulateConfig::new().return_no_columns_on_indices(&custody_columns[..2], usize::MAX); + if let Some(block_root) = block_under_test { + config = config.return_no_columns_for_block(block_root); + } + r.simulate(config).await; r.assert_penalties_of_type("NotEnoughResponsesReturned"); r.assert_failed_lookup_sync(); } @@ -2346,6 +2665,10 @@ async fn crypto_on_fail_with_bad_column_proposer_signature() { let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else { return; }; + // Gloas columns have no per-column proposer signature. + if r.is_after_gloas() { + return; + } r.build_chain(1).await; r.corrupt_last_column_proposer_signature(); r.trigger_with_last_block(); @@ -2364,9 +2687,16 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() { let Some(mut r) = TestRig::new_fulu_peer_test(FuluTestType::WeSupernodeThemSupernode) else { return; }; - r.build_chain(1).await; - r.corrupt_last_column_kzg_proof(); - r.trigger_with_last_block(); + if r.is_after_gloas() { + r.build_chain(2).await; + let child = r.get_last_block().block_cloned(); + r.corrupt_column_kzg_proof(child.parent_root()); + r.trigger_unknown_parent_blocks_from_all_peers(&[child]); + } else { + r.build_chain(1).await; + r.corrupt_last_column_kzg_proof(); + r.trigger_with_last_block(); + } r.simulate(SimulateConfig::happy_path()).await; if cfg!(feature = "fake_crypto") { r.assert_successful_lookup_sync(); @@ -2376,3 +2706,36 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() { r.assert_penalties_of_type("AvailabilityCheck"); } } + +#[tokio::test] +async fn gloas_full_empty_children_retain_parent_for_payload() { + let Some((mut r, fork)) = TestRig::new_gloas_full_empty_fork().await else { + return; + }; + + r.trigger_full_empty_fork(&fork); + + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); +} + +#[tokio::test] +async fn gloas_empty_child_continues_while_parent_payload_withheld() { + let Some((mut r, fork)) = TestRig::new_gloas_full_empty_fork().await else { + return; + }; + + r.trigger_full_empty_fork(&fork); + + r.simulate(SimulateConfig::happy_path().hold_envelope_for_block(fork.a)) + .await; + + assert_eq!(r.head_root(), fork.c); + assert_eq!(r.created_lookups(), 4); + assert_eq!(r.completed_lookups(), 2); + assert_eq!(r.dropped_lookups(), 0); + assert_eq!(r.active_lookup_roots(), vec![fork.a, fork.b]); + r.assert_no_penalties(); + r.assert_empty_network(); + r.assert_empty_processor(); +} diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index 4e185cc081..2f318bfb9a 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -21,7 +21,7 @@ use tokio::sync::mpsc; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use types::{ForkName, Hash256, MinimalEthSpec as E, Slot}; +use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelope, Slot}; mod lookups; mod range; @@ -77,6 +77,10 @@ struct TestRig { /// Blocks that will be used in the test but may not be known to `harness` yet. network_blocks_by_root: HashMap>, network_blocks_by_slot: HashMap>, + /// Gloas execution payload envelopes keyed by block root, populated during `build_chain` + /// from the external harness store. The rig serves these when a lookup issues a + /// `PayloadEnvelopesByRoot` request. + network_envelopes_by_root: HashMap>>, penalties: Vec, /// All seen lookups through the test run seen_lookups: HashMap, diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 1499ae5016..e6890cf242 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -34,6 +34,13 @@ use types::{Epoch, EthSpec, Hash256, MinimalEthSpec as E, Slot}; const SLOTS_PER_EPOCH: usize = 8; impl TestRig { + /// Range sync doesn't yet ingest Gloas blocks in these tests: the range harness doesn't serve + /// payload envelopes, so a Gloas block never becomes fully available and sync can't complete. + /// Skip the affected completion tests under a Gloas genesis. TODO(gloas): support range sync. + fn skip_range_sync_under_gloas(&self) -> bool { + self.fork_name.gloas_enabled() + } + fn add_head_peer(&mut self) -> PeerId { let local_info = self.local_info(); self.add_supernode_peer(SyncInfo { @@ -260,6 +267,9 @@ impl TestRig { #[tokio::test] async fn head_sync_completes() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_head_sync().await; r.simulate(SimulateConfig::happy_path()).await; r.assert_head_sync_completed(); @@ -271,6 +281,9 @@ async fn head_sync_completes() { #[tokio::test] async fn finalized_to_head_transition() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_and_head_sync().await; r.simulate(SimulateConfig::happy_path()).await; r.assert_range_sync_completed(); @@ -282,6 +295,9 @@ async fn finalized_to_head_transition() { #[tokio::test] async fn finalized_sync_completes() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync().await; r.simulate(SimulateConfig::happy_path()).await; r.assert_range_sync_completed(); @@ -293,6 +309,9 @@ async fn finalized_sync_completes() { #[tokio::test] async fn batch_rpc_error_retries() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync().await; r.simulate(SimulateConfig::happy_path().return_rpc_error(RPCError::UnsupportedProtocol)) .await; @@ -361,6 +380,9 @@ async fn batch_peer_returns_partial_columns_then_succeeds() { #[tokio::test] async fn batch_non_faulty_failure_retries() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync().await; r.simulate(SimulateConfig::happy_path().with_range_non_faulty_failures(1)) .await; @@ -372,6 +394,9 @@ async fn batch_non_faulty_failure_retries() { #[tokio::test] async fn batch_faulty_failure_redownloads() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync().await; r.simulate(SimulateConfig::happy_path().with_range_faulty_failures(1)) .await; @@ -428,6 +453,9 @@ async fn late_response_for_removed_chain() { #[tokio::test] async fn ee_offline_then_online_resumes_sync() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync().await; r.simulate(SimulateConfig::happy_path().with_ee_offline_for_n_range_responses(2)) .await; @@ -440,6 +468,9 @@ async fn ee_offline_then_online_resumes_sync() { #[tokio::test] async fn finalized_sync_with_local_head_partial() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } r.setup_finalized_sync_with_local_head(3).await; r.simulate(SimulateConfig::happy_path()).await; r.assert_range_sync_completed(); @@ -450,6 +481,9 @@ async fn finalized_sync_with_local_head_partial() { #[tokio::test] async fn finalized_sync_with_local_head_near_target() { let mut r = TestRig::default(); + if r.skip_range_sync_under_gloas() { + return; + } let target_epochs = 5; let local_slots = (target_epochs * SLOTS_PER_EPOCH) - 1; // all blocks except last r.build_chain(target_epochs * SLOTS_PER_EPOCH).await; @@ -468,7 +502,7 @@ async fn finalized_sync_with_local_head_near_target() { #[tokio::test] async fn not_enough_custody_peers_then_peers_arrive() { let mut r = TestRig::default(); - if !r.fork_name.fulu_enabled() { + if !r.fork_name.fulu_enabled() || r.skip_range_sync_under_gloas() { return; } let remote_info = r.setup_finalized_sync_insufficient_peers().await; @@ -495,7 +529,7 @@ async fn not_enough_custody_peers_then_peers_arrive() { #[tokio::test] async fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() { let mut r = TestRig::default(); - if !r.fork_name.fulu_enabled() { + if !r.fork_name.fulu_enabled() || r.skip_range_sync_under_gloas() { return; }