Fix data columns not persisting for PeerDAS due to a getBlobs race condition (#6756)

* Fix data columns not persisting for PeerDAS due to a `getBlobs` race condition.

* Refactor blobs and columns logic in `chain.import_block` for clarity. Add more docs on `data_column_recv`.

* Add more code comments for clarity.

* Merge remote-tracking branch 'origin/unstable' into fix-column-race

# Conflicts:
#	beacon_node/beacon_chain/src/block_verification_types.rs
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

* Fix lint.
This commit is contained in:
Jimmy Chen
2025-01-15 17:56:51 +11:00
committed by GitHub
parent 4fd8e521a4
commit dd7591f712
8 changed files with 188 additions and 105 deletions

View File

@@ -121,7 +121,7 @@ use store::{
KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
@@ -3088,7 +3088,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
@@ -3216,7 +3216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
let r = self
.process_availability(slot, availability, None, || Ok(()))
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
@@ -3344,7 +3344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), None).await
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
@@ -3476,7 +3476,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3492,7 +3492,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3515,7 +3515,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;
self.process_availability(slot, availability, None, publish_fn)
self.process_availability(slot, availability, publish_fn)
.await
}
@@ -3559,7 +3559,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3568,14 +3568,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_engine_blobs(block_root, blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
self.process_availability(slot, availability, data_column_recv, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3615,7 +3615,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3627,14 +3627,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block, recv).await
self.import_available_block(block).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3645,7 +3644,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
@@ -3660,6 +3658,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;
// Record the time at which this block's blobs became available.
@@ -3726,7 +3725,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
@@ -3894,44 +3893,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// if block is made available via blobs, dropped the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
let data_columns = match (data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => err_msg,
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(err_msg.to_string())));
}
match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
// No data columns present and compute data columns task was not spawned.
// Could either be no blobs in the block or before PeerDAS activation.
(None, None) => None,
};
Ok(None) => {}
Err(e) => {
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => &e,
"block_root" => ?block_root
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(e)));
}
}
let block = signed_block.message();
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -3943,30 +3930,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));
if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}
if let Some(data_columns) = data_columns {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}
let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
@@ -7184,6 +7147,68 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}
/// Build the store operation needed to persist a block's data sidecars.
///
/// Post-PeerDAS (for `block_epoch`), data columns are persisted; pre-PeerDAS,
/// blobs are persisted.
///
/// Returns:
/// - `Ok(Some(op))` — a `PutDataColumns` / `PutBlobs` op to include in the
///   atomic block-import batch.
/// - `Ok(None)` — nothing to persist (no blobs in the block, empty lists, or
///   pre-PeerDAS with no blobs).
/// - `Err(msg)` — the data-column compute task's sender was dropped or failed;
///   the caller is expected to restore fork choice from disk to keep the
///   database consistent.
fn get_blobs_or_columns_store_op(
    &self,
    block_root: Hash256,
    block_epoch: Epoch,
    blobs: Option<BlobSidecarList<T::EthSpec>>,
    data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
    data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
    if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
        // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
        // custody columns: https://github.com/sigp/lighthouse/issues/6465
        let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
        // Only treat gossip/rpc columns as usable if we hold the complete set of
        // columns we sample; a partial set must not short-circuit the EL path.
        // (Single `.as_ref()` — the doubled `.as_ref().as_ref()` was redundant,
        // yielding `Option<&&_>`.)
        let custody_columns_available = data_columns
            .as_ref()
            .is_some_and(|columns| columns.len() == custody_columns_count);
        let data_columns_to_persist = if custody_columns_available {
            // If the block was made available via custody columns received from gossip / rpc, use them
            // since we already have them.
            data_columns
        } else if let Some(data_column_recv) = data_column_recv {
            // Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
            let _column_recv_timer =
                metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
            // If we can't receive data columns, the sender was either dropped or
            // failed to compute data columns from blobs. Return an error so the
            // caller can restore fork choice from disk and avoid inconsistency
            // in the database.
            let computed_data_columns = data_column_recv
                .blocking_recv()
                .map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
            Some(computed_data_columns)
        } else {
            // No data columns present and no compute task was spawned: the block
            // has no blobs.
            None
        };
        if let Some(data_columns) = data_columns_to_persist {
            if !data_columns.is_empty() {
                debug!(
                    self.log, "Writing data_columns to store";
                    "block_root" => %block_root,
                    "count" => data_columns.len(),
                );
                return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
            }
        }
    } else if let Some(blobs) = blobs {
        // Pre-PeerDAS: persist blob sidecars directly.
        if !blobs.is_empty() {
            debug!(
                self.log, "Writing blobs to store";
                "block_root" => %block_root,
                "count" => blobs.len(),
            );
            return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
        }
    }
    Ok(None)
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {