Improving blob propagation post-PeerDAS with Decentralized Blob Building (#6268)

* Get blobs from EL.

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Avoid cloning blobs after fetching blobs.

* Address review comments and refactor code.

* Fix lint.

* Move blob computation metric to the right spot.

* Merge branch 'unstable' into das-fetch-blobs

* Merge branch 'unstable' into das-fetch-blobs

# Conflicts:
#	beacon_node/beacon_chain/src/beacon_chain.rs
#	beacon_node/beacon_chain/src/block_verification.rs
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

* Merge branch 'unstable' into das-fetch-blobs

# Conflicts:
#	beacon_node/beacon_chain/src/beacon_chain.rs

* Gradual publication of data columns for supernodes.

* Recompute head after importing block with blobs from the EL.

* Fix lint

* Merge branch 'unstable' into das-fetch-blobs

* Use blocking task instead of async when computing cells.

* Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse into das-fetch-blobs

* Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs

* Fix semantic conflicts

* Downgrade error log.

* Merge branch 'unstable' into das-fetch-blobs

# Conflicts:
#	beacon_node/beacon_chain/src/data_availability_checker.rs
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
#	beacon_node/execution_layer/src/engine_api.rs
#	beacon_node/execution_layer/src/engine_api/json_structures.rs
#	beacon_node/network/src/network_beacon_processor/gossip_methods.rs
#	beacon_node/network/src/network_beacon_processor/mod.rs
#	beacon_node/network/src/network_beacon_processor/sync_methods.rs

* Merge branch 'unstable' into das-fetch-blobs

* Publish block without waiting for blob and column proof computation.

* Address review comments and refactor.

* Merge branch 'unstable' into das-fetch-blobs

* Fix test and docs.

* Comment cleanups.

* Merge branch 'unstable' into das-fetch-blobs

* Address review comments and cleanup

* Address review comments and cleanup

* Refactor to de-duplicate gradual publication logic.

* Add more logging.

* Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs

# Conflicts:
#	Cargo.lock

* Fix incorrect comparison on `num_fetched_blobs`.

* Implement gradual blob publication.

* Merge branch 'unstable' into das-fetch-blobs

* Inline `publish_fn`.

* Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse into das-fetch-blobs

* Gossip verify blobs before publishing

* Avoid queries for 0 blobs and error for duplicates

* Gossip verify engine blobs before processing them, and use the observation cache to detect duplicates before publishing.

* Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse into das-fetch-blobs

# Conflicts:
#	beacon_node/network/src/network_beacon_processor/mod.rs

* Merge branch 'unstable' into das-fetch-blobs

* Fix invalid commitment inclusion proofs in blob sidecars created from EL blobs.

* Only publish EL blobs triggered from gossip block, and not RPC block.

* Downgrade gossip blob log to `debug`.

* Merge branch 'unstable' into das-fetch-blobs

* Merge branch 'unstable' into das-fetch-blobs

* Grammar
This commit is contained in:
Jimmy Chen
2024-11-15 10:34:13 +07:00
committed by GitHub
parent 8e95024945
commit 5f053b0b6d
36 changed files with 1660 additions and 613 deletions

View File

@@ -88,7 +88,7 @@ use kzg::Kzg;
use operation_pool::{
CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella,
};
use parking_lot::{Mutex, RwLock};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use proto_array::{DoNotReOrg, ProposerHeadError};
use safe_arith::SafeArith;
use slasher::Slasher;
@@ -120,6 +120,7 @@ use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
@@ -2971,7 +2972,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_root = blob.block_root();
@@ -2990,17 +2990,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlobNotRequired(blob.slot()));
}
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
blob.as_blob(),
)));
}
}
self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
let r = self
.check_gossip_blob_availability_and_import(blob, publish_fn)
.await;
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}
@@ -3078,20 +3070,63 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
}
/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
///
/// `data_column_recv`: An optional receiver for `DataColumnSidecarList`.
/// If PeerDAS is enabled, this receiver will be provided and used to send
/// the `DataColumnSidecar`s once they have been successfully computed.
pub async fn process_engine_blobs(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    blobs: FixedBlobSidecarList<T::EthSpec>,
    data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
    // A block already known to fork choice must have been fully available,
    // so there is nothing left to do for its blobs. The read guard is
    // released as soon as the flag is computed.
    let already_imported = self
        .canonical_head
        .fork_choice_read_lock()
        .contains_block(&block_root);
    if already_imported {
        return Err(BlockError::DuplicateFullyImported(block_root));
    }

    // Notify any SSE subscribers about the blobs we are about to process.
    self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));

    let result = self
        .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
        .await;
    self.remove_notified(&block_root, result)
}
fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
where
I: Iterator<Item = &'a BlobSidecar<T::EthSpec>>,
{
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
let imported_blobs = self
.data_availability_checker
.cached_blob_indexes(block_root)
.unwrap_or_default();
let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index));
for blob in new_blobs {
event_handler.register(EventKind::BlobSidecar(
SseBlobSidecar::from_blob_sidecar(blob),
));
}
}
}
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
}
/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
@@ -3181,7 +3216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
let r = self
.process_availability(slot, availability, || Ok(()))
.process_availability(slot, availability, None, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
@@ -3309,7 +3344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
self.import_available_block(Box::new(block), None).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
@@ -3441,7 +3476,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, || Ok(()))
self.process_availability(slot, availability, None, || Ok(()))
.await
}
@@ -3450,7 +3485,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = blob.slot();
if let Some(slasher) = self.slasher.as_ref() {
@@ -3458,7 +3492,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;
self.process_availability(slot, availability, publish_fn)
self.process_availability(slot, availability, None, || Ok(()))
.await
}
@@ -3477,16 +3511,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;
let availability = self
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;
self.process_availability(slot, availability, publish_fn)
self.process_availability(slot, availability, None, publish_fn)
.await
}
/// Record the signed block headers attached to `blobs` in the slashable
/// observation cache, and forward them to the slasher if one is configured.
///
/// Headers are de-duplicated before verification, and only headers with a
/// valid proposer signature are recorded (an invalid signature is silently
/// skipped, not an error). Returns `Err` only if writing to the slashable
/// cache fails.
fn check_blobs_for_slashability(
self: &Arc<Self>,
block_root: Hash256,
blobs: &FixedBlobSidecarList<T::EthSpec>,
) -> Result<(), BlockError> {
// Take the write lock once and hold it across the loop so all
// observations for this blob list are made atomically.
let mut slashable_cache = self.observed_slashable.write();
for header in blobs
.iter()
.filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
.unique()
{
// Only record headers whose proposer signature verifies, so forged
// headers cannot pollute the cache or the slasher.
if verify_header_signature::<T, BlockError>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
// Hand the verified header to the slasher for proposer-equivocation
// detection.
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
}
}
Ok(())
}
/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
@@ -3495,35 +3554,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
for header in blobs
.iter()
.filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
.unique()
{
if verify_header_signature::<T, BlockError>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
}
}
}
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_rpc_blobs(block_root, epoch, blobs)?;
.put_rpc_blobs(block_root, blobs)?;
self.process_availability(slot, availability, || Ok(()))
self.process_availability(slot, availability, None, || Ok(()))
.await
}
/// Check engine-fetched blobs for slashable block headers, hand them to the
/// data availability checker, and import the block if it is now fully
/// available.
async fn check_engine_blob_availability_and_import(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    blobs: FixedBlobSidecarList<T::EthSpec>,
    data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
    // Record any slashable headers carried by the blobs before accepting them.
    self.check_blobs_for_slashability(block_root, &blobs)?;

    let availability = self
        .data_availability_checker
        .put_engine_blobs(block_root, blobs)?;

    // A no-op `publish_fn` is passed; nothing is published from this path.
    self.process_availability(slot, availability, data_column_recv, || Ok(()))
        .await
}
@@ -3559,13 +3611,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;
self.process_availability(slot, availability, || Ok(()))
self.process_availability(slot, availability, None, || Ok(()))
.await
}
@@ -3577,13 +3627,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block).await
self.import_available_block(block, recv).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3594,6 +3645,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
@@ -3635,6 +3687,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block,
parent_eth1_finalization_data,
consensus_context,
data_column_recv,
)
},
"payload_verification_handle",
@@ -3673,6 +3726,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
@@ -3818,7 +3872,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// state if we returned early without committing. In other words, an error here would
// corrupt the node's database permanently.
// -----------------------------------------------------------------------------------------
self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations(
block,
@@ -3835,15 +3888,53 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
self.import_block_update_slasher(block, &state, &mut consensus_context);
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
// Store the block and its state, and execute the confirmation batch for the intermediate
// states, which will delete their temporary flags.
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// If the block was made available via blobs, drop the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
let data_columns = match (data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => err_msg,
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(err_msg.to_string())));
}
}
// No data columns present and compute data columns task was not spawned.
// Could either be no blobs in the block or before PeerDAS activation.
(None, None) => None,
};
let block = signed_block.message();
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
ops.extend(
confirmed_state_roots
.into_iter()
@@ -3885,33 +3976,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"msg" => "Restoring fork choice from disk",
"error" => ?e,
);
// Clear the early attester cache to prevent attestations which we would later be unable
// to verify due to the failure.
self.early_attester_cache.clear();
// Since the write failed, try to revert the canonical head back to what was stored
// in the database. This attempts to prevent inconsistency between the database and
// fork choice.
if let Err(e) = self.canonical_head.restore_from_store(
fork_choice,
ResetPayloadStatuses::always_reset_conditionally(
self.config.always_reset_payload_statuses,
),
&self.store,
&self.spec,
&self.log,
) {
crit!(
self.log,
"No stored fork choice found to restore from";
"error" => ?e,
"warning" => "The database is likely corrupt now, consider --purge-db"
);
return Err(BlockError::BeaconChainError(e));
}
return Err(e.into());
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(e.into()));
}
drop(txn_lock);
@@ -3979,6 +4047,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(block_root)
}
/// Attempt to recover a consistent node state after a failed database write
/// during block import: clear the early attester cache and restore the
/// canonical head (and fork choice) from the on-disk copy.
///
/// Returns `Ok(())` if fork choice was successfully restored from the store,
/// or `Err(BlockError::BeaconChainError)` if no stored fork choice could be
/// loaded — in which case the database is likely corrupt.
fn handle_import_block_db_write_error(
&self,
// We don't actually need this value, however it's always present when we call this function
// and it needs to be dropped to prevent a dead-lock. Requiring it to be passed here is
// defensive programming.
fork_choice_write_lock: RwLockWriteGuard<BeaconForkChoice<T>>,
) -> Result<(), BlockError> {
// Clear the early attester cache to prevent attestations which we would later be unable
// to verify due to the failure.
self.early_attester_cache.clear();
// Since the write failed, try to revert the canonical head back to what was stored
// in the database. This attempts to prevent inconsistency between the database and
// fork choice.
// `restore_from_store` takes the guard by value, so the lock is released by
// the time this function returns.
if let Err(e) = self.canonical_head.restore_from_store(
fork_choice_write_lock,
ResetPayloadStatuses::always_reset_conditionally(
self.config.always_reset_payload_statuses,
),
&self.store,
&self.spec,
&self.log,
) {
crit!(
self.log,
"No stored fork choice found to restore from";
"error" => ?e,
"warning" => "The database is likely corrupt now, consider --purge-db"
);
Err(BlockError::BeaconChainError(e))
} else {
Ok(())
}
}
/// Check block's consistency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint(
&self,