Fix and improve handling of empty columns after getBlobs response (#9361)

This PR fixes two issues:

1. This condition is inverted: dfb259171a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs (L1507-L1508)
We are supposed to filter out incomplete columns when we DON'T have local blobs yet!
2. When the EL returns no blobs, we never store a partial in the assembler, and this code fails to publish our need to the network, as no partials are returned: dfb259171a/beacon_node/network/src/network_beacon_processor/mod.rs (L1038-L1050)


  The simple fix for 1 would be to invert the condition, but we can improve the flow here: instead of not publishing anything, we can publish what we have without requesting anything. This ties into the fix for 2: after getBlobs completes, we not only publish everything in the partial assembler but also, for every custody column missing from it, publish an empty column together with a request for all cells.

In particular:
- When sending a partial message to `network`, allow specifying a request bitmap instead of hardcoding an all-ones bitmap.
- For clarity and to prepare for Gloas integration, add a `PubsubPartialMessage` enum with a `DataColumnFulu` variant.
- On republishing after merging a gossip column: always publish, but only request cells if local blobs are known or get blobs is disabled. This also prepares us to request only *some* cells, for example when we already know which blobs the EL is going to send us (e.g. via `engine_hasBlobs`).
- Move guards in `fetch_engine_blobs_and_publish` to ensure everything works fine if there are no blobs or if get_blobs is disabled.


Co-Authored-By: Daniel Knopik <daniel@dknopik.de>
This commit is contained in:
Daniel Knopik
2026-06-19 02:50:24 +02:00
committed by GitHub
parent ddfc265123
commit 560f90611e
15 changed files with 313 additions and 148 deletions

View File

@@ -908,6 +908,7 @@ where
let shuffling_cache_size = self.chain_config.shuffling_cache_size; let shuffling_cache_size = self.chain_config.shuffling_cache_size;
let complete_blob_backfill = self.chain_config.complete_blob_backfill; let complete_blob_backfill = self.chain_config.complete_blob_backfill;
let enable_partial_columns = self.chain_config.enable_partial_columns; let enable_partial_columns = self.chain_config.enable_partial_columns;
let disable_get_blobs = self.chain_config.disable_get_blobs;
// Calculate the weak subjectivity point in which to backfill blocks to. // Calculate the weak subjectivity point in which to backfill blocks to.
let genesis_backfill_slot = if self.chain_config.genesis_backfill { let genesis_backfill_slot = if self.chain_config.genesis_backfill {
@@ -1043,6 +1044,7 @@ where
custody_context.clone(), custody_context.clone(),
self.spec.clone(), self.spec.clone(),
enable_partial_columns, enable_partial_columns,
disable_get_blobs,
) )
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
), ),

View File

@@ -121,6 +121,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
custody_context: Arc<CustodyContext<T::EthSpec>>, custody_context: Arc<CustodyContext<T::EthSpec>>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
enable_partial_columns: bool, enable_partial_columns: bool,
disable_get_blobs: bool,
) -> Result<Self, AvailabilityCheckError> { ) -> Result<Self, AvailabilityCheckError> {
let inner = DataAvailabilityCheckerInner::new( let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY, OVERFLOW_LRU_CAPACITY,
@@ -130,6 +131,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let partial_assembler = if enable_partial_columns { let partial_assembler = if enable_partial_columns {
Some(Arc::new(PartialDataColumnAssembler::new( Some(Arc::new(PartialDataColumnAssembler::new(
OVERFLOW_LRU_CAPACITY, OVERFLOW_LRU_CAPACITY,
disable_get_blobs,
))) )))
} else { } else {
None None
@@ -1432,6 +1434,7 @@ mod test {
custody_context, custody_context,
spec, spec,
true, true,
false,
) )
.expect("should initialise data availability checker") .expect("should initialise data availability checker")
} }

View File

@@ -344,7 +344,7 @@ fn mock_beacon_adapter(fork_name: ForkName, get_blobs_v3: bool) -> MockFetchBlob
let test_runtime = TestRuntime::default(); let test_runtime = TestRuntime::default();
let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec())); let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec()));
let kzg = get_kzg(&spec); let kzg = get_kzg(&spec);
let partial_assembler = PartialDataColumnAssembler::new(32); let partial_assembler = PartialDataColumnAssembler::new(32, false);
let mut mock_adapter = MockFetchBlobsBeaconAdapter::default(); let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
mock_adapter.expect_spec().return_const(spec.clone()); mock_adapter.expect_spec().return_const(spec.clone());

