diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index 9d986f8fb6..74dc680a9a 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -42,6 +42,7 @@ use types::{ /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the /// gossip network. The blobs / data columns have not been marked as observed yet, as they may not /// be published immediately. +#[derive(Debug)] pub enum EngineGetBlobsOutput { Blobs(Vec>), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. @@ -163,9 +164,22 @@ async fn fetch_and_process_blobs_v1( inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); } else { + debug!( + num_expected_blobs, + num_fetched_blobs, "Received blobs from the EL" + ); inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); } + if chain_adapter.fork_choice_contains_block(&block_root) { + // Avoid computing sidecars if the block has already been imported. + debug!( + info = "block has already been imported", + "Ignoring EL blobs response" + ); + return Ok(None); + } + let (signed_block_header, kzg_commitments_proof) = block .signed_block_header_and_kzg_commitments_proof() .map_err(FetchEngineBlobError::BeaconStateError)?; @@ -197,13 +211,13 @@ async fn fetch_and_process_blobs_v1( .collect::, _>>() .map_err(FetchEngineBlobError::GossipBlob)?; - if !blobs_to_import_and_publish.is_empty() { - publish_fn(EngineGetBlobsOutput::Blobs( - blobs_to_import_and_publish.clone(), - )); + if blobs_to_import_and_publish.is_empty() { + return Ok(None); } - debug!(num_fetched_blobs, "Processing engine blobs"); + publish_fn(EngineGetBlobsOutput::Blobs( + blobs_to_import_and_publish.clone(), + )); let availability_processing_status = chain_adapter .process_engine_blobs( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index afee5adcb2..4556948ffc 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -7,227 +7,509 @@ use crate::test_utils::{get_kzg, EphemeralHarnessType}; use crate::AvailabilityProcessingStatus; use bls::Signature; use eth2::types::BlobsBundle; -use execution_layer::json_structures::BlobAndProofV2; +use execution_layer::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2}; use execution_layer::test_utils::generate_blobs; use maplit::hashset; use std::sync::{Arc, Mutex}; use task_executor::test_utils::TestRuntime; use types::{ - BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, SignedBeaconBlock, - SignedBeaconBlockFulu, + BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, + SignedBeaconBlock, SignedBeaconBlockFulu, }; type E = MainnetEthSpec; type T = EphemeralHarnessType; -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_no_blobs_in_block() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, _s) = mock_publish_fn(); - let block = SignedBeaconBlock::::Fulu(SignedBeaconBlockFulu { - message: BeaconBlockFulu::empty(mock_adapter.spec()), - signature: Signature::empty(), - }); - let block_root = block.canonical_root(); +mod get_blobs_v2 { + use super::*; - // Expectations: engine fetch blobs should not be triggered - mock_adapter.expect_get_blobs_v2().times(0); - mock_adapter.expect_process_engine_blobs().times(0); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_no_blobs_in_block() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, _s) = mock_publish_fn(); + let block = SignedBeaconBlock::::Fulu(SignedBeaconBlockFulu { + message: BeaconBlockFulu::empty(mock_adapter.spec()), + signature: Signature::empty(), + }); + let block_root = block.canonical_root(); - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - Arc::new(block), - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + // Expectations: engine fetch blobs should not be triggered + mock_adapter.expect_get_blobs_v2().times(0); + mock_adapter.expect_process_engine_blobs().times(0); - assert_eq!(processing_status, None); + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + Arc::new(block), + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_no_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, _) = mock_publish_fn(); + let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // No blobs in EL response + mock_get_blobs_v2_response(&mut mock_adapter, None); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_partial_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // Missing blob in EL response + blobs_and_proofs.pop(); + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + // No blobs should be processed + mock_adapter.expect_process_engine_blobs().times(0); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no columns should be published" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_block_imported_after_el_response() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // All blobs returned, but fork choice already imported the block + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]); + // No blobs should be processed + mock_adapter.expect_process_engine_blobs().times(0); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no columns should be published" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_no_new_columns_to_import() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // **GIVEN**: + // All blobs returned + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + // block not yet imported into fork choice + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + // All data columns already seen on gossip + mock_adapter + .expect_verify_data_column_for_gossip() + .returning(|c| { + Err(GossipDataColumnError::PriorKnown { + proposer: c.block_proposer_index(), + slot: c.slot(), + index: c.index, + }) + }); + // No blobs should be processed + mock_adapter.expect_process_engine_blobs().times(0); + + // **WHEN**: Trigger `fetch_blobs` on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + // **THEN**: Should NOT be processed and no columns should be published. + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no columns should be published" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v2_success() { + let mut mock_adapter = mock_beacon_adapter(ForkName::Fulu); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // All blobs returned, fork choice doesn't contain block + mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_adapter + .expect_verify_data_column_for_gossip() + .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c))); + mock_process_engine_blobs_result( + &mut mock_adapter, + Ok(AvailabilityProcessingStatus::Imported(block_root)), + ); + + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns.clone(), + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + assert_eq!( + processing_status, + Some(AvailabilityProcessingStatus::Imported(block_root)) + ); + + let published_columns = extract_published_blobs(publish_fn_args); + assert!( + matches!( + published_columns, + EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len() + ), + "should publish custody columns" + ); + } + + fn mock_get_blobs_v2_response( + mock_adapter: &mut MockFetchBlobsBeaconAdapter, + blobs_and_proofs_opt: Option>>, + ) { + let blobs_and_proofs_v2_opt = blobs_and_proofs_opt.map(|blobs_and_proofs| { + blobs_and_proofs + .into_iter() + .map(|blob_and_proof| match blob_and_proof { + BlobAndProof::V2(inner) => inner, + _ => panic!("BlobAndProofV2 not expected"), + }) + .collect() + }); + mock_adapter + .expect_get_blobs_v2() + .return_once(move |_| Ok(blobs_and_proofs_v2_opt)); + } } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_no_blobs_returned() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, _) = mock_publish_fn(); - let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); - let block_root = block.canonical_root(); +mod get_blobs_v1 { + use super::*; + use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; + use crate::block_verification_types::AsBlock; - // No blobs in EL response - mock_get_blobs_v2_response(&mut mock_adapter, None); + const ELECTRA_FORK: ForkName = ForkName::Electra; - // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - block, - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_no_blobs_in_block() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let spec = mock_adapter.spec(); + let (publish_fn, _s) = mock_publish_fn(); + let block_no_blobs = + SignedBeaconBlock::from_block(BeaconBlock::empty(spec), Signature::empty()); + let block_root = block_no_blobs.canonical_root(); - assert_eq!(processing_status, None); -} + // Expectations: engine fetch blobs should not be triggered + mock_adapter.expect_get_blobs_v1().times(0); -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_partial_blobs_returned() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, publish_fn_args) = mock_publish_fn(); - let (block, mut blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); - let block_root = block.canonical_root(); + // WHEN: Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + Arc::new(block_no_blobs), + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); - // Missing blob in EL response - blobs_and_proofs.pop(); - mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); - // No blobs should be processed - mock_adapter.expect_process_engine_blobs().times(0); + // THEN: No blob is processed + assert_eq!(processing_status, None); + } - // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - block, - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_no_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let (publish_fn, _) = mock_publish_fn(); + let (block, _blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); - assert_eq!(processing_status, None); - assert_eq!( - publish_fn_args.lock().unwrap().len(), - 0, - "no columns should be published" - ); -} + // GIVEN: No blobs in EL response + let expected_blob_count = block.message().body().blob_kzg_commitments().unwrap().len(); + mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]); -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_block_imported_after_el_response() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, publish_fn_args) = mock_publish_fn(); - let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); - let block_root = block.canonical_root(); + // WHEN: Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); - // All blobs returned, but fork choice already imported the block - mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); - mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]); - // No blobs should be processed - mock_adapter.expect_process_engine_blobs().times(0); + // THEN: No blob is processed + assert_eq!(processing_status, None); + } - // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - block, - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_partial_blobs_returned() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let blob_count = 2; + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count); + let block_slot = block.slot(); + let block_root = block.canonical_root(); - assert_eq!(processing_status, None); - assert_eq!( - publish_fn_args.lock().unwrap().len(), - 0, - "no columns should be published" - ); -} + // GIVEN: Missing a blob in EL response (remove 1 blob from response) + let mut blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + blob_and_proof_opts.first_mut().unwrap().take(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); + // AND block is not imported into fork choice + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + // AND all blobs returned are valid + mock_adapter + .expect_verify_blob_for_gossip() + .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + // Returned blobs should be processed + mock_process_engine_blobs_result( + &mut mock_adapter, + Ok(AvailabilityProcessingStatus::MissingComponents( + block_slot, block_root, + )), + ); -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_no_new_columns_to_import() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, publish_fn_args) = mock_publish_fn(); - let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); - let block_root = block.canonical_root(); + // WHEN: Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); - // **GIVEN**: - // All blobs returned - mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); - // block not yet imported into fork choice - mock_fork_choice_contains_block(&mut mock_adapter, vec![]); - // All data columns already seen on gossip - mock_adapter - .expect_verify_data_column_for_gossip() - .returning(|c| { - Err(GossipDataColumnError::PriorKnown { - proposer: c.block_proposer_index(), - slot: c.slot(), - index: c.index, + // THEN: Returned blobs are processed and published + assert_eq!( + processing_status, + Some(AvailabilityProcessingStatus::MissingComponents( + block_slot, block_root, + )) + ); + assert!( + matches!( + extract_published_blobs(publish_fn_args), + EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count - 1 + ), + "partial blob results should still be published" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_block_imported_after_el_response() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // GIVEN: All blobs returned, but fork choice already imported the block + let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); + mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]); + + // WHEN: Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); + + // THEN: Returned blobs should NOT be processed or published. + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no blobs should be published" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_no_new_blobs_to_import() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, 2); + let block_root = block.canonical_root(); + + // **GIVEN**: + // All blobs returned + let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); + // block not yet imported into fork choice + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + // All blobs already seen on gossip + mock_adapter.expect_verify_blob_for_gossip().returning(|b| { + Err(GossipBlobError::RepeatBlob { + proposer: b.block_proposer_index(), + slot: b.slot(), + index: b.index, }) }); - // No blobs should be processed - mock_adapter.expect_process_engine_blobs().times(0); - // **WHEN**: Trigger `fetch_blobs` on the block - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - block, - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + // **WHEN**: Trigger `fetch_blobs` on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); - // **THEN**: Should NOT be processed and no columns should be published. - assert_eq!(processing_status, None); - assert_eq!( - publish_fn_args.lock().unwrap().len(), - 0, - "no columns should be published" - ); -} + // **THEN**: Should NOT be processed and no blobs should be published. + assert_eq!(processing_status, None); + assert_eq!( + publish_fn_args.lock().unwrap().len(), + 0, + "no blobs should be published" + ); + } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_fetch_blobs_v2_success() { - let mut mock_adapter = mock_beacon_adapter(); - let (publish_fn, publish_fn_args) = mock_publish_fn(); - let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter); - let block_root = block.canonical_root(); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_fetch_blobs_v1_success() { + let mut mock_adapter = mock_beacon_adapter(ELECTRA_FORK); + let (publish_fn, publish_fn_args) = mock_publish_fn(); + let blob_count = 2; + let (block, blobs_and_proofs) = create_test_block_and_blobs(&mock_adapter, blob_count); + let block_root = block.canonical_root(); - // All blobs returned, fork choice doesn't contain block - mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); - mock_fork_choice_contains_block(&mut mock_adapter, vec![]); - mock_adapter - .expect_verify_data_column_for_gossip() - .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c))); - mock_process_engine_blobs_result( - &mut mock_adapter, - Ok(AvailabilityProcessingStatus::Imported(block_root)), - ); + // All blobs returned, fork choice doesn't contain block + let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::>(); + mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); + mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_adapter + .expect_verify_blob_for_gossip() + .returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); + mock_process_engine_blobs_result( + &mut mock_adapter, + Ok(AvailabilityProcessingStatus::Imported(block_root)), + ); - // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; - let processing_status = fetch_and_process_engine_blobs_inner( - mock_adapter, - block_root, - block, - custody_columns.clone(), - publish_fn, - ) - .await - .expect("fetch blobs should succeed"); + // Trigger fetch blobs on the block + let custody_columns = hashset![0, 1, 2]; + let processing_status = fetch_and_process_engine_blobs_inner( + mock_adapter, + block_root, + block, + custody_columns, + publish_fn, + ) + .await + .expect("fetch blobs should succeed"); - assert_eq!( - processing_status, - Some(AvailabilityProcessingStatus::Imported(block_root)) - ); + // THEN all fetched blobs are processed and published + assert_eq!( + processing_status, + Some(AvailabilityProcessingStatus::Imported(block_root)) + ); - let published_columns = extract_published_blobs(publish_fn_args); - assert!( - matches!( - published_columns, - EngineGetBlobsOutput::CustodyColumns(columns) if columns.len() == custody_columns.len() - ), - "should publish custody columns" - ); + let published_blobs = extract_published_blobs(publish_fn_args); + assert!( + matches!( + published_blobs, + EngineGetBlobsOutput::Blobs(blobs) if blobs.len() == blob_count + ), + "should publish fetched blobs" + ); + } + + fn mock_get_blobs_v1_response( + mock_adapter: &mut MockFetchBlobsBeaconAdapter, + blobs_and_proofs_opt: Vec>>, + ) { + let blobs_and_proofs_v1 = blobs_and_proofs_opt + .into_iter() + .map(|blob_and_proof_opt| { + blob_and_proof_opt.map(|blob_and_proof| match blob_and_proof { + BlobAndProof::V1(inner) => inner, + _ => panic!("BlobAndProofV1 not expected"), + }) + }) + .collect(); + mock_adapter + .expect_get_blobs_v1() + .return_once(move |_| Ok(blobs_and_proofs_v1)); + } } /// Extract the `EngineGetBlobsOutput` passed to the `publish_fn`. @@ -257,23 +539,14 @@ fn mock_fork_choice_contains_block( .returning(move |block_root| block_roots.contains(block_root)); } -fn mock_get_blobs_v2_response( - mock_adapter: &mut MockFetchBlobsBeaconAdapter, - blobs_and_proofs_opt: Option>>, -) { - mock_adapter - .expect_get_blobs_v2() - .return_once(move |_| Ok(blobs_and_proofs_opt)); -} - fn create_test_block_and_blobs( mock_adapter: &MockFetchBlobsBeaconAdapter, -) -> (Arc>, Vec>) { - let mut block = SignedBeaconBlock::Fulu(SignedBeaconBlockFulu { - message: BeaconBlockFulu::empty(mock_adapter.spec()), - signature: Signature::empty(), - }); - let (blobs_bundle, _tx) = generate_blobs::(2, block.fork_name_unchecked()).unwrap(); + blob_count: usize, +) -> (Arc>, Vec>) { + let mut block = + SignedBeaconBlock::from_block(BeaconBlock::empty(mock_adapter.spec()), Signature::empty()); + let fork = block.fork_name_unchecked(); + let (blobs_bundle, _tx) = generate_blobs::(blob_count, fork).unwrap(); let BlobsBundle { commitments, proofs, @@ -286,16 +559,27 @@ fn create_test_block_and_blobs( .blob_kzg_commitments_mut() .unwrap() = commitments; - let proofs_len = proofs.len() / blobs.len(); - let blob_and_proofs: Vec> = blobs - .into_iter() - .zip(proofs.chunks(proofs_len)) - .map(|(blob, proofs)| BlobAndProofV2 { - blob, - proofs: proofs.to_vec().into(), - }) - .collect(); - (Arc::new(block), blob_and_proofs) + let blobs_and_proofs = if fork.fulu_enabled() { + let proofs_len = proofs.len() / blobs.len(); + blobs + .into_iter() + .zip(proofs.chunks(proofs_len)) + .map(|(blob, proofs)| { + BlobAndProof::V2(BlobAndProofV2 { + blob, + proofs: proofs.to_vec().into(), + }) + }) + .collect() + } else { + blobs + .into_iter() + .zip(proofs) + .map(|(blob, proof)| BlobAndProof::V1(BlobAndProofV1 { blob, proof })) + .collect() + }; + + (Arc::new(block), blobs_and_proofs) } #[allow(clippy::type_complexity)] @@ -313,9 +597,9 @@ fn mock_publish_fn() -> ( (publish_fn, captured_args) } -fn mock_beacon_adapter() -> MockFetchBlobsBeaconAdapter { +fn mock_beacon_adapter(fork_name: ForkName) -> MockFetchBlobsBeaconAdapter { let test_runtime = TestRuntime::default(); - let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec())); let kzg = get_kzg(&spec); let mut mock_adapter = MockFetchBlobsBeaconAdapter::default(); diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs index 1ca6c03f00..d3bda09712 100644 --- a/beacon_node/beacon_chain/src/observed_data_sidecars.rs +++ b/beacon_node/beacon_chain/src/observed_data_sidecars.rs @@ -161,6 +161,7 @@ pub trait ObservationStrategy { /// Type for messages that are observed immediately. pub struct Observe; /// Type for messages that have not been observed. +#[derive(Debug)] pub struct DoNotObserve; impl ObservationStrategy for Observe {