Merge unstable

This commit is contained in:
Eitan Seri-Levi
2025-02-02 00:18:00 +03:00
340 changed files with 10457 additions and 4957 deletions

View File

@@ -34,6 +34,7 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
use crate::kzg_utils::reconstruct_blobs;
use crate::light_client_finality_update_verification::{
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
};
@@ -117,10 +118,11 @@ use std::sync::Arc;
use std::time::Duration;
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
BlobSidecarListFromRoot, DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore,
KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
@@ -575,7 +577,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.start_slot(T::EthSpec::slots_per_epoch());
let is_canonical = self
.block_root_at_slot(block_slot, WhenSlotSkipped::None)?
.map_or(false, |canonical_root| block_root == &canonical_root);
.is_some_and(|canonical_root| block_root == &canonical_root);
Ok(block_slot <= finalized_slot && is_canonical)
}
@@ -606,7 +608,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let slot_is_finalized = state_slot <= finalized_slot;
let canonical = self
.state_root_at_slot(state_slot)?
.map_or(false, |canonical_root| state_root == &canonical_root);
.is_some_and(|canonical_root| state_root == &canonical_root);
Ok(FinalizationAndCanonicity {
slot_is_finalized,
canonical,
@@ -1149,9 +1151,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blobs_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<BlobSidecarList<T::EthSpec>, Error> {
) -> Result<BlobSidecarListFromRoot<T::EthSpec>, Error> {
self.early_attester_cache
.get_blobs(*block_root)
.map(Into::into)
.map_or_else(|| self.get_blobs(block_root), Ok)
}
@@ -1242,10 +1245,59 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// ## Errors
/// May return a database error.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<BlobSidecarList<T::EthSpec>, Error> {
match self.store.get_blobs(block_root)? {
Some(blobs) => Ok(blobs),
None => Ok(BlobSidecarList::default()),
pub fn get_blobs(
&self,
block_root: &Hash256,
) -> Result<BlobSidecarListFromRoot<T::EthSpec>, Error> {
self.store.get_blobs(block_root).map_err(Error::from)
}
/// Returns the data columns at the given root, if any.
///
/// ## Errors
/// May return a database error.
pub fn get_data_columns(
&self,
block_root: &Hash256,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
self.store.get_data_columns(block_root).map_err(Error::from)
}
/// Returns the blobs at the given root, if any.
///
/// Uses the `block.epoch()` to determine whether to retrieve blobs or columns from the store.
///
/// If at least 50% of columns are retrieved, blobs will be reconstructed and returned,
/// otherwise an error `InsufficientColumnsToReconstructBlobs` is returned.
///
/// ## Errors
/// May return a database error.
pub fn get_or_reconstruct_blobs(
&self,
block_root: &Hash256,
) -> Result<Option<BlobSidecarList<T::EthSpec>>, Error> {
let Some(block) = self.store.get_blinded_block(block_root)? else {
return Ok(None);
};
if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
if let Some(columns) = self.store.get_data_columns(block_root)? {
let num_required_columns = self.spec.number_of_columns / 2;
let reconstruction_possible = columns.len() >= num_required_columns as usize;
if reconstruction_possible {
reconstruct_blobs(&self.kzg, &columns, None, &block, &self.spec)
.map(Some)
.map_err(Error::FailedToReconstructBlobs)
} else {
Err(Error::InsufficientColumnsToReconstructBlobs {
columns_found: columns.len(),
})
}
} else {
Ok(None)
}
} else {
self.get_blobs(block_root).map(|b| b.blobs())
}
}
@@ -2161,10 +2213,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|v| {
// This method is called for API and gossip attestations, so this covers all unaggregated attestation events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_single_attestation_subscribers() {
let current_fork = self
.spec
.fork_name_at_slot::<T::EthSpec>(v.attestation().data().slot);
if current_fork.electra_enabled() {
// I don't see a situation where this could return None. The upstream unaggregated attestation checks
// should have already verified that this is an attestation with a single committee bit set.
if let Some(single_attestation) = v.single_attestation() {
event_handler.register(EventKind::SingleAttestation(Box::new(
single_attestation,
)));
}
}
}
if event_handler.has_attestation_subscribers() {
event_handler.register(EventKind::Attestation(Box::new(
v.attestation().clone_as_attestation(),
)));
let current_fork = self
.spec
.fork_name_at_slot::<T::EthSpec>(v.attestation().data().slot);
if !current_fork.electra_enabled() {
event_handler.register(EventKind::Attestation(Box::new(
v.attestation().clone_as_attestation(),
)));
}
}
}
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);
@@ -3214,7 +3286,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
@@ -3342,7 +3414,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};
let r = self
.process_availability(slot, availability, None, || Ok(()))
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
@@ -3470,7 +3542,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), None).await
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
@@ -3602,7 +3674,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3618,7 +3690,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3641,7 +3713,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;
self.process_availability(slot, availability, None, publish_fn)
self.process_availability(slot, availability, publish_fn)
.await
}
@@ -3685,7 +3757,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3694,14 +3766,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_engine_blobs(block_root, blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
self.process_availability(slot, availability, data_column_recv, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3741,7 +3813,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -3753,14 +3825,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block, recv).await
self.import_available_block(block).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3771,7 +3842,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
@@ -3786,6 +3856,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;
// Record the time at which this block's blobs became available.
@@ -3852,7 +3923,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
@@ -4020,44 +4091,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// if block is made available via blobs, dropped the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
let data_columns = match (data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => err_msg,
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(err_msg.to_string())));
}
match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
// No data columns present and compute data columns task was not spawned.
// Could either be no blobs in the block or before PeerDAS activation.
(None, None) => None,
};
Ok(None) => {}
Err(e) => {
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => &e,
"block_root" => ?block_root
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(e)));
}
}
let block = signed_block.message();
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -4069,30 +4128,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));
if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}
if let Some(data_columns) = data_columns {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}
let txn_lock = self.store.hot_db.begin_rw_transaction();
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
@@ -5246,9 +5281,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.start_of(slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);
block_delays.observed.map_or(false, |delay| {
delay >= self.slot_clock.unagg_attestation_production_delay()
})
block_delays
.observed
.is_some_and(|delay| delay >= self.slot_clock.unagg_attestation_production_delay())
}
/// Produce a block for some `slot` upon the given `state`.
@@ -5445,23 +5480,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If required, start the process of loading an execution payload from the EL early. This
// allows it to run concurrently with things like attestation packing.
let prepare_payload_handle = match &state {
BeaconState::Base(_) | BeaconState::Altair(_) => None,
BeaconState::Bellatrix(_)
| BeaconState::Capella(_)
| BeaconState::Deneb(_)
| BeaconState::Electra(_) => {
let prepare_payload_handle = get_execution_payload(
self.clone(),
&state,
parent_root,
proposer_index,
builder_params,
builder_boost_factor,
block_production_version,
)?;
Some(prepare_payload_handle)
}
let prepare_payload_handle = if state.fork_name_unchecked().bellatrix_enabled() {
let prepare_payload_handle = get_execution_payload(
self.clone(),
&state,
parent_root,
proposer_index,
builder_params,
builder_boost_factor,
block_production_version,
)?;
Some(prepare_payload_handle)
} else {
None
};
let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) =
@@ -5879,6 +5910,48 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
execution_payload_value,
)
}
BeaconState::Fulu(_) => {
let (
payload,
kzg_commitments,
maybe_blobs_and_proofs,
maybe_requests,
execution_payload_value,
) = block_contents
.ok_or(BlockProductionError::MissingExecutionPayload)?
.deconstruct();
(
BeaconBlock::Fulu(BeaconBlockFulu {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyFulu {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings_electra.into(),
attestations: attestations_electra.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: payload
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
bls_to_execution_changes: bls_to_execution_changes.into(),
blob_kzg_commitments: kzg_commitments
.ok_or(BlockProductionError::InvalidPayloadFork)?,
execution_requests: maybe_requests
.ok_or(BlockProductionError::MissingExecutionRequests)?,
},
}),
maybe_blobs_and_proofs,
execution_payload_value,
)
}
};
let block = SignedBeaconBlock::from_block(
@@ -5955,6 +6028,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let kzg = self.kzg.as_ref();
// TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing.
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
@@ -7307,6 +7381,68 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}
fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
let custody_columns_available = data_columns
.as_ref()
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);
let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};
if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
}
}
Ok(None)
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {