Deprecate gossip blobs (#9126)

#9124

Deprecate unneeded pre-Fulu blob features

- blob gossip
- blob lookup sync
- engine getBlobsV1

Also deprecates some tests and cleans up production code paths

I think this is blocked until gnosis forks to fulu?


  


Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Eitan Seri- Levi <eserilev@gmail.com>

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Daniel Knopik <daniel@dknopik.de>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>
This commit is contained in:
Eitan Seri-Levi
2026-05-28 19:59:23 -07:00
committed by GitHub
parent ba3abf943f
commit 8396dc87d0
48 changed files with 485 additions and 2346 deletions

View File

@@ -455,20 +455,18 @@ impl BlockId {
warp_utils::reject::custom_not_found(format!("no blobs stored for block {root}"))
})?;
let blob_sidecar_list_filtered = match indices {
Some(vec) => {
let list: Vec<_> = vec
.into_iter()
.flat_map(|index| blob_sidecar_list.get(index as usize).cloned())
.collect();
let blob_sidecar_list: Vec<_> = blob_sidecar_list.into_iter().collect();
BlobSidecarList::new(list, max_blobs_per_block)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))?
}
let blob_sidecar_list = match indices {
Some(indices) => indices
.into_iter()
.filter_map(|i| blob_sidecar_list.get(i as usize).cloned())
.collect(),
None => blob_sidecar_list,
};
Ok(blob_sidecar_list_filtered)
BlobSidecarList::new(blob_sidecar_list, max_blobs_per_block)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))
}
fn get_blobs_from_data_columns<T: BeaconChainTypes>(

View File

@@ -1,7 +1,6 @@
use crate::metrics;
use std::future::Future;
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::{AsBlock, LookupBlock};
use beacon_chain::data_column_verification::GossipVerifiedDataColumn;
use beacon_chain::validator_monitor::get_block_delay_ms;
@@ -26,13 +25,12 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{Span, debug, debug_span, error, field, info, instrument, warn};
use tracing::{Span, debug, error, field, info, instrument, warn};
use tree_hash::TreeHash;
use types::{
AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource,
DataColumnSidecar, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName,
FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock,
SignedBlindedBeaconBlock,
AbstractExecPayload, BeaconBlockRef, BlobsList, BlockImportSource, DataColumnSidecar,
DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload,
FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock, SignedBlindedBeaconBlock,
};
use warp::{Rejection, Reply, reply::Response};
@@ -195,23 +193,8 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
Ok(())
};
// Wait for blobs/columns to get gossip verified before proceeding further as we need them for import.
let (gossip_verified_blobs, gossip_verified_columns) = build_sidecar_task_handle.await?;
for blob in gossip_verified_blobs.into_iter().flatten() {
publish_blob_sidecars(network_tx, &blob).map_err(|_| {
warp_utils::reject::custom_server_error("unable to publish blob sidecars".into())
})?;
if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(reason = &msg, "Invalid blob provided to HTTP API");
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
// Wait for columns to get gossip verified before proceeding further as we need them for import.
let gossip_verified_columns = build_sidecar_task_handle.await?;
if !gossip_verified_columns.is_empty() {
if let Some(data_column_publishing_delay) = data_column_publishing_delay_for_testing {
@@ -342,18 +325,9 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
}
}
type BuildDataSidecarTaskResult<T> = Result<
(
Vec<Option<GossipVerifiedBlob<T>>>,
Vec<GossipVerifiedDataColumn<T>>,
),
Rejection,
>;
type BuildDataSidecarTaskResult<T> = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>;
/// Convert blobs to either:
///
/// 1. Blob sidecars if prior to peer DAS, or
/// 2. Data column sidecars if post peer DAS.
/// Convert blobs to data column sidecars.
fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
@@ -365,22 +339,9 @@ fn spawn_build_data_sidecar_task<T: BeaconChainTypes>(
.spawn_blocking_handle(
move || {
let Some((kzg_proofs, blobs)) = proofs_and_blobs else {
return Ok((vec![], vec![]));
return Ok(vec![]);
};
let _span = debug_span!("build_data_sidecars").entered();
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
if !peer_das_enabled {
// Pre-PeerDAS: construct blob sidecars for the network.
let gossip_verified_blobs =
build_gossip_verified_blobs(&chain, &block, blobs, kzg_proofs)?;
Ok((gossip_verified_blobs, vec![]))
} else {
// Post PeerDAS: construct data columns.
let gossip_verified_data_columns =
build_data_columns(&chain, &block, blobs, kzg_proofs)?;
Ok((vec![], gossip_verified_data_columns))
}
build_data_columns(&chain, &block, blobs, kzg_proofs)
},
"build_data_sidecars",
)
@@ -424,76 +385,6 @@ fn build_data_columns<T: BeaconChainTypes>(
Ok(gossip_verified_data_columns)
}
fn build_gossip_verified_blobs<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
block: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
blobs: BlobsList<T::EthSpec>,
kzg_proofs: KzgProofs<T::EthSpec>,
) -> Result<Vec<Option<GossipVerifiedBlob<T>>>, Rejection> {
let slot = block.slot();
let gossip_verified_blobs = kzg_proofs
.into_iter()
.zip(blobs)
.enumerate()
.map(|(i, (proof, unverified_blob))| {
let timer = metrics::start_timer(
&beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION,
);
let blob_sidecar = BlobSidecar::new(i, unverified_blob, block, proof)
.map(Arc::new)
.map_err(|e| {
error!(
error = ?e,
blob_index = i,
%slot,
"Invalid blob - not publishing block"
);
warp_utils::reject::custom_bad_request(format!("{e:?}"))
})?;
drop(timer);
let gossip_verified_blob =
GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, chain);
match gossip_verified_blob {
Ok(blob) => Ok(Some(blob)),
Err(GossipBlobError::RepeatBlob { proposer, .. }) => {
// Log the error but do not abort publication, we may need to publish the block
// or some of the other blobs if the block & blobs are only partially published
// by the other publisher.
debug!(
blob_index = blob_sidecar.index,
%slot,
proposer,
"Blob for publication already known"
);
Ok(None)
}
Err(e) => {
error!(
blob_index = blob_sidecar.index,
%slot,
error = ?e,
"Blob for publication is gossip-invalid"
);
Err(warp_utils::reject::custom_bad_request(e.to_string()))
}
}
})
.collect::<Result<Vec<_>, Rejection>>()?;
Ok(gossip_verified_blobs)
}
fn publish_blob_sidecars<T: BeaconChainTypes>(
sender_clone: &UnboundedSender<NetworkMessage<T::EthSpec>>,
blob: &GossipVerifiedBlob<T>,
) -> Result<(), BlockError> {
let pubsub_message = PubsubMessage::BlobSidecar(Box::new((blob.index(), blob.clone_blob())));
crate::utils::publish_pubsub_message(sender_clone, pubsub_message)
.map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)))
}
pub(crate) fn publish_column_sidecars<T: BeaconChainTypes>(
sender_clone: &UnboundedSender<NetworkMessage<T::EthSpec>>,
data_column_sidecars: &[GossipVerifiedDataColumn<T>],

View File

@@ -1587,7 +1587,7 @@ pub async fn block_seen_on_gossip_without_blobs_or_columns() {
let state = tester.harness.get_current_state();
let fork_name = state.fork_name(&tester.harness.spec).unwrap();
// Gloas blocks don't carry blobs (execution data comes via envelopes).
if !fork_name.deneb_enabled() || fork_name.gloas_enabled() {
if !fork_name.fulu_enabled() || fork_name.gloas_enabled() {
return;
}
@@ -1647,7 +1647,7 @@ pub async fn block_seen_on_gossip_without_blobs_or_columns() {
/// This test checks that an HTTP POST request with the block & blobs/columns succeeds with a 200 response
/// even if the block has already been seen on gossip without all blobs/columns.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
pub async fn block_seen_on_gossip_with_columns() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
@@ -1658,7 +1658,7 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
let state = tester.harness.get_current_state();
let fork_name = state.fork_name(&tester.harness.spec).unwrap();
// Gloas blocks don't carry blobs (execution data comes via envelopes).
if !fork_name.deneb_enabled() || fork_name.gloas_enabled() {
if !fork_name.fulu_enabled() || fork_name.gloas_enabled() {
return;
}
@@ -1690,9 +1690,6 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
blobs.0.len()
);
let partial_kzg_proofs = [*blobs.0.first().unwrap()];
let partial_blobs = [blobs.1.first().unwrap().clone()];
// Simulate the block being seen on gossip.
block
.clone()
@@ -1702,12 +1699,7 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
// Simulate some of the blobs being seen on gossip.
tester
.harness
.process_gossip_blobs_or_columns(
&block,
partial_blobs.iter(),
partial_kzg_proofs.iter(),
Some(get_custody_columns(&tester, block.slot())),
)
.process_gossip_columns(&block, Some(get_custody_columns(&tester, block.slot())))
.await;
// It should not yet be added to fork choice because all blobs have not been seen.
@@ -1740,7 +1732,7 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() {
/// This test checks that an HTTP POST request with the block & blobs/columns succeeds with a 200 response
/// even if the blobs/columns have already been seen on gossip.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn blobs_or_columns_seen_on_gossip_without_block() {
pub async fn columns_seen_on_gossip_without_block() {
let spec = test_spec::<E>();
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
@@ -1752,7 +1744,7 @@ pub async fn blobs_or_columns_seen_on_gossip_without_block() {
let state = tester.harness.get_current_state();
let fork_name = state.fork_name(&tester.harness.spec).unwrap();
// Gloas blocks don't carry blobs (execution data comes via envelopes).
if !fork_name.deneb_enabled() || fork_name.gloas_enabled() {
if !fork_name.fulu_enabled() || fork_name.gloas_enabled() {
return;
}
@@ -1778,12 +1770,7 @@ pub async fn blobs_or_columns_seen_on_gossip_without_block() {
// Simulate the blobs being seen on gossip.
tester
.harness
.process_gossip_blobs_or_columns(
&block,
blobs.iter(),
kzg_proofs.iter(),
Some(get_custody_columns(&tester, block.slot())),
)
.process_gossip_columns(&block, Some(get_custody_columns(&tester, block.slot())))
.await;
// It should not yet be added to fork choice because the block has not been seen.
@@ -1816,7 +1803,7 @@ pub async fn blobs_or_columns_seen_on_gossip_without_block() {
/// This test checks that an HTTP POST request with the block succeeds with a 200 response
/// if just the blobs have already been seen on gossip.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_columns() {
async fn columns_seen_on_gossip_without_block_and_no_http_columns() {
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
@@ -1827,7 +1814,7 @@ async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_colu
let state = tester.harness.get_current_state();
let fork_name = state.fork_name(&tester.harness.spec).unwrap();
// Gloas blocks don't carry blobs (execution data comes via envelopes).
if !fork_name.deneb_enabled() || fork_name.gloas_enabled() {
if !fork_name.fulu_enabled() || fork_name.gloas_enabled() {
return;
}
@@ -1848,18 +1835,13 @@ async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_colu
let state_a = tester.harness.get_current_state();
let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs, blobs) = blobs.expect("should have some blobs");
let (_, blobs) = blobs.expect("should have some blobs");
assert!(!blobs.is_empty());
// Simulate the blobs being seen on gossip.
tester
.harness
.process_gossip_blobs_or_columns(
&block,
blobs.iter(),
kzg_proofs.iter(),
Some(get_custody_columns(&tester, block.slot())),
)
.process_gossip_columns(&block, Some(get_custody_columns(&tester, block.slot())))
.await;
// It should not yet be added to fork choice because the block has not been seen.
@@ -1893,7 +1875,7 @@ async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_colu
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn slashable_blobs_or_columns_seen_on_gossip_cause_failure() {
async fn slashable_columns_seen_on_gossip_cause_failure() {
let validation_level: Option<BroadcastValidation> =
Some(BroadcastValidation::ConsensusAndEquivocation);
@@ -1905,7 +1887,7 @@ async fn slashable_blobs_or_columns_seen_on_gossip_cause_failure() {
let state = tester.harness.get_current_state();
let fork_name = state.fork_name(&tester.harness.spec).unwrap();
// Gloas blocks don't carry blobs (execution data comes via envelopes).
if !fork_name.deneb_enabled() || fork_name.gloas_enabled() {
if !fork_name.fulu_enabled() || fork_name.gloas_enabled() {
return;
}
@@ -1926,19 +1908,13 @@ async fn slashable_blobs_or_columns_seen_on_gossip_cause_failure() {
let state_a = tester.harness.get_current_state();
let ((block_a, blobs_a), _) = tester.harness.make_block(state_a.clone(), slot_b).await;
let ((block_b, blobs_b), _) = tester.harness.make_block(state_a, slot_b).await;
let ((block_b, _), _) = tester.harness.make_block(state_a, slot_b).await;
let (kzg_proofs_a, blobs_a) = blobs_a.expect("should have some blobs");
let (kzg_proofs_b, blobs_b) = blobs_b.expect("should have some blobs");
// Simulate the blobs of block B being seen on gossip.
tester
.harness
.process_gossip_blobs_or_columns(
&block_b,
blobs_b.iter(),
kzg_proofs_b.iter(),
Some(get_custody_columns(&tester, block_b.slot())),
)
.process_gossip_columns(&block_b, Some(get_custody_columns(&tester, block_b.slot())))
.await;
// It should not yet be added to fork choice because block B has not been seen.
@@ -1984,7 +1960,7 @@ pub async fn duplicate_block_status_code() {
// Gloas blocks don't carry blobs (execution data comes via envelopes).
let spec = test_spec::<E>();
let genesis_fork = spec.fork_name_at_slot::<E>(Slot::new(0));
if !genesis_fork.deneb_enabled() || genesis_fork.gloas_enabled() {
if !genesis_fork.fulu_enabled() || genesis_fork.gloas_enabled() {
return;
}

View File

@@ -9167,11 +9167,17 @@ async fn builder_works_post_deneb() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_blob_sidecars() {
let mut config = ApiTesterConfig::default();
let mut config = ApiTesterConfig {
retain_historic_states: false,
spec: E::default_spec(),
node_custody_type: NodeCustodyType::Supernode,
};
config.spec.altair_fork_epoch = Some(Epoch::new(0));
config.spec.bellatrix_fork_epoch = Some(Epoch::new(0));
config.spec.capella_fork_epoch = Some(Epoch::new(0));
config.spec.deneb_fork_epoch = Some(Epoch::new(0));
config.spec.electra_fork_epoch = Some(Epoch::new(0));
config.spec.fulu_fork_epoch = Some(Epoch::new(0));
ApiTester::new_from_config(config)
.await