mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 04:48:21 +00:00
Prevent AvailabilityCheckError when there's no new custody columns to import (#7533)
Addresses a regression recently introduced when we started gossip verifying data columns from EL blobs
```
failures:
network_beacon_processor::tests::accept_processed_gossip_data_columns_without_import
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 90 filtered out; finished in 16.60s
stderr ───
thread 'network_beacon_processor::tests::accept_processed_gossip_data_columns_without_import' panicked at beacon_node/network/src/network_beacon_processor/tests.rs:829:10:
should put data columns into availability cache: Unexpected("empty columns")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
https://github.com/sigp/lighthouse/actions/runs/15309278812/job/43082341868?pr=7521
If an empty `Vec` is passed to the DA checker, it causes an unexpected error.
This PR addresses it by not passing an empty `Vec` for processing, and not spawning a task to publish.
This commit is contained in:
@@ -264,6 +264,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!(num_fetched_blobs, "All expected blobs received from the EL");
|
||||||
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
|
||||||
|
|
||||||
if chain_adapter.fork_choice_contains_block(&block_root) {
|
if chain_adapter.fork_choice_contains_block(&block_root) {
|
||||||
@@ -276,23 +277,32 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let chain_adapter = Arc::new(chain_adapter);
|
let chain_adapter = Arc::new(chain_adapter);
|
||||||
let custody_columns = compute_and_publish_data_columns(
|
let custody_columns_to_import = compute_custody_columns_to_import(
|
||||||
&chain_adapter,
|
&chain_adapter,
|
||||||
block.clone(),
|
block.clone(),
|
||||||
blobs,
|
blobs,
|
||||||
proofs,
|
proofs,
|
||||||
custody_columns_indices,
|
custody_columns_indices,
|
||||||
publish_fn,
|
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
debug!(num_fetched_blobs, "Processing engine blobs");
|
if custody_columns_to_import.is_empty() {
|
||||||
|
debug!(
|
||||||
|
info = "No new data columns to import",
|
||||||
|
"Ignoring EL blobs response"
|
||||||
|
);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
publish_fn(EngineGetBlobsOutput::CustodyColumns(
|
||||||
|
custody_columns_to_import.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
let availability_processing_status = chain_adapter
|
let availability_processing_status = chain_adapter
|
||||||
.process_engine_blobs(
|
.process_engine_blobs(
|
||||||
block.slot(),
|
block.slot(),
|
||||||
block_root,
|
block_root,
|
||||||
EngineGetBlobsOutput::CustodyColumns(custody_columns),
|
EngineGetBlobsOutput::CustodyColumns(custody_columns_to_import),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -300,13 +310,12 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
|
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
|
||||||
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
|
||||||
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
|
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
|
||||||
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
|
||||||
blobs: Vec<Blob<T::EthSpec>>,
|
blobs: Vec<Blob<T::EthSpec>>,
|
||||||
proofs: Vec<KzgProofs<T::EthSpec>>,
|
proofs: Vec<KzgProofs<T::EthSpec>>,
|
||||||
custody_columns_indices: HashSet<ColumnIndex>,
|
custody_columns_indices: HashSet<ColumnIndex>,
|
||||||
publish_fn: impl Fn(EngineGetBlobsOutput<T>) + Send + 'static,
|
|
||||||
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
|
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
|
||||||
let kzg = chain_adapter.kzg().clone();
|
let kzg = chain_adapter.kzg().clone();
|
||||||
let spec = chain_adapter.spec().clone();
|
let spec = chain_adapter.spec().clone();
|
||||||
@@ -380,13 +389,9 @@ async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
|
|||||||
.collect::<Result<Vec<_>, _>>()
|
.collect::<Result<Vec<_>, _>>()
|
||||||
.map_err(FetchEngineBlobError::GossipDataColumn)?;
|
.map_err(FetchEngineBlobError::GossipDataColumn)?;
|
||||||
|
|
||||||
publish_fn(EngineGetBlobsOutput::CustodyColumns(
|
|
||||||
columns_to_import_and_publish.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
Ok(columns_to_import_and_publish)
|
Ok(columns_to_import_and_publish)
|
||||||
},
|
},
|
||||||
"compute_and_publish_data_columns",
|
"compute_custody_columns_to_import",
|
||||||
)
|
)
|
||||||
.ok_or(FetchEngineBlobError::RuntimeShutdown)?
|
.ok_or(FetchEngineBlobError::RuntimeShutdown)?
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::data_column_verification::GossipVerifiedDataColumn;
|
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||||
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
|
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
|
||||||
use crate::fetch_blobs::{
|
use crate::fetch_blobs::{
|
||||||
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
|
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
|
||||||
@@ -139,6 +139,52 @@ async fn test_fetch_blobs_v2_block_imported_after_el_response() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_fetch_blobs_v2_no_new_columns_to_import() {
|
||||||
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
let (publish_fn, publish_fn_args) = mock_publish_fn();
|
||||||
|
let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter);
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
|
// **GIVEN**:
|
||||||
|
// All blobs returned
|
||||||
|
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
|
||||||
|
// block not yet imported into fork choice
|
||||||
|
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
|
||||||
|
// All data columns already seen on gossip
|
||||||
|
mock_adapter
|
||||||
|
.expect_verify_data_column_for_gossip()
|
||||||
|
.returning(|c| {
|
||||||
|
Err(GossipDataColumnError::PriorKnown {
|
||||||
|
proposer: c.block_proposer_index(),
|
||||||
|
slot: c.slot(),
|
||||||
|
index: c.index,
|
||||||
|
})
|
||||||
|
});
|
||||||
|
// No blobs should be processed
|
||||||
|
mock_adapter.expect_process_engine_blobs().times(0);
|
||||||
|
|
||||||
|
// **WHEN**: Trigger `fetch_blobs` on the block
|
||||||
|
let custody_columns = hashset![0, 1, 2];
|
||||||
|
let processing_status = fetch_and_process_engine_blobs_inner(
|
||||||
|
mock_adapter,
|
||||||
|
block_root,
|
||||||
|
block,
|
||||||
|
custody_columns.clone(),
|
||||||
|
publish_fn,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("fetch blobs should succeed");
|
||||||
|
|
||||||
|
// **THEN**: Should NOT be processed and no columns should be published.
|
||||||
|
assert_eq!(processing_status, None);
|
||||||
|
assert_eq!(
|
||||||
|
publish_fn_args.lock().unwrap().len(),
|
||||||
|
0,
|
||||||
|
"no columns should be published"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn test_fetch_blobs_v2_success() {
|
async fn test_fetch_blobs_v2_success() {
|
||||||
let mut mock_adapter = mock_beacon_adapter();
|
let mut mock_adapter = mock_beacon_adapter();
|
||||||
|
|||||||
Reference in New Issue
Block a user