Mirror of https://github.com/sigp/lighthouse.git
Fix unexpected blob error and duplicate import in fetch blobs (#7541)
Getting this error on a non-PeerDAS network:
```
May 29 13:30:13.484 ERROR Error fetching or processing blobs from EL error: BlobProcessingError(AvailabilityCheck(Unexpected("empty blobs"))), block_root: 0x98aa3927056d453614fefbc79eb1f9865666d1f119d0e8aa9e6f4d02aa9395d9
```
It appears we're passing an empty `Vec` to the DA checker because all blobs were already seen on gossip and filtered out, which causes an `AvailabilityCheckError::Unexpected("empty blobs")`.
I've added equivalent unit tests for `getBlobsV1` to cover all the scenarios we test for `getBlobsV2`. These would have caught the bug had they been added earlier, and they also caught another bug that could trigger a duplicate block import.
Thanks Santito for reporting this! 🙏
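To make the failure mode concrete, here is a minimal, self-contained sketch (the function names and the integer stand-ins for blobs are invented for illustration; the real change is in the diff below): when every blob fetched from the EL was already seen on gossip, the filtered list is empty and has to short-circuit instead of reaching the availability check.

```rust
// Hypothetical stand-ins: `import_fetched_blobs` and `availability_check` are
// not Lighthouse APIs, and u64s stand in for blobs.
fn import_fetched_blobs(
    fetched: Vec<u64>,      // blobs fetched from the EL
    seen_on_gossip: &[u64], // blobs already observed via gossip
) -> Result<Option<&'static str>, String> {
    // Drop anything gossip already delivered, mirroring the fetch-blobs filter.
    let to_import: Vec<u64> = fetched
        .into_iter()
        .filter(|blob| !seen_on_gossip.contains(blob))
        .collect();

    // Without this guard, an all-duplicates response hands an empty Vec to the
    // availability check, which rejects it ("empty blobs"). Returning early
    // treats it as "nothing new to do" instead.
    if to_import.is_empty() {
        return Ok(None);
    }

    availability_check(&to_import).map(Some)
}

fn availability_check(blobs: &[u64]) -> Result<&'static str, String> {
    if blobs.is_empty() {
        return Err("Unexpected: empty blobs".to_string());
    }
    Ok("imported")
}

fn main() {
    // Every fetched blob was already seen on gossip: previously an error, now a no-op.
    assert_eq!(import_fetched_blobs(vec![0, 1], &[0, 1]), Ok(None));
    assert_eq!(import_fetched_blobs(vec![0, 1], &[0]), Ok(Some("imported")));
}
```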
Changes to `EngineGetBlobsOutput` and `fetch_and_process_blobs_v1`:

```diff
@@ -42,6 +42,7 @@ use types::{
 /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
 /// gossip network. The blobs / data columns have not been marked as observed yet, as they may not
 /// be published immediately.
+#[derive(Debug)]
 pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
     Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
     /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
@@ -163,9 +164,22 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
         inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
         return Ok(None);
     } else {
+        debug!(
+            num_expected_blobs,
+            num_fetched_blobs, "Received blobs from the EL"
+        );
         inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
     }
 
+    if chain_adapter.fork_choice_contains_block(&block_root) {
+        // Avoid computing sidecars if the block has already been imported.
+        debug!(
+            info = "block has already been imported",
+            "Ignoring EL blobs response"
+        );
+        return Ok(None);
+    }
+
     let (signed_block_header, kzg_commitments_proof) = block
         .signed_block_header_and_kzg_commitments_proof()
         .map_err(FetchEngineBlobError::BeaconStateError)?;
@@ -197,13 +211,13 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
         .collect::<Result<Vec<_>, _>>()
         .map_err(FetchEngineBlobError::GossipBlob)?;
 
-    if !blobs_to_import_and_publish.is_empty() {
-        publish_fn(EngineGetBlobsOutput::Blobs(
-            blobs_to_import_and_publish.clone(),
-        ));
+    if blobs_to_import_and_publish.is_empty() {
+        return Ok(None);
     }
 
-    debug!(num_fetched_blobs, "Processing engine blobs");
+    publish_fn(EngineGetBlobsOutput::Blobs(
+        blobs_to_import_and_publish.clone(),
+    ));
 
     let availability_processing_status = chain_adapter
         .process_engine_blobs(
```
In the fetch-blobs tests, the imports gain the versioned `BlobAndProof` types, and the existing `getBlobsV2` tests move into a new `mod get_blobs_v2` together with a module-local `mock_get_blobs_v2_response` helper that unwraps `BlobAndProof::V2`; apart from constructing the mock adapter with `mock_beacon_adapter(ForkName::Fulu)` and blocks with `create_test_block_and_blobs(&mock_adapter, 2)`, the v2 tests are unchanged:

```diff
@@ -7,227 +7,509 @@ use crate::test_utils::{get_kzg, EphemeralHarnessType};
 use crate::AvailabilityProcessingStatus;
 use bls::Signature;
 use eth2::types::BlobsBundle;
-use execution_layer::json_structures::BlobAndProofV2;
+use execution_layer::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2};
 use execution_layer::test_utils::generate_blobs;
 use maplit::hashset;
 use std::sync::{Arc, Mutex};
 use task_executor::test_utils::TestRuntime;
 use types::{
-    BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock,
-    SignedBeaconBlockFulu,
+    BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec,
+    SignedBeaconBlock, SignedBeaconBlockFulu,
 };
 
 type E = MainnetEthSpec;
 type T = EphemeralHarnessType<E>;
```

A new `mod get_blobs_v1` adds the equivalent tests against an Electra spec:

```rust
mod get_blobs_v1 {
    use super::*;
    use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
    use crate::block_verification_types::AsBlock;

    const ELECTRA_FORK: ForkName = ForkName::Electra;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_no_blobs_in_block() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let spec = mock_adapter.spec();
        let (publish_fn, _s) = mock_publish_fn();
        let block_no_blobs =
            SignedBeaconBlock::from_block(BeaconBlock::empty(spec), Signature::empty());
        let block_root = block_no_blobs.canonical_root();

        // Expectations: engine fetch blobs should not be triggered
        mock_adapter.expect_get_blobs_v1().times(0);

        // WHEN: Trigger fetch blobs on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            Arc::new(block_no_blobs),
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // THEN: No blob is processed
        assert_eq!(processing_status, None);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_no_blobs_returned() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let (publish_fn, _) = mock_publish_fn();
        let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
        let block_root = block.canonical_root();

        // GIVEN: No blobs in EL response
        let expected_blob_count = block.message().body().blob_kzg_commitments().unwrap().len();
        mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]);

        // WHEN: Trigger fetch blobs on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            block,
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // THEN: No blob is processed
        assert_eq!(processing_status, None);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_partial_blobs_returned() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let (publish_fn, publish_fn_args) = mock_publish_fn();
        let blob_count = 2;
        let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
        let block_slot = block.slot();
        let block_root = block.canonical_root();

        // GIVEN: Missing a blob in EL response (remove 1 blob from response)
        let mut blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
        blob_and_proof_opts.first_mut().unwrap().take();
        mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
        // AND block is not imported into fork choice
        mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
        // AND all blobs returned are valid
        mock_adapter
            .expect_verify_blob_for_gossip()
            .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
        // Returned blobs should be processed
        mock_process_engine_blobs_result(
            &mut mock_adapter,
            Ok(AvailabilityProcessingStatus::MissingComponents(
                block_slot, block_root,
            )),
        );

        // WHEN: Trigger fetch blobs on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            block,
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // THEN: Returned blobs are processed and published
        assert_eq!(
            processing_status,
            Some(AvailabilityProcessingStatus::MissingComponents(
                block_slot, block_root,
            ))
        );
        assert!(
            matches!(
                extract_published_blobs(publish_fn_args),
                EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count - 1
            ),
            "partial blob results should still be published"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_block_imported_after_el_response() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let (publish_fn, publish_fn_args) = mock_publish_fn();
        let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
        let block_root = block.canonical_root();

        // GIVEN: All blobs returned, but fork choice already imported the block
        let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
        mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
        mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);

        // WHEN: Trigger fetch blobs on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            block,
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // THEN: Returned blobs should NOT be processed or published.
        assert_eq!(processing_status, None);
        assert_eq!(
            publish_fn_args.lock().unwrap().len(),
            0,
            "no blobs should be published"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_no_new_blobs_to_import() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let (publish_fn, publish_fn_args) = mock_publish_fn();
        let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2);
        let block_root = block.canonical_root();

        // **GIVEN**:
        // All blobs returned
        let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
        mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
        // block not yet imported into fork choice
        mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
        // All blobs already seen on gossip
        mock_adapter.expect_verify_blob_for_gossip().returning(|b| {
            Err(GossipBlobError::RepeatBlob {
                proposer: b.block_proposer_index(),
                slot: b.slot(),
                index: b.index,
            })
        });

        // **WHEN**: Trigger `fetch_blobs` on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            block,
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // **THEN**: Should NOT be processed and no blobs should be published.
        assert_eq!(processing_status, None);
        assert_eq!(
            publish_fn_args.lock().unwrap().len(),
            0,
            "no blobs should be published"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fetch_blobs_v1_success() {
        let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK);
        let (publish_fn, publish_fn_args) = mock_publish_fn();
        let blob_count = 2;
        let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count);
        let block_root = block.canonical_root();

        // All blobs returned, fork choice doesn't contain block
        let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
        mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
        mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
        mock_adapter
            .expect_verify_blob_for_gossip()
            .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone())));
        mock_process_engine_blobs_result(
            &mut mock_adapter,
            Ok(AvailabilityProcessingStatus::Imported(block_root)),
        );

        // Trigger fetch blobs on the block
        let custody_columns = hashset![0, 1, 2];
        let processing_status = fetch_and_process_engine_blobs_inner(
            mock_adapter,
            block_root,
            block,
            custody_columns,
            publish_fn,
        )
        .await
        .expect("fetch blobs should succeed");

        // THEN all fetched blobs are processed and published
        assert_eq!(
            processing_status,
            Some(AvailabilityProcessingStatus::Imported(block_root))
        );

        let published_blobs = extract_published_blobs(publish_fn_args);
        assert!(
            matches!(
                published_blobs,
                EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count
            ),
            "should publish fetched blobs"
        );
    }

    fn mock_get_blobs_v1_response(
        mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
        blobs_and_proofs_opt: Vec<Option<BlobAndProof<E>>>,
    ) {
        let blobs_and_proofs_v1 = blobs_and_proofs_opt
            .into_iter()
            .map(|blob_and_proof_opt| {
                blob_and_proof_opt.map(|blob_and_proof| match blob_and_proof {
                    BlobAndProof::V1(inner) => inner,
                    _ => panic!("BlobAndProofV1 not expected"),
                })
            })
            .collect();
        mock_adapter
            .expect_get_blobs_v1()
            .return_once(move |_| Ok(blobs_and_proofs_v1));
    }
}
```

The shared helpers now take the fork and blob count explicitly, build the block for that fork, and return the versioned `BlobAndProof` wrapper (the old top-level `mock_get_blobs_v2_response` moves into `mod get_blobs_v2`):

```diff
@@ -257,23 +539,14 @@ fn mock_fork_choice_contains_block(
         .returning(move |block_root| block_roots.contains(block_root));
 }
 
-fn mock_get_blobs_v2_response(
-    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
-    blobs_and_proofs_opt: Option<Vec<BlobAndProofV2<E>>>,
-) {
-    mock_adapter
-        .expect_get_blobs_v2()
-        .return_once(move |_| Ok(blobs_and_proofs_opt));
-}
-
 fn create_test_block_and_blobs(
     mock_adapter: &MockFetchBlobsBeaconAdapter<T>,
-) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProofV2<E>>) {
-    let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu {
-        message: BeaconBlockFulu::empty(mock_adapter.spec()),
-        signature: Signature::empty(),
-    });
-    let (blobs_bundle, _tx) = generate_blobs::<E>(2, block.fork_name_unchecked()).unwrap();
+    blob_count: usize,
+) -> (Arc<SignedBeaconBlock<E>>, Vec<BlobAndProof<E>>) {
+    let mut block =
+        SignedBeaconBlock::from_block(BeaconBlock::empty(mock_adapter.spec()), Signature::empty());
+    let fork = block.fork_name_unchecked();
+    let (blobs_bundle, _tx) = generate_blobs::<E>(blob_count, fork).unwrap();
     let BlobsBundle {
         commitments,
         proofs,
@@ -286,16 +559,27 @@ fn create_test_block_and_blobs(
         .blob_kzg_commitments_mut()
         .unwrap() = commitments;
 
-    let proofs_len = proofs.len() / blobs.len();
-    let blob_and_proofs: Vec<BlobAndProofV2<E>> = blobs
-        .into_iter()
-        .zip(proofs.chunks(proofs_len))
-        .map(|(blob, proofs)| BlobAndProofV2 {
-            blob,
-            proofs: proofs.to_vec().into(),
-        })
-        .collect();
-    (Arc::new(block), blob_and_proofs)
+    let blobs_and_proofs = if fork.fulu_enabled() {
+        let proofs_len = proofs.len() / blobs.len();
+        blobs
+            .into_iter()
+            .zip(proofs.chunks(proofs_len))
+            .map(|(blob, proofs)| {
+                BlobAndProof::V2(BlobAndProofV2 {
+                    blob,
+                    proofs: proofs.to_vec().into(),
+                })
+            })
+            .collect()
+    } else {
+        blobs
+            .into_iter()
+            .zip(proofs)
+            .map(|(blob, proof)| BlobAndProof::V1(BlobAndProofV1 { blob, proof }))
+            .collect()
+    };
+
+    (Arc::new(block), blobs_and_proofs)
 }
 
 #[allow(clippy::type_complexity)]
@@ -313,9 +597,9 @@ fn mock_publish_fn() -> (
     (publish_fn, captured_args)
 }
 
-fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter<T> {
+fn mock_beacon_adapter(fork_name: ForkName) -> MockFetchBlobsBeaconAdapter<T> {
     let test_runtime = TestRuntime::default();
-    let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
+    let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec()));
     let kzg = get_kzg(&spec);
 
     let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
```
The `DoNotObserve` marker type also gains a `Debug` derive:

```diff
@@ -161,6 +161,7 @@ pub trait ObservationStrategy {
 /// Type for messages that are observed immediately.
 pub struct Observe;
 /// Type for messages that have not been observed.
+#[derive(Debug)]
 pub struct DoNotObserve;
 
 impl ObservationStrategy for Observe {
```
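The `#[derive(Debug)]` on `EngineGetBlobsOutput` only compiles if the types it contains are `Debug`, and the contained `GossipVerifiedBlob<T, DoNotObserve>` drags in the `DoNotObserve` marker. A minimal sketch with simplified, made-up types (not the real Lighthouse definitions):

```rust
use std::marker::PhantomData;

/// Marker type; without its own `Debug` derive, the enum derive below fails to compile.
#[derive(Debug)]
struct DoNotObserve;

/// Simplified stand-in for a gossip-verified blob parameterised by an observation marker.
#[derive(Debug)]
struct VerifiedBlob<O> {
    index: u64,
    _observation: PhantomData<O>,
}

/// Simplified stand-in for `EngineGetBlobsOutput`.
#[derive(Debug)]
enum Output {
    Blobs(Vec<VerifiedBlob<DoNotObserve>>),
}

fn main() {
    let out = Output::Blobs(vec![VerifiedBlob {
        index: 0,
        _observation: PhantomData,
    }]);
    // `{:?}` now works on the output type, e.g. for logging.
    println!("{out:?}");
}
```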