View File

@@ -13,6 +13,9 @@ use types::data::{ColumnIndex, PartialDataColumnHeader};
pub struct PartialDataColumnAssembler<E: EthSpec> { pub struct PartialDataColumnAssembler<E: EthSpec> {
/// Cache of assemblies keyed by block root /// Cache of assemblies keyed by block root
assemblies: RwLock<LruCache<Hash256, PartialAssembly<E>>>, assemblies: RwLock<LruCache<Hash256, PartialAssembly<E>>>,
/// Whether getBlobs is disabled. If so, always set `has_local_blobs` to true, as we will never
/// retrieve blobs from the EL and therefore should immediately request cells from the network.
disable_get_blobs: bool,
} }
/// Tracks partial columns being assembled for a single block /// Tracks partial columns being assembled for a single block
@@ -43,9 +46,10 @@ pub struct PartialMergeResult<E: EthSpec> {
} }
impl<E: EthSpec> PartialDataColumnAssembler<E> { impl<E: EthSpec> PartialDataColumnAssembler<E> {
pub fn new(capacity: usize) -> Self { pub fn new(capacity: usize, disable_get_blobs: bool) -> Self {
Self { Self {
assemblies: RwLock::new(LruCache::new(capacity)), assemblies: RwLock::new(LruCache::new(capacity)),
disable_get_blobs,
} }
} }
@@ -60,7 +64,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
let assembly = PartialAssembly { let assembly = PartialAssembly {
header, header,
has_local_blobs: false, has_local_blobs: self.disable_get_blobs,
columns: HashMap::new(), columns: HashMap::new(),
}; };
@@ -82,7 +86,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
.entry(block_root) .entry(block_root)
.or_insert_with(|| PartialAssembly { .or_insert_with(|| PartialAssembly {
header: header.clone(), header: header.clone(),
has_local_blobs: false, has_local_blobs: self.disable_get_blobs,
columns: HashMap::new(), columns: HashMap::new(),
}); });
@@ -174,7 +178,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
signed_block_header: fulu.signed_block_header.clone(), signed_block_header: fulu.signed_block_header.clone(),
kzg_commitments_inclusion_proof: fulu.kzg_commitments_inclusion_proof.clone(), kzg_commitments_inclusion_proof: fulu.kzg_commitments_inclusion_proof.clone(),
}), }),
has_local_blobs: false, has_local_blobs: self.disable_get_blobs,
columns: Default::default(), columns: Default::default(),
}); });
let prev = assembly let prev = assembly
@@ -367,7 +371,7 @@ mod tests {
} }
fn make_assembler() -> PartialDataColumnAssembler<E> { fn make_assembler() -> PartialDataColumnAssembler<E> {
PartialDataColumnAssembler::new(16) PartialDataColumnAssembler::new(16, false)
} }
// -- init and get_header tests -- // -- init and get_header tests --

View File

@@ -246,6 +246,7 @@ pub fn test_da_checker<E: EthSpec>(
custody_context, custody_context,
spec, spec,
true, true,
false,
) )
.expect("should initialise data availability checker") .expect("should initialise data availability checker")
} }

View File

