From 3fab6a2c0ba702c20c38d3083a3c533ea647dcac Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Mon, 24 Feb 2025 01:47:09 -0300
Subject: [PATCH] Block availability data enum (#6866)

PeerDAS has undergone multiple refactors, and blending it with the get_blobs optimization has generated technical debt. A function signature like this

https://github.com/sigp/lighthouse/blob/f008b84079bbb6eb86de22bb3421dfc8263a5650/beacon_node/beacon_chain/src/beacon_chain.rs#L7171-L7178

allows at least the following combinations of states:

- blobs: Some / None
- data_columns: Some / None
- data_column_recv: Some / None
- Block has data? Yes / No
- Block post-PeerDAS? Yes / No

In reality, we don't have that many possible states, only:

- `NoData`: pre-Deneb, pre-PeerDAS with 0 blobs, or post-PeerDAS with 0 blobs
- `Blobs(BlobSidecarList)`: post-Deneb, pre-PeerDAS with > 0 blobs
- `DataColumns(DataColumnSidecarList)`: post-PeerDAS with > 0 blobs
- `DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList>)`: post-PeerDAS with > 0 blobs, but we obtained the columns via reconstruction

^ these are the variants of the new `AvailableBlockData` enum.

So we go from 2^5 states to 4 well-defined ones. Downstream code benefits nicely from this clarity, and I think it makes the whole feature much more maintainable. (Simplified, self-contained sketches of the enum and of the new `make_available` pattern are appended after the diff.)

Currently `is_available` returns a bool, and then we construct the available block in `make_available`. In a way, the availability condition is duplicated in both functions. Instead, this PR constructs `AvailableBlockData` in `is_available`, so the availability conditions are written once:

```rust
if let Some(block_data) = is_available(..) {
    let available_block = make_available(block_data);
}
```
---
 beacon_node/beacon_chain/src/beacon_chain.rs  | 110 ++---
 .../beacon_chain/src/block_verification.rs    |   1 -
 .../src/block_verification_types.rs           |  16 +-
 .../src/data_availability_checker.rs          | 153 ++++---
 .../src/data_availability_checker/error.rs    |   4 +-
 .../overflow_lru_cache.rs                     | 429 ++++++++++--------
 .../state_lru_cache.rs                        |   1 -
 .../beacon_chain/src/early_attester_cache.rs  |  17 +-
 .../beacon_chain/src/historical_blocks.rs     |  38 +-
 .../tests/attestation_production.rs           |   4 +-
 beacon_node/beacon_chain/tests/store_tests.rs |  20 +-
 11 files changed, 432 insertions(+), 361 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index b89dbe3dca..ad31e085ca 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -21,8 +21,8 @@ use crate::block_verification_types::{
 pub use crate::canonical_head::CanonicalHead;
 use crate::chain_config::ChainConfig;
 use crate::data_availability_checker::{
-    Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
-    DataColumnReconstructionResult,
+    Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
+    DataAvailabilityChecker, DataColumnReconstructionResult,
 };
 use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
 use crate::early_attester_cache::EarlyAttesterCache;
@@ -3169,7 +3169,14 @@ impl BeaconChain {
             return Err(BlockError::DuplicateFullyImported(block_root));
         }
 
-        self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
+        // process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
+        // consumers don't expect the blobs event to fire erratically.
+ if !self + .spec + .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) + { + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + } let r = self .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv) @@ -3640,9 +3647,12 @@ impl BeaconChain { data_column_recv: Option>>, ) -> Result { self.check_blobs_for_slashability(block_root, &blobs)?; - let availability = - self.data_availability_checker - .put_engine_blobs(block_root, blobs, data_column_recv)?; + let availability = self.data_availability_checker.put_engine_blobs( + block_root, + slot.epoch(T::EthSpec::slots_per_epoch()), + blobs, + data_column_recv, + )?; self.process_availability(slot, availability, || Ok(())) .await @@ -3727,7 +3737,6 @@ impl BeaconChain { parent_eth1_finalization_data, confirmed_state_roots, consensus_context, - data_column_recv, } = import_data; // Record the time at which this block's blobs became available. @@ -3755,7 +3764,6 @@ impl BeaconChain { parent_block, parent_eth1_finalization_data, consensus_context, - data_column_recv, ) }, "payload_verification_handle", @@ -3794,7 +3802,6 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, mut consensus_context: ConsensusContext, - data_column_recv: Option>>, ) -> Result { // ----------------------------- BLOCK NOT YET ATTESTABLE ---------------------------------- // Everything in this initial section is on the hot path between processing the block and @@ -3892,7 +3899,7 @@ impl BeaconChain { if let Some(proto_block) = fork_choice.get_block(&block_root) { if let Err(e) = self.early_attester_cache.add_head_block( block_root, - signed_block.clone(), + &signed_block, proto_block, &state, &self.spec, @@ -3961,15 +3968,9 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. // See https://github.com/sigp/lighthouse/issues/2028 - let (_, signed_block, blobs, data_columns) = signed_block.deconstruct(); + let (_, signed_block, block_data) = signed_block.deconstruct(); - match self.get_blobs_or_columns_store_op( - block_root, - signed_block.epoch(), - blobs, - data_columns, - data_column_recv, - ) { + match self.get_blobs_or_columns_store_op(block_root, block_data) { Ok(Some(blobs_or_columns_store_op)) => { ops.push(blobs_or_columns_store_op); } @@ -7218,29 +7219,34 @@ impl BeaconChain { } } - fn get_blobs_or_columns_store_op( + pub(crate) fn get_blobs_or_columns_store_op( &self, block_root: Hash256, - block_epoch: Epoch, - blobs: Option>, - data_columns: Option>, - data_column_recv: Option>>, + block_data: AvailableBlockData, ) -> Result>, String> { - if self.spec.is_peer_das_enabled_for_epoch(block_epoch) { - // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non - // custody columns: https://github.com/sigp/lighthouse/issues/6465 - let custody_columns_count = self.data_availability_checker.get_sampling_column_count(); + // TODO(das) we currently store all subnet sampled columns. 
Tracking issue to exclude non + // custody columns: https://github.com/sigp/lighthouse/issues/6465 + let _custody_columns_count = self.data_availability_checker.get_sampling_column_count(); - let custody_columns_available = data_columns - .as_ref() - .as_ref() - .is_some_and(|columns| columns.len() == custody_columns_count); - - let data_columns_to_persist = if custody_columns_available { - // If the block was made available via custody columns received from gossip / rpc, use them - // since we already have them. - data_columns - } else if let Some(data_column_recv) = data_column_recv { + match block_data { + AvailableBlockData::NoData => Ok(None), + AvailableBlockData::Blobs(blobs) => { + debug!( + self.log, "Writing blobs to store"; + "block_root" => %block_root, + "count" => blobs.len(), + ); + Ok(Some(StoreOp::PutBlobs(block_root, blobs))) + } + AvailableBlockData::DataColumns(data_columns) => { + debug!( + self.log, "Writing data columns to store"; + "block_root" => %block_root, + "count" => data_columns.len(), + ); + Ok(Some(StoreOp::PutDataColumns(block_root, data_columns))) + } + AvailableBlockData::DataColumnsRecv(data_column_recv) => { // Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking). let _column_recv_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT); @@ -7250,34 +7256,18 @@ impl BeaconChain { let computed_data_columns = data_column_recv .blocking_recv() .map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?; - Some(computed_data_columns) - } else { - // No blobs in the block. - None - }; - - if let Some(data_columns) = data_columns_to_persist { - if !data_columns.is_empty() { - debug!( - self.log, "Writing data_columns to store"; - "block_root" => %block_root, - "count" => data_columns.len(), - ); - return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns))); - } - } - } else if let Some(blobs) = blobs { - if !blobs.is_empty() { debug!( - self.log, "Writing blobs to store"; + self.log, "Writing data columns to store"; "block_root" => %block_root, - "count" => blobs.len(), + "count" => computed_data_columns.len(), ); - return Ok(Some(StoreOp::PutBlobs(block_root, blobs))); + // TODO(das): Store only this node's custody columns + Ok(Some(StoreOp::PutDataColumns( + block_root, + computed_data_columns, + ))) } } - - Ok(None) } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1265276376..9a8def585f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1707,7 +1707,6 @@ impl ExecutionPendingBlock { parent_eth1_finalization_data, confirmed_state_roots, consensus_context, - data_column_recv: None, }, payload_verification_handle, }) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 38d0fc708c..07ffae7712 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -7,11 +7,10 @@ use derivative::Derivative; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use tokio::sync::oneshot; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList, - Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + 
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, + Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -265,7 +264,6 @@ impl ExecutedBlock { /// A block that has completed all pre-deneb block processing checks including verification /// by an EL client **and** has all requisite blob data to be imported into fork choice. -#[derive(PartialEq)] pub struct AvailableExecutedBlock { pub block: AvailableBlock, pub import_data: BlockImportData, @@ -338,8 +336,7 @@ impl AvailabilityPendingExecutedBlock { } } -#[derive(Debug, Derivative)] -#[derivative(PartialEq)] +#[derive(Debug, PartialEq)] pub struct BlockImportData { pub block_root: Hash256, pub state: BeaconState, @@ -347,12 +344,6 @@ pub struct BlockImportData { pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, pub consensus_context: ConsensusContext, - #[derivative(PartialEq = "ignore")] - /// An optional receiver for `DataColumnSidecarList`. - /// - /// This field is `Some` when data columns are being computed asynchronously. - /// The resulting `DataColumnSidecarList` will be sent through this receiver. - pub data_column_recv: Option>>, } impl BlockImportData { @@ -371,7 +362,6 @@ impl BlockImportData { }, confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), - data_column_recv: None, } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f10d59ca1a..875645ee9f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -91,7 +91,6 @@ pub enum DataColumnReconstructionResult { /// /// Indicates if the block is fully `Available` or if we need blobs or blocks /// to "complete" the requirements for an `AvailableBlock`. -#[derive(PartialEq)] pub enum Availability { MissingComponents(Hash256), Available(Box>), @@ -219,7 +218,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; self.availability_cache - .put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log) + .put_kzg_verified_blobs(block_root, verified_blobs, &self.log) } /// Put a list of custody columns received via RPC into the availability cache. 
This performs KZG @@ -253,23 +252,29 @@ impl DataAvailabilityChecker { pub fn put_engine_blobs( &self, block_root: Hash256, + block_epoch: Epoch, blobs: FixedBlobSidecarList, - data_column_recv: Option>>, + data_columns_recv: Option>>, ) -> Result, AvailabilityCheckError> { - let seen_timestamp = self - .slot_clock - .now_duration() - .ok_or(AvailabilityCheckError::SlotClockError)?; - - let verified_blobs = - KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp); - - self.availability_cache.put_kzg_verified_blobs( - block_root, - verified_blobs, - data_column_recv, - &self.log, - ) + // `data_columns_recv` is always Some if block_root is post-PeerDAS + if let Some(data_columns_recv) = data_columns_recv { + self.availability_cache.put_computed_data_columns_recv( + block_root, + block_epoch, + data_columns_recv, + &self.log, + ) + } else { + let seen_timestamp = self + .slot_clock + .now_duration() + .ok_or(AvailabilityCheckError::SlotClockError)?; + self.availability_cache.put_kzg_verified_blobs( + block_root, + KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp), + &self.log, + ) + } } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -284,7 +289,6 @@ impl DataAvailabilityChecker { self.availability_cache.put_kzg_verified_blobs( gossip_blob.block_root(), vec![gossip_blob.into_inner()], - None, &self.log, ) } @@ -338,15 +342,14 @@ impl DataAvailabilityChecker { ) -> Result, AvailabilityCheckError> { let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { - return if let Some(blob_list) = blobs.as_ref() { + return if let Some(blob_list) = blobs { verify_kzg_for_blob_list(blob_list.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs)?; Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs, + blob_data: AvailableBlockData::Blobs(blob_list), blobs_available_timestamp: None, - data_columns: None, spec: self.spec.clone(), })) } else { @@ -365,14 +368,13 @@ impl DataAvailabilityChecker { Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: None, - blobs_available_timestamp: None, - data_columns: Some( + blob_data: AvailableBlockData::DataColumns( data_column_list .into_iter() .map(|d| d.clone_arc()) .collect(), ), + blobs_available_timestamp: None, spec: self.spec.clone(), })) } else { @@ -383,9 +385,8 @@ impl DataAvailabilityChecker { Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: None, + blob_data: AvailableBlockData::NoData, blobs_available_timestamp: None, - data_columns: None, spec: self.spec.clone(), })) } @@ -437,27 +438,25 @@ impl DataAvailabilityChecker { let (block_root, block, blobs, data_columns) = block.deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { - if blobs.is_some() { + if let Some(blobs) = blobs { MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs, + blob_data: AvailableBlockData::Blobs(blobs), blobs_available_timestamp: None, - data_columns: None, spec: self.spec.clone(), }) } else { MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else if self.data_columns_required_for_block(&block) { - if data_columns.is_some() { + if let Some(data_columns) = data_columns { MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: None, - data_columns: data_columns.map(|data_columns| { - data_columns.into_iter().map(|d| d.into_inner()).collect() - 
}), + blob_data: AvailableBlockData::DataColumns( + data_columns.into_iter().map(|d| d.into_inner()).collect(), + ), blobs_available_timestamp: None, spec: self.spec.clone(), }) @@ -468,8 +467,7 @@ impl DataAvailabilityChecker { MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: None, - data_columns: None, + blob_data: AvailableBlockData::NoData, blobs_available_timestamp: None, spec: self.spec.clone(), }) @@ -545,11 +543,11 @@ impl DataAvailabilityChecker { &self, block_root: &Hash256, ) -> Result, AvailabilityCheckError> { - let pending_components = match self + let verified_data_columns = match self .availability_cache .check_and_set_reconstruction_started(block_root) { - ReconstructColumnsDecision::Yes(pending_components) => pending_components, + ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, ReconstructColumnsDecision::No(reason) => { return Ok(DataColumnReconstructionResult::NotStarted(reason)); } @@ -560,7 +558,7 @@ impl DataAvailabilityChecker { let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( &self.kzg, - &pending_components.verified_data_columns, + &verified_data_columns, &self.spec, ) .map_err(|e| { @@ -713,13 +711,25 @@ async fn availability_cache_maintenance_service( } } +#[derive(Debug)] +pub enum AvailableBlockData { + /// Block is pre-Deneb or has zero blobs + NoData, + /// Block is post-Deneb, pre-PeerDAS and has more than zero blobs + Blobs(BlobSidecarList), + /// Block is post-PeerDAS and has more than zero blobs + DataColumns(DataColumnSidecarList), + /// Block is post-PeerDAS, has more than zero blobs and we recomputed the columns from the EL's + /// mempool blobs + DataColumnsRecv(oneshot::Receiver>), +} + /// A fully available block that is ready to be imported into fork choice. -#[derive(Clone, Debug, PartialEq)] +#[derive(Debug)] pub struct AvailableBlock { block_root: Hash256, block: Arc>, - blobs: Option>, - data_columns: Option>, + blob_data: AvailableBlockData, /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). blobs_available_timestamp: Option, pub spec: Arc, @@ -729,15 +739,13 @@ impl AvailableBlock { pub fn __new_for_testing( block_root: Hash256, block: Arc>, - blobs: Option>, - data_columns: Option>, + data: AvailableBlockData, spec: Arc, ) -> Self { Self { block_root, block, - blobs, - data_columns, + blob_data: data, blobs_available_timestamp: None, spec, } @@ -750,39 +758,56 @@ impl AvailableBlock { self.block.clone() } - pub fn blobs(&self) -> Option<&BlobSidecarList> { - self.blobs.as_ref() - } - pub fn blobs_available_timestamp(&self) -> Option { self.blobs_available_timestamp } - pub fn data_columns(&self) -> Option<&DataColumnSidecarList> { - self.data_columns.as_ref() + pub fn data(&self) -> &AvailableBlockData { + &self.blob_data + } + + pub fn has_blobs(&self) -> bool { + match self.blob_data { + AvailableBlockData::NoData => false, + AvailableBlockData::Blobs(..) => true, + AvailableBlockData::DataColumns(_) => false, + AvailableBlockData::DataColumnsRecv(_) => false, + } } #[allow(clippy::type_complexity)] - pub fn deconstruct( - self, - ) -> ( - Hash256, - Arc>, - Option>, - Option>, - ) { + pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { let AvailableBlock { block_root, block, - blobs, - data_columns, + blob_data, .. 
} = self; - (block_root, block, blobs, data_columns) + (block_root, block, blob_data) + } + + /// Only used for testing + pub fn __clone_without_recv(&self) -> Result { + Ok(Self { + block_root: self.block_root, + block: self.block.clone(), + blob_data: match &self.blob_data { + AvailableBlockData::NoData => AvailableBlockData::NoData, + AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()), + AvailableBlockData::DataColumns(data_columns) => { + AvailableBlockData::DataColumns(data_columns.clone()) + } + AvailableBlockData::DataColumnsRecv(_) => { + return Err("Can't clone DataColumnsRecv".to_owned()) + } + }, + blobs_available_timestamp: self.blobs_available_timestamp, + spec: self.spec.clone(), + }) } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum MaybeAvailableBlock { /// This variant is fully available. /// i.e. for pre-deneb blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 1ab85ab105..4e75ed4945 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,7 +10,7 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, - Unexpected, + Unexpected(&'static str), SszTypes(ssz_types::Error), MissingBlobs, MissingCustodyColumns, @@ -40,7 +40,7 @@ impl Error { | Error::MissingCustodyColumns | Error::StoreError(_) | Error::DecodeError(_) - | Error::Unexpected + | Error::Unexpected(_) | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 034a6582ad..78de538929 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -1,4 +1,5 @@ use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache}; +use super::AvailableBlockData; use crate::beacon_chain::BeaconStore; use crate::blob_verification::KzgVerifiedBlob; use crate::block_verification_types::{ @@ -10,6 +11,7 @@ use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::RwLock; use slog::{debug, Logger}; +use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::oneshot; @@ -39,19 +41,6 @@ pub struct PendingComponents { } impl PendingComponents { - /// Clones the `PendingComponent` without cloning `data_column_recv`, as `Receiver` is not cloneable. - /// This should only be used when the receiver is no longer needed. - pub fn clone_without_column_recv(&self) -> Self { - PendingComponents { - block_root: self.block_root, - verified_blobs: self.verified_blobs.clone(), - verified_data_columns: self.verified_data_columns.clone(), - executed_block: self.executed_block.clone(), - reconstruction_started: self.reconstruction_started, - data_column_recv: None, - } - } - /// Returns an immutable reference to the cached block. pub fn get_cached_block(&self) -> &Option> { &self.executed_block @@ -95,26 +84,6 @@ impl PendingComponents { .unwrap_or(false) } - /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a - /// block. - /// - /// This corresponds to the number of commitments that are present in a block. 
- pub fn block_kzg_commitments_count(&self) -> Option { - self.get_cached_block() - .as_ref() - .map(|b| b.get_commitments().len()) - } - - /// Returns the number of blobs that have been received and are stored in the cache. - pub fn num_received_blobs(&self) -> usize { - self.get_cached_blobs().iter().flatten().count() - } - - /// Returns the number of data columns that have been received and are stored in the cache. - pub fn num_received_data_columns(&self) -> usize { - self.verified_data_columns.len() - } - /// Returns the indices of cached custody columns pub fn get_cached_data_columns_indices(&self) -> Vec { self.verified_data_columns @@ -189,51 +158,121 @@ impl PendingComponents { self.merge_blobs(reinsert); } - /// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are - /// available in the cache. + /// Returns Some if the block has received all its required data for import. The return value + /// must be persisted in the DB along with the block. /// - /// Returns `true` if both the block exists and the number of received blobs / custody columns - /// matches the number of expected blobs / custody columns. - pub fn is_available(&self, custody_column_count: usize, log: &Logger) -> bool { - let block_kzg_commitments_count_opt = self.block_kzg_commitments_count(); - let expected_blobs_msg = block_kzg_commitments_count_opt - .as_ref() - .map(|num| num.to_string()) - .unwrap_or("unknown".to_string()); + /// WARNING: This function can potentially take a lot of time if the state needs to be + /// reconstructed from disk. Ensure you are not holding any write locks while calling this. + pub fn make_available( + &mut self, + custody_column_count: usize, + spec: &Arc, + recover: R, + ) -> Result>, AvailabilityCheckError> + where + R: FnOnce( + DietAvailabilityPendingExecutedBlock, + ) -> Result, AvailabilityCheckError>, + { + let Some(block) = &self.executed_block else { + // Block not available yet + return Ok(None); + }; - // No data columns when there are 0 blobs - let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| { - if blob_count > 0 { - custody_column_count - } else { - 0 + let num_expected_blobs = block.num_blobs_expected(); + + let blob_data = if num_expected_blobs == 0 { + Some(AvailableBlockData::NoData) + } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { + match self.verified_data_columns.len().cmp(&custody_column_count) { + Ordering::Greater => { + // Should never happen + return Err(AvailabilityCheckError::Unexpected("too many columns")); + } + Ordering::Equal => { + // Block is post-peerdas, and we got enough columns + let data_columns = self + .verified_data_columns + .iter() + .map(|d| d.clone().into_inner()) + .collect::>(); + Some(AvailableBlockData::DataColumns(data_columns)) + } + Ordering::Less => { + // The data_columns_recv is an infallible promise that we will receive all expected + // columns, so we consider the block available. + // We take the receiver as it can't be cloned, and make_available should never + // be called again once it returns `Some`. 
+ self.data_column_recv + .take() + .map(AvailableBlockData::DataColumnsRecv) + } } - }); - let expected_columns_msg = expected_columns_opt - .as_ref() - .map(|num| num.to_string()) - .unwrap_or("unknown".to_string()); + } else { + // Before PeerDAS, blobs + let num_received_blobs = self.verified_blobs.iter().flatten().count(); + match num_received_blobs.cmp(&num_expected_blobs) { + Ordering::Greater => { + // Should never happen + return Err(AvailabilityCheckError::Unexpected("too many blobs")); + } + Ordering::Equal => { + let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize; + let blobs_vec = self + .verified_blobs + .iter() + .flatten() + .map(|blob| blob.clone().to_blob()) + .collect::>(); + let blobs = RuntimeVariableList::new(blobs_vec, max_blobs) + .map_err(|_| AvailabilityCheckError::Unexpected("over max_blobs"))?; + Some(AvailableBlockData::Blobs(blobs)) + } + Ordering::Less => { + // Not enough blobs received yet + None + } + } + }; - let num_received_blobs = self.num_received_blobs(); - let num_received_columns = self.num_received_data_columns(); + // Block's data not available yet + let Some(blob_data) = blob_data else { + return Ok(None); + }; - debug!( - log, - "Component(s) added to data availability checker"; - "block_root" => ?self.block_root, - "received_blobs" => num_received_blobs, - "expected_blobs" => expected_blobs_msg, - "received_columns" => num_received_columns, - "expected_columns" => expected_columns_msg, - ); + // Block is available, construct `AvailableExecutedBlock` - let all_blobs_received = block_kzg_commitments_count_opt - .is_some_and(|num_expected_blobs| num_expected_blobs == num_received_blobs); + let blobs_available_timestamp = match blob_data { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(_) => self + .verified_blobs + .iter() + .flatten() + .map(|blob| blob.seen_timestamp()) + .max(), + // TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850 + AvailableBlockData::DataColumns(_) => None, + AvailableBlockData::DataColumnsRecv(_) => None, + }; - let all_columns_received = expected_columns_opt - .is_some_and(|num_expected_columns| num_expected_columns == num_received_columns); + let AvailabilityPendingExecutedBlock { + block, + import_data, + payload_verification_outcome, + } = recover(block.clone())?; - all_blobs_received || all_columns_received + let available_block = AvailableBlock { + block_root: self.block_root, + block, + blob_data, + blobs_available_timestamp, + spec: spec.clone(), + }; + Ok(Some(AvailableExecutedBlock::new( + available_block, + import_data, + payload_verification_outcome, + ))) } /// Returns an empty `PendingComponents` object with the given block root. @@ -248,87 +287,6 @@ impl PendingComponents { } } - /// Verifies an `SignedBeaconBlock` against a set of KZG verified blobs. - /// This does not check whether a block *should* have blobs, these checks should have been - /// completed when producing the `AvailabilityPendingBlock`. - /// - /// WARNING: This function can potentially take a lot of time if the state needs to be - /// reconstructed from disk. Ensure you are not holding any write locks while calling this. - pub fn make_available( - self, - spec: &Arc, - recover: R, - ) -> Result, AvailabilityCheckError> - where - R: FnOnce( - DietAvailabilityPendingExecutedBlock, - ) -> Result, AvailabilityCheckError>, - { - let Self { - block_root, - verified_blobs, - verified_data_columns, - executed_block, - data_column_recv, - .. 
- } = self; - - let blobs_available_timestamp = verified_blobs - .iter() - .flatten() - .map(|blob| blob.seen_timestamp()) - .max(); - - let Some(diet_executed_block) = executed_block else { - return Err(AvailabilityCheckError::Unexpected); - }; - - let is_peer_das_enabled = spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch()); - let (blobs, data_columns) = if is_peer_das_enabled { - let data_columns = verified_data_columns - .into_iter() - .map(|d| d.into_inner()) - .collect::>(); - (None, Some(data_columns)) - } else { - let num_blobs_expected = diet_executed_block.num_blobs_expected(); - let Some(verified_blobs) = verified_blobs - .into_iter() - .map(|b| b.map(|b| b.to_blob())) - .take(num_blobs_expected) - .collect::>>() - else { - return Err(AvailabilityCheckError::Unexpected); - }; - let max_len = spec.max_blobs_per_block(diet_executed_block.as_block().epoch()) as usize; - ( - Some(RuntimeVariableList::new(verified_blobs, max_len)?), - None, - ) - }; - let executed_block = recover(diet_executed_block)?; - - let AvailabilityPendingExecutedBlock { - block, - mut import_data, - payload_verification_outcome, - } = executed_block; - - import_data.data_column_recv = data_column_recv; - - let available_block = AvailableBlock { - block_root, - block, - blobs, - data_columns, - blobs_available_timestamp, - spec: spec.clone(), - }; - Ok(Availability::Available(Box::new( - AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), - ))) - } - /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob. pub fn epoch(&self) -> Option { self.executed_block @@ -354,6 +312,41 @@ impl PendingComponents { None }) } + + pub fn status_str( + &self, + block_epoch: Epoch, + sampling_column_count: usize, + spec: &ChainSpec, + ) -> String { + let block_count = if self.executed_block.is_some() { 1 } else { 0 }; + if spec.is_peer_das_enabled_for_epoch(block_epoch) { + let data_column_recv_count = if self.data_column_recv.is_some() { + 1 + } else { + 0 + }; + format!( + "block {} data_columns {}/{} data_columns_recv {}", + block_count, + self.verified_data_columns.len(), + sampling_column_count, + data_column_recv_count, + ) + } else { + let num_expected_blobs = if let Some(block) = self.get_cached_block() { + &block.num_blobs_expected().to_string() + } else { + "?" + }; + format!( + "block {} blobs {}/{}", + block_count, + self.verified_blobs.len(), + num_expected_blobs + ) + } + } } /// This is the main struct for this module. Outside methods should @@ -374,7 +367,7 @@ pub struct DataAvailabilityCheckerInner { // the current usage, as it's deconstructed immediately. #[allow(clippy::large_enum_variant)] pub(crate) enum ReconstructColumnsDecision { - Yes(PendingComponents), + Yes(Vec>), No(&'static str), } @@ -455,16 +448,10 @@ impl DataAvailabilityCheckerInner { } /// Puts the KZG verified blobs into the availability cache as pending components. - /// - /// The `data_column_recv` parameter is an optional `Receiver` for data columns that are - /// computed asynchronously. This method remains **used** after PeerDAS activation, because - /// blocks can be made available if the EL already has the blobs and returns them via the - /// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). 
pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, kzg_verified_blobs: I, - data_column_recv: Option>>, log: &Logger, ) -> Result, AvailabilityCheckError> { let mut kzg_verified_blobs = kzg_verified_blobs.into_iter().peekable(); @@ -474,7 +461,7 @@ impl DataAvailabilityCheckerInner { .map(|verified_blob| verified_blob.as_blob().epoch()) else { // Verified blobs list should be non-empty. - return Err(AvailabilityCheckError::Unexpected); + return Err(AvailabilityCheckError::Unexpected("empty blobs")); }; let mut fixed_blobs = @@ -499,21 +486,22 @@ impl DataAvailabilityCheckerInner { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); - if data_column_recv.is_some() { - // If `data_column_recv` is `Some`, it means we have all the blobs from engine, and have - // started computing data columns. We store the receiver in `PendingComponents` for - // later use when importing the block. - pending_components.data_column_recv = data_column_recv; - } + debug!(log, "Component added to data availability checker"; + "component" => "blobs", + "block_root" => ?block_root, + "status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + ); - if pending_components.is_available(self.sampling_column_count, log) { + if let Some(available_block) = + pending_components.make_available(self.sampling_column_count, &self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. - write_lock.put(block_root, pending_components.clone_without_column_recv()); + write_lock.put(block_root, pending_components); drop(write_lock); - pending_components.make_available(&self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) + Ok(Availability::Available(Box::new(available_block))) } else { write_lock.put(block_root, pending_components); Ok(Availability::MissingComponents(block_root)) @@ -535,7 +523,7 @@ impl DataAvailabilityCheckerInner { .map(|verified_blob| verified_blob.as_data_column().epoch()) else { // Verified data_columns list should be non-empty. - return Err(AvailabilityCheckError::Unexpected); + return Err(AvailabilityCheckError::Unexpected("empty columns")); }; let mut write_lock = self.critical.write(); @@ -551,14 +539,72 @@ impl DataAvailabilityCheckerInner { // Merge in the data columns. pending_components.merge_data_columns(kzg_verified_data_columns)?; - if pending_components.is_available(self.sampling_column_count, log) { + debug!(log, "Component added to data availability checker"; + "component" => "data_columns", + "block_root" => ?block_root, + "status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + ); + + if let Some(available_block) = + pending_components.make_available(self.sampling_column_count, &self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. 
- write_lock.put(block_root, pending_components.clone_without_column_recv()); + write_lock.put(block_root, pending_components); drop(write_lock); - pending_components.make_available(&self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) + Ok(Availability::Available(Box::new(available_block))) + } else { + write_lock.put(block_root, pending_components); + Ok(Availability::MissingComponents(block_root)) + } + } + + /// The `data_column_recv` parameter is a `Receiver` for data columns that are computed + /// asynchronously. This method is used if the EL already has the blobs and returns them via the + /// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs). + pub fn put_computed_data_columns_recv( + &self, + block_root: Hash256, + block_epoch: Epoch, + data_column_recv: oneshot::Receiver>, + log: &Logger, + ) -> Result, AvailabilityCheckError> { + let mut write_lock = self.critical.write(); + + // Grab existing entry or create a new entry. + let mut pending_components = write_lock + .pop_entry(&block_root) + .map(|(_, v)| v) + .unwrap_or_else(|| { + PendingComponents::empty( + block_root, + self.spec.max_blobs_per_block(block_epoch) as usize, + ) + }); + + // We have all the blobs from engine, and have started computing data columns. We store the + // receiver in `PendingComponents` for later use when importing the block. + // TODO(das): Error or log if we overwrite a prior receiver https://github.com/sigp/lighthouse/issues/6764 + pending_components.data_column_recv = Some(data_column_recv); + + debug!(log, "Component added to data availability checker"; + "component" => "data_columns_recv", + "block_root" => ?block_root, + "status" => pending_components.status_str(block_epoch, self.sampling_column_count, &self.spec), + ); + + if let Some(available_block) = + pending_components.make_available(self.sampling_column_count, &self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { + // We keep the pending components in the availability cache during block import (#5845). + // `data_column_recv` is returned as part of the available block and is no longer needed here. + write_lock.put(block_root, pending_components); + drop(write_lock); + Ok(Availability::Available(Box::new(available_block))) } else { write_lock.put(block_root, pending_components); Ok(Availability::MissingComponents(block_root)) @@ -603,7 +649,7 @@ impl DataAvailabilityCheckerInner { } pending_components.reconstruction_started = true; - ReconstructColumnsDecision::Yes(pending_components.clone_without_column_recv()) + ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()) } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. @@ -643,15 +689,23 @@ impl DataAvailabilityCheckerInner { // Merge in the block. pending_components.merge_block(diet_executed_block); + debug!(log, "Component added to data availability checker"; + "component" => "block", + "block_root" => ?block_root, + "status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + ); + // Check if we have all components and entire set is consistent. 
- if pending_components.is_available(self.sampling_column_count, log) { + if let Some(available_block) = + pending_components.make_available(self.sampling_column_count, &self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. - write_lock.put(block_root, pending_components.clone_without_column_recv()); + write_lock.put(block_root, pending_components); drop(write_lock); - pending_components.make_available(&self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) + Ok(Availability::Available(Box::new(available_block))) } else { write_lock.put(block_root, pending_components); Ok(Availability::MissingComponents(block_root)) @@ -882,7 +936,6 @@ mod test { parent_eth1_finalization_data, confirmed_state_roots: vec![], consensus_context, - data_column_recv: None, }; let payload_verification_outcome = PayloadVerificationOutcome { @@ -989,7 +1042,7 @@ mod test { for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); if blob_index == blobs_expected - 1 { assert!(matches!(availability, Availability::Available(_))); @@ -1017,11 +1070,10 @@ mod test { for gossip_blob in blobs { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); - assert_eq!( - availability, - Availability::MissingComponents(root), + assert!( + matches!(availability, Availability::MissingComponents(_)), "should be pending block" ); assert_eq!(cache.critical.read().len(), 1); @@ -1273,7 +1325,6 @@ mod pending_components_tests { }, confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), - data_column_recv: None, }, payload_verification_outcome: PayloadVerificationOutcome { payload_verification_status: PayloadVerificationStatus::Verified, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 2a2a0431cc..5b9b7c7023 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -136,7 +136,6 @@ impl StateLRUCache { consensus_context: diet_executed_block .consensus_context .into_consensus_context(), - data_column_recv: None, }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, }) diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index c94ea0e941..a90911026c 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -1,4 +1,4 @@ -use crate::data_availability_checker::AvailableBlock; +use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; use crate::{ attester_cache::{CommitteeLengths, Error}, metrics, @@ -52,7 +52,7 @@ impl EarlyAttesterCache { pub fn add_head_block( &self, 
beacon_block_root: Hash256, - block: AvailableBlock, + block: &AvailableBlock, proto_block: ProtoBlock, state: &BeaconState, spec: &ChainSpec, @@ -70,14 +70,23 @@ impl EarlyAttesterCache { }, }; - let (_, block, blobs, data_columns) = block.deconstruct(); + let (blobs, data_columns) = match block.data() { + AvailableBlockData::NoData => (None, None), + AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None), + AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())), + // TODO(das): Once the columns are received, they will not be available in + // the early attester cache. If someone does a query to us via RPC we + // will get downscored. + AvailableBlockData::DataColumnsRecv(_) => (None, None), + }; + let item = CacheItem { epoch, committee_lengths, beacon_block_root, source, target, - block, + block: block.block_cloned(), blobs, data_columns, proto_block, diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index a48f32e7b4..a9caeb18bb 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,4 +1,4 @@ -use crate::data_availability_checker::AvailableBlock; +use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; use crate::{metrics, BeaconChain, BeaconChainTypes}; use itertools::Itertools; use slog::debug; @@ -105,7 +105,7 @@ impl BeaconChain { let blob_batch_size = blocks_to_import .iter() - .filter(|available_block| available_block.blobs().is_some()) + .filter(|available_block| available_block.has_blobs()) .count() .saturating_mul(n_blob_ops_per_block); @@ -114,14 +114,13 @@ impl BeaconChain { let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; - let mut blob_batch = Vec::with_capacity(blob_batch_size); + let mut blob_batch = Vec::::with_capacity(blob_batch_size); let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); for available_block in blocks_to_import.into_iter().rev() { - let (block_root, block, maybe_blobs, maybe_data_columns) = - available_block.deconstruct(); + let (block_root, block, block_data) = available_block.deconstruct(); if block_root != expected_block_root { return Err(HistoricalBlockError::MismatchedBlockRoot { @@ -144,17 +143,26 @@ impl BeaconChain { ); } - // Store the blobs too - if let Some(blobs) = maybe_blobs { - new_oldest_blob_slot = Some(block.slot()); - self.store - .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch); + match &block_data { + AvailableBlockData::NoData => {} + AvailableBlockData::Blobs(..) => { + new_oldest_blob_slot = Some(block.slot()); + } + AvailableBlockData::DataColumns(_) | AvailableBlockData::DataColumnsRecv(_) => { + new_oldest_data_column_slot = Some(block.slot()); + } } - // Store the data columns too - if let Some(data_columns) = maybe_data_columns { - new_oldest_data_column_slot = Some(block.slot()); - self.store - .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch); + + // Store the blobs or data columns too + if let Some(op) = self + .get_blobs_or_columns_store_op(block_root, block_data) + .map_err(|e| { + HistoricalBlockError::StoreError(StoreError::DBError { + message: format!("get_blobs_or_columns_store_op error {e:?}"), + }) + })? 
+ { + blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?); } // Store block roots, including at all skip slots in the freezer DB. diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 621475a3ec..d89a8530e1 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -242,7 +242,7 @@ async fn produces_attestations() { .early_attester_cache .add_head_block( block_root, - available_block, + &available_block, proto_block, &state, &chain.spec, @@ -310,7 +310,7 @@ async fn early_attester_cache_old_request() { .early_attester_cache .add_head_block( head.beacon_block_root, - available_block, + &available_block, head_proto_block, &head.beacon_state, &harness.chain.spec, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7a2df76970..997a2859b7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2517,18 +2517,13 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // Corrupt the signature on the 1st block to ensure that the backfill processor is checking // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. - let mut batch_with_invalid_first_block = available_blocks.clone(); + let mut batch_with_invalid_first_block = + available_blocks.iter().map(clone_block).collect::>(); batch_with_invalid_first_block[0] = { - let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct(); + let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing( - block_root, - Arc::new(corrupt_block), - blobs, - data_columns, - Arc::new(spec), - ) + AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec)) }; // Importing the invalid batch should error. @@ -2540,8 +2535,9 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { )); // Importing the batch with valid signatures should succeed. + let available_blocks_dup = available_blocks.iter().map(clone_block).collect::>(); beacon_chain - .import_historical_block_batch(available_blocks.clone()) + .import_historical_block_batch(available_blocks_dup) .unwrap(); assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0); @@ -3690,3 +3686,7 @@ fn get_blocks( .map(|checkpoint| checkpoint.beacon_block_root.into()) .collect() } + +fn clone_block(block: &AvailableBlock) -> AvailableBlock { + block.__clone_without_recv().unwrap() +}
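---

The commit message describes the four well-defined states, and the diff introduces `AvailableBlockData` plus a `get_blobs_or_columns_store_op` that matches on it. Below is a minimal, self-contained sketch of that shape, not the PR's actual code: `BlobSidecar`, `DataColumnSidecar`, `StoreOp`, and the std `mpsc::Receiver` are simplified stand-ins for Lighthouse's sidecar list types, store operations, and `tokio::sync::oneshot::Receiver`.

```rust
// Sketch only: stand-in types so the example compiles on its own.
use std::sync::mpsc::Receiver; // stand-in for tokio::sync::oneshot::Receiver

#[derive(Debug)]
struct BlobSidecar;
#[derive(Debug)]
struct DataColumnSidecar;

type BlobSidecarList = Vec<BlobSidecar>;
type DataColumnSidecarList = Vec<DataColumnSidecar>;

/// The four states a block's data can be in once the block is available.
enum AvailableBlockData {
    /// Pre-Deneb, or post-Deneb / post-PeerDAS with zero blobs.
    NoData,
    /// Post-Deneb, pre-PeerDAS, with more than zero blobs.
    Blobs(BlobSidecarList),
    /// Post-PeerDAS with more than zero blobs; columns received via gossip / RPC.
    DataColumns(DataColumnSidecarList),
    /// Post-PeerDAS with more than zero blobs; columns still being computed from EL blobs.
    DataColumnsRecv(Receiver<DataColumnSidecarList>),
}

/// What the store layer persists for each variant (mirrors the shape of
/// `get_blobs_or_columns_store_op` in the diff, heavily simplified).
enum StoreOp {
    PutBlobs(BlobSidecarList),
    PutDataColumns(DataColumnSidecarList),
}

fn store_op_for(data: AvailableBlockData) -> Result<Option<StoreOp>, String> {
    match data {
        AvailableBlockData::NoData => Ok(None),
        AvailableBlockData::Blobs(blobs) => Ok(Some(StoreOp::PutBlobs(blobs))),
        AvailableBlockData::DataColumns(columns) => Ok(Some(StoreOp::PutDataColumns(columns))),
        AvailableBlockData::DataColumnsRecv(recv) => {
            // Block until the columns computed from the EL's blobs arrive.
            let columns = recv
                .recv()
                .map_err(|e| format!("did not receive data columns: {e:?}"))?;
            Ok(Some(StoreOp::PutDataColumns(columns)))
        }
    }
}

fn main() {
    // A block with no blobs needs no extra store op.
    assert!(matches!(store_op_for(AvailableBlockData::NoData), Ok(None)));
    // A pre-PeerDAS block with blobs persists the blobs.
    let op = store_op_for(AvailableBlockData::Blobs(vec![BlobSidecar])).unwrap();
    assert!(matches!(op, Some(StoreOp::PutBlobs(_))));
}
```

Because every consumer matches on one enum, the impossible combinations (e.g. blobs and data columns present at the same time) simply cannot be represented, which is the maintainability win the PR description points at.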
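The second idea in the description is collapsing the `is_available() -> bool` / `make_available()` pair into a single function that both decides availability and constructs the data. The sketch below illustrates that pattern only; `PendingComponents`, `num_expected_blobs`, and `received_blobs` are hypothetical simplifications of the real cache entry, which also handles PeerDAS columns, the `data_column_recv` receiver, and state recovery.

```rust
// Sketch of "write the availability condition once": the check and the
// construction of the block's data happen in the same place.
struct PendingComponents {
    num_expected_blobs: usize,
    received_blobs: Vec<u64>, // stand-in for KZG-verified blob sidecars
}

enum BlockData {
    NoData,
    Blobs(Vec<u64>),
}

impl PendingComponents {
    /// Before: `is_available() -> bool` plus a separate `make_available()` that
    /// re-derived the same condition. After: one function that returns `Some`
    /// exactly when the block can be imported, carrying the data to import.
    fn make_available(&mut self) -> Option<BlockData> {
        if self.num_expected_blobs == 0 {
            Some(BlockData::NoData)
        } else if self.received_blobs.len() == self.num_expected_blobs {
            Some(BlockData::Blobs(std::mem::take(&mut self.received_blobs)))
        } else {
            None // still missing components
        }
    }
}

fn main() {
    let mut pending = PendingComponents { num_expected_blobs: 2, received_blobs: vec![1] };
    assert!(pending.make_available().is_none()); // one blob still missing
    pending.received_blobs.push(2);
    assert!(matches!(pending.make_available(), Some(BlockData::Blobs(_))));
}
```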