From 560f90611ee810dcab0f055daf7dcc50fab891b4 Mon Sep 17 00:00:00 2001 From: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Fri, 19 Jun 2026 02:50:24 +0200 Subject: [PATCH] Fix and improve handling of empty columns after getBlobs response (#9361) This PR fixes two issues: 1. This condition is inverted: https://github.com/sigp/lighthouse/blob/dfb259171a65cacd6db57b8874af8f543cabcb7a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs#L1507-L1508 We are supposed to filter out incomplete columns when we DON'T have local blobs yet! 2. When the EL returns no blobs, we never store a partial in the assembler, and this code fails to publish our need to the network, as no partials are returned: https://github.com/sigp/lighthouse/blob/dfb259171a65cacd6db57b8874af8f543cabcb7a/beacon_node/network/src/network_beacon_processor/mod.rs#L1038-L1050 The simple fix for 1 would be to invert the condition, but we can improve the flow here: Instead of not publishing anything, we can publish what we got, but not request anything. This ties into the fix for 2: After get blobs completes, we not only publish anything in the partial assembler, but also for every missing custody column in there, publish an empty column and a request for all cells. In particular: - When sending a partial message to `network`, allow specifying a request bitmap instead of hardcoding an all-ones bitmap. - For clarity and to prepare for Gloas integration, add a `PubsubPartialMessage` enum with a `DataColumnFulu` variant. - On republishing after merging a gossip column: always publish, but only request cells if local blobs are known or get blobs is disabled. This also prepares us to request only *some* cells, e.g. in cases where we are aware of the blobs that the EL is going to send us, e.g. via `engine_hasBlobs`. - Move guards in `fetch_engine_blobs_and_publish` to ensure everything works fine if there are no blobs or if get_blobs is disabled. Co-Authored-By: Daniel Knopik --- beacon_node/beacon_chain/src/builder.rs | 2 + .../src/data_availability_checker.rs | 3 + .../beacon_chain/src/fetch_blobs/tests.rs | 2 +- .../src/partial_data_column_assembler.rs | 14 +- beacon_node/beacon_chain/src/test_utils.rs | 1 + beacon_node/http_api/src/publish_blocks.rs | 20 ++- beacon_node/lighthouse_network/src/lib.rs | 4 +- .../lighthouse_network/src/service/mod.rs | 117 ++++++++------- .../lighthouse_network/src/types/mod.rs | 2 +- .../lighthouse_network/src/types/partial.rs | 44 +++++- .../lighthouse_network/src/types/pubsub.rs | 24 ++- .../gossip_methods.rs | 75 ++++++---- .../src/network_beacon_processor/mod.rs | 138 +++++++++++++----- .../network_beacon_processor/sync_methods.rs | 2 +- beacon_node/network/src/service.rs | 13 +- 15 files changed, 313 insertions(+), 148 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c24c7b6fc6..91a9dcc7c8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -908,6 +908,7 @@ where let shuffling_cache_size = self.chain_config.shuffling_cache_size; let complete_blob_backfill = self.chain_config.complete_blob_backfill; let enable_partial_columns = self.chain_config.enable_partial_columns; + let disable_get_blobs = self.chain_config.disable_get_blobs; // Calculate the weak subjectivity point in which to backfill blocks to. let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -1043,6 +1044,7 @@ where custody_context.clone(), self.spec.clone(), enable_partial_columns, + disable_get_blobs, ) .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 29cbf84235..e559dc7689 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -121,6 +121,7 @@ impl DataAvailabilityChecker { custody_context: Arc>, spec: Arc, enable_partial_columns: bool, + disable_get_blobs: bool, ) -> Result { let inner = DataAvailabilityCheckerInner::new( OVERFLOW_LRU_CAPACITY, @@ -130,6 +131,7 @@ impl DataAvailabilityChecker { let partial_assembler = if enable_partial_columns { Some(Arc::new(PartialDataColumnAssembler::new( OVERFLOW_LRU_CAPACITY, + disable_get_blobs, ))) } else { None @@ -1432,6 +1434,7 @@ mod test { custody_context, spec, true, + false, ) .expect("should initialise data availability checker") } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 3c0f43fef0..8b805a8da3 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -344,7 +344,7 @@ fn mock_beacon_adapter(fork_name: ForkName, get_blobs_v3: bool) -> MockFetchBlob let test_runtime = TestRuntime::default(); let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec())); let kzg = get_kzg(&spec); - let partial_assembler = PartialDataColumnAssembler::new(32); + let partial_assembler = PartialDataColumnAssembler::new(32, false); let mut mock_adapter = MockFetchBlobsBeaconAdapter::default(); mock_adapter.expect_spec().return_const(spec.clone()); diff --git a/beacon_node/beacon_chain/src/partial_data_column_assembler.rs b/beacon_node/beacon_chain/src/partial_data_column_assembler.rs index f8580352b2..0d21cb7621 100644 --- a/beacon_node/beacon_chain/src/partial_data_column_assembler.rs +++ b/beacon_node/beacon_chain/src/partial_data_column_assembler.rs @@ -13,6 +13,9 @@ use types::data::{ColumnIndex, PartialDataColumnHeader}; pub struct PartialDataColumnAssembler { /// Cache of assemblies keyed by block root assemblies: RwLock>>, + /// Whether getBlobs is disabled. If so, always set `has_local_blobs` to true, as we will never + /// retrieve blobs from the EL and therefore should immediately request cells from the network. + disable_get_blobs: bool, } /// Tracks partial columns being assembled for a single block @@ -43,9 +46,10 @@ pub struct PartialMergeResult { } impl PartialDataColumnAssembler { - pub fn new(capacity: usize) -> Self { + pub fn new(capacity: usize, disable_get_blobs: bool) -> Self { Self { assemblies: RwLock::new(LruCache::new(capacity)), + disable_get_blobs, } } @@ -60,7 +64,7 @@ impl PartialDataColumnAssembler { let assembly = PartialAssembly { header, - has_local_blobs: false, + has_local_blobs: self.disable_get_blobs, columns: HashMap::new(), }; @@ -82,7 +86,7 @@ impl PartialDataColumnAssembler { .entry(block_root) .or_insert_with(|| PartialAssembly { header: header.clone(), - has_local_blobs: false, + has_local_blobs: self.disable_get_blobs, columns: HashMap::new(), }); @@ -174,7 +178,7 @@ impl PartialDataColumnAssembler { signed_block_header: fulu.signed_block_header.clone(), kzg_commitments_inclusion_proof: fulu.kzg_commitments_inclusion_proof.clone(), }), - has_local_blobs: false, + has_local_blobs: self.disable_get_blobs, columns: Default::default(), }); let prev = assembly @@ -367,7 +371,7 @@ mod tests { } fn make_assembler() -> PartialDataColumnAssembler { - PartialDataColumnAssembler::new(16) + PartialDataColumnAssembler::new(16, false) } // -- init and get_header tests -- diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2adfe26d4b..deaae6cba5 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -246,6 +246,7 @@ pub fn test_da_checker( custody_context, spec, true, + false, ) .expect("should initialise data availability checker") } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 8b45a4b04c..e3e9839b2d 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -14,7 +14,7 @@ use eth2::types::{ }; use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use futures::TryFutureExt; -use lighthouse_network::PubsubMessage; +use lighthouse_network::{PubsubMessage, PubsubPartialMessage}; use logging::crit; use network::NetworkMessage; use rand::prelude::SliceRandom; @@ -442,12 +442,22 @@ pub(crate) fn publish_column_sidecars( // Publish partial messages if !partial_columns.is_empty() { if let Some(header) = partial_header { + let header = Arc::new(header); + let messages = partial_columns + .into_iter() + .map(|column| { + let mut request_cells = column.sidecar.cells_present_bitmap.clone(); + request_cells.not_inplace(); + PubsubPartialMessage::DataColumnFulu { + column, + request_cells, + header: header.clone(), + } + }) + .collect(); crate::utils::publish_network_message( sender_clone, - NetworkMessage::PublishPartialColumns { - columns: partial_columns, - header: Arc::new(header), - }, + NetworkMessage::PublishPartialColumns { messages }, ) .map_err(|_| { BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)) diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index fdb6ff095e..7719ee8540 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -98,8 +98,8 @@ impl std::fmt::Display for ClearDialError<'_> { } pub use crate::types::{ - Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, - SubnetDiscovery, decode_partial, + Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, + PubsubPartialMessage, Subnet, SubnetDiscovery, decode_partial, }; pub use prometheus_client; diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 862281c910..9037975a0e 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -20,7 +20,9 @@ use crate::types::{ SubnetDiscovery, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, }; -use crate::{Enr, NetworkGlobals, PubsubMessage, TopicHash, decode_partial, metrics}; +use crate::{ + Enr, NetworkGlobals, PubsubMessage, PubsubPartialMessage, TopicHash, decode_partial, metrics, +}; use api_types::{AppRequestId, Response}; use futures::stream::StreamExt; use gossipsub_scoring_parameters::{PeerScoreSettings, lighthouse_gossip_thresholds}; @@ -43,8 +45,9 @@ use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info, trace, warn}; use types::{ - ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, ForkName, PartialDataColumn, - PartialDataColumnHeader, Slot, SubnetId, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, + CellBitmap, ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, ForkName, + PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId, + consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, }; use utils::{Context as ServiceContext, build_transport, strip_peer_id}; @@ -920,65 +923,73 @@ impl Network { } /// Publishes partial data column sidecars to the gossipsub network. - pub fn publish_partial( - &mut self, - columns: Vec>>, - header: Arc>, - ) { + pub fn publish_partial(&mut self, messages: Vec>) { if !self.network_globals.config.enable_partial_columns { return; } - debug!( - count = columns.len(), - "Sending partial data column sidecars" - ); + debug!(count = messages.len(), "Sending partial messages"); - for column in columns { - let subnet = - DataColumnSubnetId::from_column_index(column.index, &self.fork_context.spec); - let topic = GossipTopic::new( - GossipKind::DataColumnSidecar(subnet), - GossipEncoding::default(), - self.enr_fork_id.fork_digest, - ); - let header_sent_set = self - .partial_column_header_tracker - .get_for_block(column.block_root); - let partial_message = OutgoingPartialColumn::new(column, &header, header_sent_set); - let publish_topic: Topic = topic.clone().into(); - - if let Err(e) = self - .gossipsub_mut() - .publish_partial(publish_topic, partial_message) - { - match e { - PublishError::NoPeersSubscribedToTopic => { - debug!( - kind = %topic.kind(), - "No peers supporting partial messages" - ); - } - ref e => { - warn!( - error = ?e, - kind = %topic.kind(), - "Could not publish partial message" - ); - } - } - - // add to metrics - if let Some(v) = metrics::get_int_gauge( - &metrics::FAILED_PARTIAL_PUBLISHES_PER_MAIN_TOPIC, - &[&format!("{:?}", topic.kind())], - ) { - v.inc() - }; + for message in messages { + match message { + PubsubPartialMessage::DataColumnFulu { + column, + request_cells, + header, + } => self.publish_partial_data_column_fulu(column, request_cells, header), } } } + fn publish_partial_data_column_fulu( + &mut self, + column: Arc>, + request_cells: CellBitmap, + header: Arc>, + ) { + let subnet = DataColumnSubnetId::from_column_index(column.index, &self.fork_context.spec); + let topic = GossipTopic::new( + GossipKind::DataColumnSidecar(subnet), + GossipEncoding::default(), + self.enr_fork_id.fork_digest, + ); + let header_sent_set = self + .partial_column_header_tracker + .get_for_block(column.block_root); + let partial_message = + OutgoingPartialColumn::new(column, &header, header_sent_set, request_cells); + let publish_topic: Topic = topic.clone().into(); + + if let Err(e) = self + .gossipsub_mut() + .publish_partial(publish_topic, partial_message) + { + match e { + PublishError::NoPeersSubscribedToTopic => { + debug!( + kind = %topic.kind(), + "No peers supporting partial messages" + ); + } + ref e => { + warn!( + error = ?e, + kind = %topic.kind(), + "Could not publish partial message" + ); + } + } + + // add to metrics + if let Some(v) = metrics::get_int_gauge( + &metrics::FAILED_PARTIAL_PUBLISHES_PER_MAIN_TOPIC, + &[&format!("{:?}", topic.kind())], + ) { + v.inc() + }; + } + } + /// Informs the gossipsub about the result of a message validation. /// If the message is valid it will get propagated by gossipsub. pub fn report_message_validation_result( diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index d0173e5b9a..8a20e9da94 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -16,7 +16,7 @@ pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, Sync pub use globals::NetworkGlobals; pub use partial::HeaderSentSet; pub use partial::OutgoingPartialColumn; -pub use pubsub::{PubsubMessage, SnappyTransform, decode_partial}; +pub use pubsub::{PubsubMessage, PubsubPartialMessage, SnappyTransform, decode_partial}; pub use subnet::{Subnet, SubnetDiscovery}; pub use topics::{ GossipEncoding, GossipKind, GossipTopic, TopicConfig, all_topics_at_fork, diff --git a/beacon_node/lighthouse_network/src/types/partial.rs b/beacon_node/lighthouse_network/src/types/partial.rs index 4b5dcd8ad6..0dd5c58ac6 100644 --- a/beacon_node/lighthouse_network/src/types/partial.rs +++ b/beacon_node/lighthouse_network/src/types/partial.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use tracing::{error, trace}; use types::core::{EthSpec, Hash256}; use types::data::{ - PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, + CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, PartialDataColumnSidecar, PartialDataColumnSidecarRef, }; @@ -30,10 +30,29 @@ impl OutgoingPartialColumn { partial_column: Arc>, header: &PartialDataColumnHeader, header_sent_set: HeaderSentSet, + requests: CellBitmap, ) -> Self { - // For now, always request all cells - let mut requests = partial_column.sidecar.cells_present_bitmap.clone_zeroed(); - requests.not_inplace(); + // Always set the request bit for available cells. + // + // Gossipsub applys certain optimisations to avoid sending redundant messages. This + // requires that we stay consistent with our metadata. Gossipsub uses the `Metadata` trait + // impl below to determine whether it can perform these optimisations. + // + // If we request a cell and then receive it, un-setting the request bit in the next + // published message may cause issues: + // Gossipsub tries to avoid the impact of application race conditions by checking newly + // published metadata against previously published metadata. This no longer functions + // correctly if request bits are unset between calls, as Gossipsub will consider a message + // with new requests as new info to be propagated, possibly overwriting previous messages + // with more cells (but fewer request bits). This is because gossipsub will see that both + // metadata have some bits that are not set in the other metadata and therefore cannot + // decide which actually carries more data. By always setting request bits for available + // cells, we avoid this issue, as requests will never be unset between calls. + // + // In other words, gossipsub relies on the fact that metadata is additive. The request bit + // is, therefore, to be seen as a "request if not available" bit. + let requests = requests.union(&partial_column.sidecar.cells_present_bitmap); + let metadata = PartialDataColumnPartsMetadata:: { available: partial_column.sidecar.cells_present_bitmap.clone(), requests, @@ -322,6 +341,14 @@ mod tests { }) } + fn make_all_one_bitmap(len: usize) -> CellBitmap { + let mut request_cells = CellBitmap::::with_capacity(len).unwrap(); + for idx in 0..request_cells.len() { + request_cells.set(idx, true).unwrap(); + } + request_cells + } + fn random_peer_id() -> PeerId { let keypair = Keypair::generate_ed25519(); PeerId::from(keypair.public()) @@ -422,7 +449,8 @@ mod tests { let header = make_header(4); let partial = make_partial_column(root, 4, &[0, 1]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let requests = make_all_one_bitmap(4); + let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests); let peer = random_peer_id(); @@ -442,7 +470,8 @@ mod tests { // We have cells [0, 2, 3] let partial = make_partial_column(root, 4, &[0, 2, 3]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let requests = make_all_one_bitmap(4); + let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests); let peer = random_peer_id(); @@ -474,7 +503,8 @@ mod tests { // We have cells [0] let partial = make_partial_column(root, 4, &[0]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let requests = make_all_one_bitmap(4); + let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests); let peer = random_peer_id(); diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index d486ca5129..00b2c42629 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -7,10 +7,10 @@ use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ - AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, DataColumnSidecar, + AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, CellBitmap, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, Hash256, LightClientFinalityUpdate, - LightClientOptimisticUpdate, PartialDataColumn, PartialDataColumnSidecar, - PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, + LightClientOptimisticUpdate, PartialDataColumn, PartialDataColumnHeader, + PartialDataColumnSidecar, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, @@ -56,6 +56,24 @@ pub enum PubsubMessage { LightClientOptimisticUpdate(Box>), } +/// A message published via the partial gossipsub protocol. +#[derive(Debug, Clone)] +pub enum PubsubPartialMessage { + /// A partial data column sidecar from the Fulu fork. + DataColumnFulu { + /// The column to publish. Libp2p will cache it and treat it as the data to send if any peer + /// asks for data within it. + column: Arc>, + /// The cells we are requesting. Usually, this will be all-ones, as we need all cells. + /// However, while get_blobs is still in progress, blobs we expect from the EL should not be + /// requested to conserve bandwidth. + request_cells: CellBitmap, + /// The header associated with the column above. This is set separately here, as the column + /// to be published does not contain the header - it is stored without. + header: Arc>, + }, +} + // Implements the `DataTransform` trait of gossipsub to employ snappy compression pub struct SnappyTransform { /// Sets the maximum size we allow gossipsub messages to decompress to. diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 20342c1aa9..346e621879 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -33,7 +33,7 @@ use beacon_chain::{ use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{ Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, - ReportSource, + PubsubPartialMessage, ReportSource, }; use logging::crit; use operation_pool::ReceivedPreCapella; @@ -937,9 +937,15 @@ impl NetworkBeaconProcessor { Ok(mut column) => { let header = column.sidecar.header.take(); if let Some(header) = header { + // Requesting cells is irrelevant as all cells are available, simply clone + // the `cells_present_bitmap`. + let request_cells = column.sidecar.cells_present_bitmap.clone(); self.send_network_message(NetworkMessage::PublishPartialColumns { - columns: vec![Arc::new(column)], - header: Arc::new(header), + messages: vec![PubsubPartialMessage::DataColumnFulu { + column: Arc::new(column), + request_cells, + header: Arc::new(header), + }], }); } else { crit!("Converting from full to partial yielded headerless partial") @@ -1077,8 +1083,10 @@ impl NetworkBeaconProcessor { debug!(block = %block_root, "Triggering getBlobs after receiving partial header"); // We want to publish immediately when this finishes let publish_blobs = true; - self.fetch_engine_blobs_and_publish(header.into_header(), block_root, publish_blobs) - .await + let header = header.into_header(); + self.fetch_engine_blobs_and_publish_full(header.clone(), block_root, publish_blobs) + .await; + self.publish_partial_data_columns(header, block_root).await; } } } @@ -1311,28 +1319,31 @@ impl NetworkBeaconProcessor { }); } - let only_send_completed_partials = - merge_result.local_blobs || self.chain.config.disable_get_blobs; - let columns = merge_result - .updated_partials - .into_iter() - .map(|partial| partial.into_inner()) - .filter(|partial| { - !only_send_completed_partials || partial.sidecar.is_complete() - }) - .collect::>(); - - if !columns.is_empty() { - if only_send_completed_partials { - debug!( - block = %block_root, - "Not publishing incomplete partials before getBlobs" - ); - } - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns, - header: verified_header.into_header(), - }); + if !merge_result.updated_partials.is_empty() { + let header = verified_header.into_header(); + let messages = merge_result + .updated_partials + .into_iter() + .map(|partial| { + let column = partial.into_inner(); + let present_cells = &column.sidecar.cells_present_bitmap; + let request_cells = if merge_result.local_blobs { + // Request all cells that are not available locally. + let mut all_one = present_cells.clone_zeroed(); + all_one.not_inplace(); + all_one + } else { + // Do not request cells if we don't know the local blobs yet. + present_cells.clone_zeroed() + }; + PubsubPartialMessage::DataColumnFulu { + column, + request_cells, + header: header.clone(), + } + }) + .collect(); + self.send_network_message(NetworkMessage::PublishPartialColumns { messages }); } Ok(avail) } @@ -1803,8 +1814,16 @@ impl NetworkBeaconProcessor { self.executor.spawn( async move { if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) { + let header = Arc::new(header); self_clone - .fetch_engine_blobs_and_publish(Arc::new(header), block_root, publish_blobs) + .fetch_engine_blobs_and_publish_full( + header.clone(), + block_root, + publish_blobs, + ) + .await; + self_clone + .publish_partial_data_columns(header, block_root) .await } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7619f706cc..c5023ed5f4 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -22,9 +22,13 @@ use lighthouse_network::rpc::methods::{ use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage, + PubsubPartialMessage, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, }; +use logging::crit; use rand::prelude::SliceRandom; +use ssz_types::VariableList; +use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -907,7 +911,7 @@ impl NetworkBeaconProcessor { }); } - pub async fn fetch_engine_blobs_and_publish( + pub async fn fetch_engine_blobs_and_publish_full( self: &Arc, header: Arc>, block_root: Hash256, @@ -931,7 +935,7 @@ impl NetworkBeaconProcessor { match fetch_and_process_engine_blobs( self.chain.clone(), block_root, - header.clone(), + header, custody_columns, publish_fn, ) @@ -975,44 +979,108 @@ impl NetworkBeaconProcessor { ); } } + } - // Publish partial columns without eager send - // TODO(gloas): implement publish partial columns without eager send - if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { - let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header); + pub async fn publish_partial_data_columns( + self: &Arc, + header: Arc>, + block_root: Hash256, + ) { + if header.kzg_commitments.is_empty() { + return; + } + + // TODO(gloas): implement publish partial columns + let Some(assembler) = self.chain.data_availability_checker.partial_assembler() else { + // Partials are disabled. + return; + }; + let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch()); + let custody_columns = self.chain.sampling_columns_for_epoch(epoch); + let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header); + + let mut present_indices: HashSet = HashSet::with_capacity(columns.len()); + let mut messages: Vec> = Vec::with_capacity(columns.len()); + for column in columns { // Republish both complete and incomplete columns as partials - let columns: Vec<_> = columns - .into_iter() - .filter_map(|column| match column { - AssemblyColumn::Incomplete(partial) => Some(partial.into_inner()), - AssemblyColumn::Complete(full) => { - let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else { - return None; - }; - match fulu.to_partial() { - Ok(partial) => Some(Arc::new(partial)), - Err(err) => { - error!( - %block_root, - column_index = %full.index(), - ?err, - "Failed to convert complete column to partial for re-seeding" - ); - None - } + let partial_column = match column { + AssemblyColumn::Incomplete(partial) => partial.into_inner(), + AssemblyColumn::Complete(full) => { + let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else { + continue; + }; + match fulu.to_partial() { + Ok(partial) => Arc::new(partial), + Err(err) => { + error!( + %block_root, + column_index = %full.index(), + ?err, + "Failed to convert complete column to partial for re-seeding" + ); + continue; } } - }) - .collect(); - if !columns.is_empty() { - debug!(block = %block_root, "Publishing all partials after getBlobs"); - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns, - header, - }); - } else { - debug!(block = %block_root, "No partials to publish after getBlobs"); + } + }; + + present_indices.insert(partial_column.index); + let mut request_cells = partial_column.sidecar.cells_present_bitmap.clone_zeroed(); + request_cells.not_inplace(); + messages.push(PubsubPartialMessage::DataColumnFulu { + column: partial_column, + request_cells, + header: header.clone(), + }); + } + + // For each custody column without any local partial, send an empty placeholder + // that requests all cells. + let num_cells = header.kzg_commitments.len(); + for col_idx in custody_columns { + if present_indices.contains(col_idx) { + continue; } + // `kzg_commitments.len()` is bounded by `MaxBlobCommitmentsPerBlock`, so the + // bitmap constructor is infallible. + let Ok(cells_present_bitmap) = CellBitmap::::with_capacity(num_cells) + else { + crit!( + %block_root, + num_cells, + column_index = %col_idx, + "CellBitmap construction failed despite being bounded by MaxBlobCommitmentsPerBlock" + ); + continue; + }; + let request_cells = cells_present_bitmap.not(); + messages.push(PubsubPartialMessage::DataColumnFulu { + column: Arc::new(PartialDataColumn { + block_root, + index: *col_idx, + sidecar: PartialDataColumnSidecar { + cells_present_bitmap, + column: VariableList::empty(), + kzg_proofs: VariableList::empty(), + header: None.into(), + }, + }), + request_cells, + header: header.clone(), + }); + } + + if !messages.is_empty() { + debug!( + block = %block_root, + count = messages.len(), + "Publishing all partials" + ); + self.send_network_message(NetworkMessage::PublishPartialColumns { messages }); + } else { + // This should not happen, as any custody columns will have at least an empty + // partial published. + warn!(block = %block_root, "No partials to publish"); } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 35437e1a2e..b9e07743eb 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -222,7 +222,7 @@ impl NetworkBeaconProcessor { // to be sent from the peers if we already have them. if let Ok(header) = signed_beacon_block.as_ref().try_into() { let publish_blobs = false; - self.fetch_engine_blobs_and_publish( + self.fetch_engine_blobs_and_publish_full( Arc::new(header), block_root, publish_blobs, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index c2e79fe9e8..ba4aada352 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -19,7 +19,7 @@ use lighthouse_network::rpc::methods::RpcResponse; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; use lighthouse_network::{ - Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet, + Context, PeerAction, PubsubMessage, PubsubPartialMessage, ReportSource, Response, Subnet, rpc::{GoodbyeReason, RpcErrorResponse}, }; use lighthouse_network::{MessageAcceptance, prometheus_client::registry::Registry}; @@ -39,8 +39,8 @@ use tokio::time::Sleep; use tracing::{debug, error, info, trace, warn}; use typenum::Unsigned; use types::{ - EthSpec, ForkContext, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId, - SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, + EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + ValidatorSubscription, }; mod tests; @@ -85,8 +85,7 @@ pub enum NetworkMessage { Publish { messages: Vec> }, /// Publish partial data column sidecars via the partial gossipsub protocol. PublishPartialColumns { - columns: Vec>>, - header: Arc>, + messages: Vec>, }, /// Validates a received gossipsub message. This will propagate the message on the network. ValidationResult { @@ -683,8 +682,8 @@ impl NetworkService { ); self.libp2p.publish(messages); } - NetworkMessage::PublishPartialColumns { columns, header } => { - self.libp2p.publish_partial(columns, header); + NetworkMessage::PublishPartialColumns { messages } => { + self.libp2p.publish_partial(messages); } NetworkMessage::ReportPeer { peer_id,