@@ -14,7 +14,7 @@ use eth2::types::{
}; };
use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse};
use futures::TryFutureExt; use futures::TryFutureExt;
use lighthouse_network::PubsubMessage; use lighthouse_network::{PubsubMessage, PubsubPartialMessage};
use logging::crit; use logging::crit;
use network::NetworkMessage; use network::NetworkMessage;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
@@ -442,12 +442,22 @@ pub(crate) fn publish_column_sidecars<T: BeaconChainTypes>(
// Publish partial messages // Publish partial messages
if !partial_columns.is_empty() { if !partial_columns.is_empty() {
if let Some(header) = partial_header { if let Some(header) = partial_header {
let header = Arc::new(header);
let messages = partial_columns
.into_iter()
.map(|column| {
let mut request_cells = column.sidecar.cells_present_bitmap.clone();
request_cells.not_inplace();
PubsubPartialMessage::DataColumnFulu {
column,
request_cells,
header: header.clone(),
}
})
.collect();
crate::utils::publish_network_message( crate::utils::publish_network_message(
sender_clone, sender_clone,
NetworkMessage::PublishPartialColumns { NetworkMessage::PublishPartialColumns { messages },
columns: partial_columns,
header: Arc::new(header),
},
) )
.map_err(|_| { .map_err(|_| {
BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)) BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))

View File

@@ -98,8 +98,8 @@ impl std::fmt::Display for ClearDialError<'_> {
} }
pub use crate::types::{ pub use crate::types::{
Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage,
SubnetDiscovery, decode_partial, PubsubPartialMessage, Subnet, SubnetDiscovery, decode_partial,
}; };
pub use prometheus_client; pub use prometheus_client;

View File

@@ -20,7 +20,9 @@ use crate::types::{
SubnetDiscovery, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, SubnetDiscovery, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic,
subnet_from_topic_hash, subnet_from_topic_hash,
}; };
use crate::{Enr, NetworkGlobals, PubsubMessage, TopicHash, decode_partial, metrics}; use crate::{
Enr, NetworkGlobals, PubsubMessage, PubsubPartialMessage, TopicHash, decode_partial, metrics,
};
use api_types::{AppRequestId, Response}; use api_types::{AppRequestId, Response};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use gossipsub_scoring_parameters::{PeerScoreSettings, lighthouse_gossip_thresholds}; use gossipsub_scoring_parameters::{PeerScoreSettings, lighthouse_gossip_thresholds};
@@ -43,8 +45,9 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use types::{ use types::{
ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, ForkName, PartialDataColumn, CellBitmap, ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, ForkName,
PartialDataColumnHeader, Slot, SubnetId, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId,
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT,
}; };
use utils::{Context as ServiceContext, build_transport, strip_peer_id}; use utils::{Context as ServiceContext, build_transport, strip_peer_id};
@@ -920,65 +923,73 @@ impl<E: EthSpec> Network<E> {
} }
/// Publishes partial data column sidecars to the gossipsub network. /// Publishes partial data column sidecars to the gossipsub network.
pub fn publish_partial( pub fn publish_partial(&mut self, messages: Vec<PubsubPartialMessage<E>>) {
&mut self,
columns: Vec<Arc<PartialDataColumn<E>>>,
header: Arc<PartialDataColumnHeader<E>>,
) {
if !self.network_globals.config.enable_partial_columns { if !self.network_globals.config.enable_partial_columns {
return; return;
} }
debug!( debug!(count = messages.len(), "Sending partial messages");
count = columns.len(),
"Sending partial data column sidecars"
);
for column in columns { for message in messages {
let subnet = match message {
DataColumnSubnetId::from_column_index(column.index, &self.fork_context.spec); PubsubPartialMessage::DataColumnFulu {
let topic = GossipTopic::new( column,
GossipKind::DataColumnSidecar(subnet), request_cells,
GossipEncoding::default(), header,
self.enr_fork_id.fork_digest, } => self.publish_partial_data_column_fulu(column, request_cells, header),
);
let header_sent_set = self
.partial_column_header_tracker
.get_for_block(column.block_root);
let partial_message = OutgoingPartialColumn::new(column, &header, header_sent_set);
let publish_topic: Topic = topic.clone().into();
if let Err(e) = self
.gossipsub_mut()
.publish_partial(publish_topic, partial_message)
{
match e {
PublishError::NoPeersSubscribedToTopic => {
debug!(
kind = %topic.kind(),
"No peers supporting partial messages"
);
}
ref e => {
warn!(
error = ?e,
kind = %topic.kind(),
"Could not publish partial message"
);
}
}
// add to metrics
if let Some(v) = metrics::get_int_gauge(
&metrics::FAILED_PARTIAL_PUBLISHES_PER_MAIN_TOPIC,
&[&format!("{:?}", topic.kind())],
) {
v.inc()
};
} }
} }
} }
/// Publishes one Fulu partial data column on the gossipsub topic of the column's subnet.
///
/// `request_cells` is handed to `OutgoingPartialColumn::new` together with the column, its
/// header and the per-block header-sent set. A failed publish is logged and counted in the
/// `FAILED_PARTIAL_PUBLISHES_PER_MAIN_TOPIC` metric; nothing is returned to the caller.
fn publish_partial_data_column_fulu(
    &mut self,
    column: Arc<PartialDataColumn<E>>,
    request_cells: CellBitmap<E>,
    header: Arc<PartialDataColumnHeader<E>>,
) {
    // The topic is derived from the subnet that the column index maps to.
    let topic = GossipTopic::new(
        GossipKind::DataColumnSidecar(DataColumnSubnetId::from_column_index(
            column.index,
            &self.fork_context.spec,
        )),
        GossipEncoding::default(),
        self.enr_fork_id.fork_digest,
    );

    let sent_headers = self
        .partial_column_header_tracker
        .get_for_block(column.block_root);
    let outgoing = OutgoingPartialColumn::new(column, &header, sent_headers, request_cells);

    let gossip_topic: Topic = topic.clone().into();
    let Err(err) = self
        .gossipsub_mut()
        .publish_partial(gossip_topic, outgoing)
    else {
        // Published successfully, nothing to report.
        return;
    };

    // Having no subscribed peers is only worth a debug line; anything else is a warning.
    if matches!(err, PublishError::NoPeersSubscribedToTopic) {
        debug!(
            kind = %topic.kind(),
            "No peers supporting partial messages"
        );
    } else {
        warn!(
            error = ?err,
            kind = %topic.kind(),
            "Could not publish partial message"
        );
    }

    // Every failure, whatever its cause, is added to the metrics.
    if let Some(gauge) = metrics::get_int_gauge(
        &metrics::FAILED_PARTIAL_PUBLISHES_PER_MAIN_TOPIC,
        &[&format!("{:?}", topic.kind())],
    ) {
        gauge.inc()
    };
}
/// Informs the gossipsub about the result of a message validation. /// Informs the gossipsub about the result of a message validation.
/// If the message is valid it will get propagated by gossipsub. /// If the message is valid it will get propagated by gossipsub.
pub fn report_message_validation_result( pub fn report_message_validation_result(

View File

@@ -16,7 +16,7 @@ pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, Sync
pub use globals::NetworkGlobals; pub use globals::NetworkGlobals;
pub use partial::HeaderSentSet; pub use partial::HeaderSentSet;
pub use partial::OutgoingPartialColumn; pub use partial::OutgoingPartialColumn;
pub use pubsub::{PubsubMessage, SnappyTransform, decode_partial}; pub use pubsub::{PubsubMessage, PubsubPartialMessage, SnappyTransform, decode_partial};
pub use subnet::{Subnet, SubnetDiscovery}; pub use subnet::{Subnet, SubnetDiscovery};
pub use topics::{ pub use topics::{
GossipEncoding, GossipKind, GossipTopic, TopicConfig, all_topics_at_fork, GossipEncoding, GossipKind, GossipTopic, TopicConfig, all_topics_at_fork,

View File

@@ -9,7 +9,7 @@ use std::sync::Arc;
use tracing::{error, trace}; use tracing::{error, trace};
use types::core::{EthSpec, Hash256}; use types::core::{EthSpec, Hash256};
use types::data::{ use types::data::{
PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata,
PartialDataColumnSidecar, PartialDataColumnSidecarRef, PartialDataColumnSidecar, PartialDataColumnSidecarRef,
}; };
@@ -30,10 +30,29 @@ impl<E: EthSpec> OutgoingPartialColumn<E> {
partial_column: Arc<PartialDataColumn<E>>, partial_column: Arc<PartialDataColumn<E>>,
header: &PartialDataColumnHeader<E>, header: &PartialDataColumnHeader<E>,
header_sent_set: HeaderSentSet, header_sent_set: HeaderSentSet,
requests: CellBitmap<E>,
) -> Self { ) -> Self {
// For now, always request all cells // Always set the request bit for available cells.
let mut requests = partial_column.sidecar.cells_present_bitmap.clone_zeroed(); //
requests.not_inplace(); // Gossipsub applies certain optimisations to avoid sending redundant messages. This
// requires that we stay consistent with our metadata. Gossipsub uses the `Metadata` trait
// impl below to determine whether it can perform these optimisations.
//
// If we request a cell and then receive it, un-setting the request bit in the next
// published message may cause issues:
// Gossipsub tries to avoid the impact of application race conditions by checking newly
// published metadata against previously published metadata. This no longer functions
// correctly if request bits are unset between calls, as Gossipsub will consider a message
// with new requests as new info to be propagated, possibly overwriting previous messages
// with more cells (but fewer request bits). This is because gossipsub will see that both
// metadata have some bits that are not set in the other metadata and therefore cannot
// decide which actually carries more data. By always setting request bits for available
// cells, we avoid this issue, as requests will never be unset between calls.
//
// In other words, gossipsub relies on the fact that metadata is additive. The request bit
// is, therefore, to be seen as a "request if not available" bit.
let requests = requests.union(&partial_column.sidecar.cells_present_bitmap);
let metadata = PartialDataColumnPartsMetadata::<E> { let metadata = PartialDataColumnPartsMetadata::<E> {
available: partial_column.sidecar.cells_present_bitmap.clone(), available: partial_column.sidecar.cells_present_bitmap.clone(),
requests, requests,
@@ -322,6 +341,14 @@ mod tests {
}) })
} }
/// Test helper: builds a `CellBitmap` with capacity `len` and sets every bit to `true`.
///
/// Panics if the bitmap cannot be created for `len` or if setting a bit fails.
fn make_all_one_bitmap(len: usize) -> CellBitmap<E> {
    let mut bitmap = CellBitmap::<E>::with_capacity(len).unwrap();
    (0..bitmap.len()).for_each(|bit| bitmap.set(bit, true).unwrap());
    bitmap
}
fn random_peer_id() -> PeerId { fn random_peer_id() -> PeerId {
let keypair = Keypair::generate_ed25519(); let keypair = Keypair::generate_ed25519();
PeerId::from(keypair.public()) PeerId::from(keypair.public())
@@ -422,7 +449,8 @@ mod tests {
let header = make_header(4); let header = make_header(4);
let partial = make_partial_column(root, 4, &[0, 1]); let partial = make_partial_column(root, 4, &[0, 1]);
let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new()));
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); let requests = make_all_one_bitmap(4);
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests);
let peer = random_peer_id(); let peer = random_peer_id();
@@ -442,7 +470,8 @@ mod tests {
// We have cells [0, 2, 3] // We have cells [0, 2, 3]
let partial = make_partial_column(root, 4, &[0, 2, 3]); let partial = make_partial_column(root, 4, &[0, 2, 3]);
let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new()));
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); let requests = make_all_one_bitmap(4);
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests);
let peer = random_peer_id(); let peer = random_peer_id();
@@ -474,7 +503,8 @@ mod tests {
// We have cells [0] // We have cells [0]
let partial = make_partial_column(root, 4, &[0]); let partial = make_partial_column(root, 4, &[0]);
let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new()));
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); let requests = make_all_one_bitmap(4);
let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set, requests);
let peer = random_peer_id(); let peer = random_peer_id();

View File

@@ -7,10 +7,10 @@ use ssz::{Decode, Encode};
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::sync::Arc; use std::sync::Arc;
use types::{ use types::{
AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, DataColumnSidecar, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, CellBitmap, DataColumnSidecar,
DataColumnSubnetId, EthSpec, ForkContext, ForkName, Hash256, LightClientFinalityUpdate, DataColumnSubnetId, EthSpec, ForkContext, ForkName, Hash256, LightClientFinalityUpdate,
LightClientOptimisticUpdate, PartialDataColumn, PartialDataColumnSidecar, LightClientOptimisticUpdate, PartialDataColumn, PartialDataColumnHeader,
PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, PartialDataColumnSidecar, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof,
SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
@@ -56,6 +56,24 @@ pub enum PubsubMessage<E: EthSpec> {
LightClientOptimisticUpdate(Box<LightClientOptimisticUpdate<E>>), LightClientOptimisticUpdate(Box<LightClientOptimisticUpdate<E>>),
} }
/// A message published via the partial gossipsub protocol.
#[derive(Debug, Clone)]
pub enum PubsubPartialMessage<E: EthSpec> {
/// A partial data column sidecar from the Fulu fork.
DataColumnFulu {
/// The column to publish. Libp2p will cache it and treat it as the data to send if any peer
/// asks for data within it.
column: Arc<PartialDataColumn<E>>,
/// The cells we are requesting from peers. Usually, this will be all-ones, as we need all
/// cells. However, while get_blobs is still in progress, cells for blobs we expect from the
/// EL should not be requested, to conserve bandwidth.
request_cells: CellBitmap<E>,
/// The header associated with the column above. It is passed separately because the column
/// to be published does not contain the header - it is stored without.
header: Arc<PartialDataColumnHeader<E>>,
},
}
// Implements the `DataTransform` trait of gossipsub to employ snappy compression // Implements the `DataTransform` trait of gossipsub to employ snappy compression
pub struct SnappyTransform { pub struct SnappyTransform {
/// Sets the maximum size we allow gossipsub messages to decompress to. /// Sets the maximum size we allow gossipsub messages to decompress to.

View File

@@ -33,7 +33,7 @@ use beacon_chain::{
use beacon_processor::{Work, WorkEvent}; use beacon_processor::{Work, WorkEvent};
use lighthouse_network::{ use lighthouse_network::{
Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage,
ReportSource, PubsubPartialMessage, ReportSource,
}; };
use logging::crit; use logging::crit;
use operation_pool::ReceivedPreCapella; use operation_pool::ReceivedPreCapella;
@@ -937,9 +937,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(mut column) => { Ok(mut column) => {
let header = column.sidecar.header.take(); let header = column.sidecar.header.take();
if let Some(header) = header { if let Some(header) = header {
// Requesting cells is irrelevant as all cells are available, so simply clone
// the `cells_present_bitmap`.
let request_cells = column.sidecar.cells_present_bitmap.clone();
self.send_network_message(NetworkMessage::PublishPartialColumns { self.send_network_message(NetworkMessage::PublishPartialColumns {
columns: vec![Arc::new(column)], messages: vec![PubsubPartialMessage::DataColumnFulu {
header: Arc::new(header), column: Arc::new(column),
request_cells,
header: Arc::new(header),
}],
}); });
} else { } else {
crit!("Converting from full to partial yielded headerless partial") crit!("Converting from full to partial yielded headerless partial")
@@ -1077,8 +1083,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
debug!(block = %block_root, "Triggering getBlobs after receiving partial header"); debug!(block = %block_root, "Triggering getBlobs after receiving partial header");
// We want to publish immediately when this finishes // We want to publish immediately when this finishes
let publish_blobs = true; let publish_blobs = true;
self.fetch_engine_blobs_and_publish(header.into_header(), block_root, publish_blobs) let header = header.into_header();
.await self.fetch_engine_blobs_and_publish_full(header.clone(), block_root, publish_blobs)
.await;
self.publish_partial_data_columns(header, block_root).await;
} }
} }
} }
@@ -1311,28 +1319,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}); });
} }
let only_send_completed_partials = if !merge_result.updated_partials.is_empty() {
merge_result.local_blobs || self.chain.config.disable_get_blobs; let header = verified_header.into_header();
let columns = merge_result let messages = merge_result
.updated_partials .updated_partials
.into_iter() .into_iter()
.map(|partial| partial.into_inner()) .map(|partial| {
.filter(|partial| { let column = partial.into_inner();
!only_send_completed_partials || partial.sidecar.is_complete() let present_cells = &column.sidecar.cells_present_bitmap;
}) let request_cells = if merge_result.local_blobs {
.collect::<Vec<_>>(); // Request all cells that are not available locally.
let mut all_one = present_cells.clone_zeroed();
if !columns.is_empty() { all_one.not_inplace();
if only_send_completed_partials { all_one
debug!( } else {
block = %block_root, // Do not request cells if we don't know the local blobs yet.
"Not publishing incomplete partials before getBlobs" present_cells.clone_zeroed()
); };
} PubsubPartialMessage::DataColumnFulu {
self.send_network_message(NetworkMessage::PublishPartialColumns { column,
columns, request_cells,
header: verified_header.into_header(), header: header.clone(),
}); }
})
.collect();
self.send_network_message(NetworkMessage::PublishPartialColumns { messages });
} }
Ok(avail) Ok(avail)
} }
@@ -1803,8 +1814,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.executor.spawn( self.executor.spawn(
async move { async move {
if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) { if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) {
let header = Arc::new(header);
self_clone self_clone
.fetch_engine_blobs_and_publish(Arc::new(header), block_root, publish_blobs) .fetch_engine_blobs_and_publish_full(
header.clone(),
block_root,
publish_blobs,
)
.await;
self_clone
.publish_partial_data_columns(header, block_root)
.await .await
} }
} }

View File

@@ -22,9 +22,13 @@ use lighthouse_network::rpc::methods::{
use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{ use lighthouse_network::{
Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage, Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage,
PubsubPartialMessage,
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
}; };
use logging::crit;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use ssz_types::VariableList;
use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -907,7 +911,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}); });
} }
pub async fn fetch_engine_blobs_and_publish( pub async fn fetch_engine_blobs_and_publish_full(
self: &Arc<Self>, self: &Arc<Self>,
header: Arc<PartialDataColumnHeader<T::EthSpec>>, header: Arc<PartialDataColumnHeader<T::EthSpec>>,
block_root: Hash256, block_root: Hash256,
@@ -931,7 +935,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match fetch_and_process_engine_blobs( match fetch_and_process_engine_blobs(
self.chain.clone(), self.chain.clone(),
block_root, block_root,
header.clone(), header,
custody_columns, custody_columns,
publish_fn, publish_fn,
) )
@@ -975,44 +979,108 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
); );
} }
} }
}
// Publish partial columns without eager send pub async fn publish_partial_data_columns(
// TODO(gloas): implement publish partial columns without eager send self: &Arc<Self>,
if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { header: Arc<PartialDataColumnHeader<T::EthSpec>>,
let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header); block_root: Hash256,
) {
if header.kzg_commitments.is_empty() {
return;
}
// TODO(gloas): implement publish partial columns
let Some(assembler) = self.chain.data_availability_checker.partial_assembler() else {
// Partials are disabled.
return;
};
let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch());
let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header);
let mut present_indices: HashSet<ColumnIndex> = HashSet::with_capacity(columns.len());
let mut messages: Vec<PubsubPartialMessage<T::EthSpec>> = Vec::with_capacity(columns.len());
for column in columns {
// Republish both complete and incomplete columns as partials // Republish both complete and incomplete columns as partials
let columns: Vec<_> = columns let partial_column = match column {
.into_iter() AssemblyColumn::Incomplete(partial) => partial.into_inner(),
.filter_map(|column| match column { AssemblyColumn::Complete(full) => {
AssemblyColumn::Incomplete(partial) => Some(partial.into_inner()), let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else {
AssemblyColumn::Complete(full) => { continue;
let DataColumnSidecar::Fulu(fulu) = full.as_data_column() else { };
return None; match fulu.to_partial() {
}; Ok(partial) => Arc::new(partial),
match fulu.to_partial() { Err(err) => {
Ok(partial) => Some(Arc::new(partial)), error!(
Err(err) => { %block_root,
error!( column_index = %full.index(),
%block_root, ?err,
column_index = %full.index(), "Failed to convert complete column to partial for re-seeding"
?err, );
"Failed to convert complete column to partial for re-seeding" continue;
);
None
}
} }
} }
}) }
.collect(); };
if !columns.is_empty() {
debug!(block = %block_root, "Publishing all partials after getBlobs"); present_indices.insert(partial_column.index);
self.send_network_message(NetworkMessage::PublishPartialColumns { let mut request_cells = partial_column.sidecar.cells_present_bitmap.clone_zeroed();
columns, request_cells.not_inplace();
header, messages.push(PubsubPartialMessage::DataColumnFulu {
}); column: partial_column,
} else { request_cells,
debug!(block = %block_root, "No partials to publish after getBlobs"); header: header.clone(),
});
}
// For each custody column without any local partial, send an empty placeholder
// that requests all cells.
let num_cells = header.kzg_commitments.len();
for col_idx in custody_columns {
if present_indices.contains(col_idx) {
continue;
} }
// `kzg_commitments.len()` is bounded by `MaxBlobCommitmentsPerBlock`, so the
// bitmap constructor is infallible.
let Ok(cells_present_bitmap) = CellBitmap::<T::EthSpec>::with_capacity(num_cells)
else {
crit!(
%block_root,
num_cells,
column_index = %col_idx,
"CellBitmap construction failed despite being bounded by MaxBlobCommitmentsPerBlock"
);
continue;
};
let request_cells = cells_present_bitmap.not();
messages.push(PubsubPartialMessage::DataColumnFulu {
column: Arc::new(PartialDataColumn {
block_root,
index: *col_idx,
sidecar: PartialDataColumnSidecar {
cells_present_bitmap,
column: VariableList::empty(),
kzg_proofs: VariableList::empty(),
header: None.into(),
},
}),
request_cells,
header: header.clone(),
});
}
if !messages.is_empty() {
debug!(
block = %block_root,
count = messages.len(),
"Publishing all partials"
);
self.send_network_message(NetworkMessage::PublishPartialColumns { messages });
} else {
// This should not happen, as any custody columns will have at least an empty
// partial published.
warn!(block = %block_root, "No partials to publish");
} }
} }

View File

@@ -222,7 +222,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// to be sent from the peers if we already have them. // to be sent from the peers if we already have them.
if let Ok(header) = signed_beacon_block.as_ref().try_into() { if let Ok(header) = signed_beacon_block.as_ref().try_into() {
let publish_blobs = false; let publish_blobs = false;
self.fetch_engine_blobs_and_publish( self.fetch_engine_blobs_and_publish_full(
Arc::new(header), Arc::new(header),
block_root, block_root,
publish_blobs, publish_blobs,

View File

@@ -19,7 +19,7 @@ use lighthouse_network::rpc::methods::RpcResponse;
use lighthouse_network::service::Network; use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind; use lighthouse_network::types::GossipKind;
use lighthouse_network::{ use lighthouse_network::{
Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet, Context, PeerAction, PubsubMessage, PubsubPartialMessage, ReportSource, Response, Subnet,
rpc::{GoodbyeReason, RpcErrorResponse}, rpc::{GoodbyeReason, RpcErrorResponse},
}; };
use lighthouse_network::{MessageAcceptance, prometheus_client::registry::Registry}; use lighthouse_network::{MessageAcceptance, prometheus_client::registry::Registry};
@@ -39,8 +39,8 @@ use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
use typenum::Unsigned; use typenum::Unsigned;
use types::{ use types::{
EthSpec, ForkContext, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, ValidatorSubscription,
}; };
mod tests; mod tests;
@@ -85,8 +85,7 @@ pub enum NetworkMessage<E: EthSpec> {
Publish { messages: Vec<PubsubMessage<E>> }, Publish { messages: Vec<PubsubMessage<E>> },
/// Publish partial data column sidecars via the partial gossipsub protocol. /// Publish partial data column sidecars via the partial gossipsub protocol.
PublishPartialColumns { PublishPartialColumns {
columns: Vec<Arc<PartialDataColumn<E>>>, messages: Vec<PubsubPartialMessage<E>>,
header: Arc<PartialDataColumnHeader<E>>,
}, },
/// Validates a received gossipsub message. This will propagate the message on the network. /// Validates a received gossipsub message. This will propagate the message on the network.
ValidationResult { ValidationResult {
@@ -683,8 +682,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
); );
self.libp2p.publish(messages); self.libp2p.publish(messages);
} }
NetworkMessage::PublishPartialColumns { columns, header } => { NetworkMessage::PublishPartialColumns { messages } => {
self.libp2p.publish_partial(columns, header); self.libp2p.publish_partial(messages);
} }
NetworkMessage::ReportPeer { NetworkMessage::ReportPeer {
peer_id, peer_id,