Merge branch 'unstable' of https://github.com/sigp/lighthouse into gloas-payload-processing

This commit is contained in:
Eitan Seri-Levi
2026-02-24 10:47:12 -08:00
20 changed files with 353 additions and 497 deletions

View File

@@ -3396,11 +3396,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
self.data_availability_checker.put_pre_execution_block(
block_root,
unverified_block.block_cloned(),
block_source,
)?;
// Gloas blocks don't need to be inserted into the DA cache:
// they are always available.
if !unverified_block
.block()
.fork_name_unchecked()
.gloas_enabled()
{
self.data_availability_checker.put_pre_execution_block(
block_root,
unverified_block.block_cloned(),
block_source,
)?;
}
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

View File

@@ -51,7 +51,9 @@
use crate::beacon_snapshot::PreProcessingSnapshot;
use crate::blob_verification::GossipBlobError;
use crate::block_verification_types::{AsBlock, BlockImportData, RpcBlock};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_availability_checker::{
AvailabilityCheckError, AvailableBlock, AvailableBlockData, MaybeAvailableBlock,
};
use crate::data_column_verification::GossipDataColumnError;
use crate::execution_payload::{
AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier,
@@ -334,6 +336,15 @@ pub enum BlockError {
max_blobs_at_epoch: usize,
block: usize,
},
/// The bid's parent_block_root does not match the block's parent_root.
///
/// ## Peer scoring
///
/// The block is invalid and the peer should be penalized.
BidParentRootMismatch {
bid_parent_root: Hash256,
block_parent_root: Hash256,
},
}
/// Which specific signature(s) are invalid in a SignedBeaconBlock
@@ -888,15 +899,15 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Do not gossip blocks that claim to contain more blobs than the max allowed
// at the given block epoch.
if let Ok(commitments) = block.message().body().blob_kzg_commitments() {
if let Some(blob_kzg_commitments_len) = block.message().blob_kzg_commitments_len() {
let max_blobs_at_epoch = chain
.spec
.max_blobs_per_block(block.slot().epoch(T::EthSpec::slots_per_epoch()))
as usize;
if commitments.len() > max_blobs_at_epoch {
if blob_kzg_commitments_len > max_blobs_at_epoch {
return Err(BlockError::InvalidBlobCount {
max_blobs_at_epoch,
block: commitments.len(),
block: blob_kzg_commitments_len,
});
}
}
@@ -933,6 +944,24 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let (parent_block, block) =
verify_parent_block_is_known::<T>(&fork_choice_read_lock, block)?;
// [New in Gloas]: Verify bid.parent_block_root matches block.parent_root.
if let Ok(bid) = block.message().body().signed_execution_payload_bid()
&& bid.message.parent_block_root != block.message().parent_root()
{
return Err(BlockError::BidParentRootMismatch {
bid_parent_root: bid.message.parent_block_root,
block_parent_root: block.message().parent_root(),
});
}
// TODO(gloas) The following validation can only be completed once fork choice has been implemented:
// The block's parent execution payload (defined by bid.parent_block_hash) has been seen
// (via gossip or non-gossip sources) (a client MAY queue blocks for processing
// once the parent payload is retrieved). If execution_payload verification of block's execution
// payload parent by an execution node is complete, verify the block's execution payload
// parent (defined by bid.parent_block_hash) passes all validation.
drop(fork_choice_read_lock);
// Track the number of skip slots between the block and its parent.
@@ -1039,8 +1068,15 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
});
}
// Validate the block's execution_payload (if any).
validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?;
// [New in Gloas]: Skip payload validation checks. The payload now arrives separately
// via `ExecutionPayloadEnvelope`.
if !chain
.spec
.fork_name_at_slot::<T::EthSpec>(block.slot())
.gloas_enabled()
{
validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?;
}
// Beacon API block_gossip events
if let Some(event_handler) = chain.event_handler.as_ref()
@@ -1212,15 +1248,35 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify());
match result {
Ok(_) => Ok(Self {
block: MaybeAvailableBlock::AvailabilityPending {
Ok(_) => {
// gloas blocks are always available.
let maybe_available = if chain
.spec
.fork_name_at_slot::<T::EthSpec>(block.slot())
.gloas_enabled()
{
MaybeAvailableBlock::Available(
AvailableBlock::new(
block,
AvailableBlockData::NoData,
&chain.data_availability_checker,
chain.spec.clone(),
)
.map_err(BlockError::AvailabilityCheck)?,
)
} else {
MaybeAvailableBlock::AvailabilityPending {
block_root: from.block_root,
block,
}
};
Ok(Self {
block: maybe_available,
block_root: from.block_root,
block,
},
block_root: from.block_root,
parent: Some(parent),
consensus_context,
}),
parent: Some(parent),
consensus_context,
})
}
Err(_) => Err(BlockError::InvalidSignature(
InvalidSignature::BlockBodySignatures,
)),

View File

@@ -7,7 +7,6 @@ use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::custody_context::NodeCustodyType;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas};
use crate::light_client_server_cache::LightClientServerCache;
@@ -778,49 +777,17 @@ where
.get_head(current_slot, &self.spec)
.map_err(|e| format!("Unable to get fork choice head: {:?}", e))?;
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
error!(
message = "This node has likely missed a hard fork. \
It will try to revert the invalid blocks and keep running, \
but any stray blocks and states will not be deleted. \
Long-term you should consider re-syncing this node.",
"Error decoding head block"
);
let (block_root, block) = revert_to_fork_boundary(
current_slot,
initial_head_block_root,
store.clone(),
&self.spec,
)?;
(block_root, block, true)
}
Err(e) => return Err(descriptive_db_error("head block", &e)),
};
let head_block_root = initial_head_block_root;
let head_block = store
.get_full_block(&initial_head_block_root)
.map_err(|e| descriptive_db_error("head block", &e))?
.ok_or("Head block not found in store")?;
let (_head_state_root, head_state) = store
.get_advanced_hot_state(head_block_root, current_slot, head_block.state_root())
.map_err(|e| descriptive_db_error("head state", &e))?
.ok_or("Head state not found in store")?;
// If the head reverted then we need to reset fork choice using the new head's finalized
// checkpoint.
if head_reverted {
fork_choice = reset_fork_choice_to_finalization(
head_block_root,
&head_state,
store.clone(),
Some(current_slot),
&self.spec,
)?;
}
let head_shuffling_ids = BlockShufflingIds::try_from_head(head_block_root, &head_state)?;
let mut head_snapshot = BeaconSnapshot {

View File

@@ -62,7 +62,10 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
state: &BeaconState<T::EthSpec>,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<Self, BlockError> {
let payload_verification_status = if is_execution_enabled(state, block.message().body()) {
let payload_verification_status = if block.fork_name_unchecked().gloas_enabled() {
// Gloas blocks don't contain an execution payload.
Some(PayloadVerificationStatus::Irrelevant)
} else if is_execution_enabled(state, block.message().body()) {
// Perform the initial stages of payload verification.
//
// We will duplicate these checks again during `per_block_processing`, however these
@@ -304,6 +307,12 @@ pub fn validate_execution_payload_for_gossip<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), BlockError> {
// Gloas blocks don't have an execution payload in the block body.
// Bid-related validations are handled in gossip block verification.
if block.fork_name_unchecked().gloas_enabled() {
return Ok(());
}
// Only apply this validation if this is a Bellatrix beacon block.
if let Ok(execution_payload) = block.body().execution_payload() {
// This logic should match `is_execution_enabled`. We use only the execution block hash of

View File

@@ -1,204 +0,0 @@
use crate::{BeaconForkChoiceStore, BeaconSnapshot};
use fork_choice::{ForkChoice, PayloadVerificationStatus};
use itertools::process_results;
use state_processing::state_advance::complete_state_advance;
use state_processing::{
ConsensusContext, VerifyBlockRoot, per_block_processing,
per_block_processing::BlockSignatureStrategy,
};
use std::sync::Arc;
use std::time::Duration;
use store::{HotColdDB, ItemStore, iter::ParentRootBlockIterator};
use tracing::{info, warn};
use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot};
const CORRUPT_DB_MESSAGE: &str = "The database could be corrupt. Check its file permissions or \
consider deleting it by running with the --purge-db flag.";
/// Revert the head to the last block before the most recent hard fork.
///
/// This is a destructive last-resort operation: the reverted blocks and states are simply
/// abandoned in place — they remain in the database forever but are no longer reachable from
/// the head.
///
/// Returns the `(head_block_root, head_block)` that should be used post-reversion.
pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    current_slot: Slot,
    head_block_root: Hash256,
    store: Arc<HotColdDB<E, Hot, Cold>>,
    spec: &ChainSpec,
) -> Result<(Hash256, SignedBeaconBlock<E>), String> {
    let current_fork = spec.fork_name_at_slot::<E>(current_slot);
    let fork_epoch = spec
        .fork_epoch(current_fork)
        .ok_or_else(|| format!("Current fork '{}' never activates", current_fork))?;
    // There is nothing before phase0 to revert to.
    if current_fork == ForkName::Base {
        return Err(format!(
            "Cannot revert to before phase0 hard fork. {}",
            CORRUPT_DB_MESSAGE
        ));
    }
    warn!(
        target_fork = %current_fork,
        %fork_epoch,
        "Reverting invalid head block"
    );
    // Slot at which the current fork activates; every reverted block is at or after it.
    let fork_start_slot = fork_epoch.start_slot(E::slots_per_epoch());
    // Walk the ancestor chain from the head (tolerating wrong-fork decode failures) until a
    // block from before the fork boundary is found. Each skipped block is logged as reverted.
    let mut pre_fork_block = None;
    for item in ParentRootBlockIterator::fork_tolerant(&store, head_block_root) {
        let (block_root, block) = item.map_err(|e| {
            format!(
                "Error fetching blocks to revert: {:?}. {}",
                e, CORRUPT_DB_MESSAGE
            )
        })?;
        if block.slot() < fork_start_slot {
            pre_fork_block = Some((block_root, block));
            break;
        }
        info!(
            ?block_root,
            slot = %block.slot(),
            "Reverting block"
        );
    }
    let (block_root, blinded_block) =
        pre_fork_block.ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE))?;
    // Re-attach the execution payload so the caller receives a full (non-blinded) block.
    let block = store
        .make_full_block(&block_root, blinded_block)
        .map_err(|e| format!("Unable to add payload to new head block: {:?}", e))?;
    Ok((block_root, block))
}
/// Reset fork choice to the finalized checkpoint of the supplied head state.
///
/// The supplied `head_block_root` should correspond to the most recently applied block on
/// `head_state`.
///
/// This function avoids quirks of fork choice initialization by replaying all of the blocks from
/// the checkpoint to the head.
///
/// See this issue for details: https://github.com/ethereum/consensus-specs/issues/2566
///
/// It will fail if the finalized state or any of the blocks to replay are unavailable.
///
/// WARNING: this function is destructive and causes fork choice to permanently forget all
/// chains other than the chain leading to `head_block_root`. It should only be used in extreme
/// circumstances when there is no better alternative.
pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
head_block_root: Hash256,
head_state: &BeaconState<E>,
store: Arc<HotColdDB<E, Hot, Cold>>,
current_slot: Option<Slot>,
spec: &ChainSpec,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
"Finalized block missing for revert: {:?}",
finalized_block_root
)
})?;
// Advance finalized state to finalized epoch (to handle skipped slots).
let finalized_state_root = finalized_block.state_root();
// The enshrined finalized state should be in the state cache.
let mut finalized_state = store
.get_state(&finalized_state_root, Some(finalized_block.slot()), true)
.map_err(|e| format!("Error loading finalized state: {:?}", e))?
.ok_or_else(|| {
format!(
"Finalized block state missing from database: {:?}",
finalized_state_root
)
})?;
// The checkpoint epoch's first slot may be later than the finalized block's slot when the
// epoch boundary slots were skipped; advance through the empty slots to reach it.
let finalized_slot = finalized_checkpoint.epoch.start_slot(E::slots_per_epoch());
complete_state_advance(
&mut finalized_state,
Some(finalized_state_root),
finalized_slot,
spec,
)
.map_err(|e| {
format!(
"Error advancing finalized state to finalized epoch: {:?}",
e
)
})?;
// Package the finalized block/state as the anchor snapshot for a fresh fork choice.
let finalized_snapshot = BeaconSnapshot {
beacon_block_root: finalized_block_root,
beacon_block: Arc::new(finalized_block),
beacon_state: finalized_state,
};
// Re-initialize fork choice from the finalized anchor, discarding all prior fork choice
// state (this is the destructive step described in the doc comment).
let fc_store =
BeaconForkChoiceStore::get_forkchoice_store(store.clone(), finalized_snapshot.clone())
.map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?;
let mut fork_choice = ForkChoice::from_anchor(
fc_store,
finalized_block_root,
&finalized_snapshot.beacon_block,
&finalized_snapshot.beacon_state,
current_slot,
spec,
)
.map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?;
// Replay blocks from finalized checkpoint back to head.
// We do not replay attestations presently, relying on the absence of other blocks
// to guarantee `head_block_root` as the head.
let blocks = store
.load_blocks_to_replay(finalized_slot + 1, head_state.slot(), head_block_root)
.map_err(|e| format!("Error loading blocks to replay for fork choice: {:?}", e))?;
let mut state = finalized_snapshot.beacon_state;
for block in blocks {
// Advance through any skip slots so `state` is at the block's slot before applying it.
complete_state_advance(&mut state, None, block.slot(), spec)
.map_err(|e| format!("State advance failed: {:?}", e))?;
let mut ctxt = ConsensusContext::new(block.slot())
.set_proposer_index(block.message().proposer_index());
// Signatures were already verified when these blocks were first imported, so skip them.
per_block_processing(
&mut state,
&block,
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
&mut ctxt,
spec,
)
.map_err(|e| format!("Error replaying block: {:?}", e))?;
// Setting this to unverified is the safest solution, since we don't have a way to
// retro-actively determine if they were valid or not.
//
// This scenario is so rare that it seems OK to double-verify some blocks.
let payload_verification_status = PayloadVerificationStatus::Optimistic;
fork_choice
.on_block(
block.slot(),
block.message(),
block.canonical_root(),
// Reward proposer boost. We are reinforcing the canonical chain.
Duration::from_secs(0),
&state,
payload_verification_status,
spec,
)
.map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?;
}
Ok(fork_choice)
}

View File

@@ -27,7 +27,6 @@ pub mod events;
pub mod execution_payload;
pub mod fetch_blobs;
pub mod fork_choice_signal;
pub mod fork_revert;
pub mod graffiti_calculator;
pub mod historical_blocks;
pub mod historical_data_columns;

View File

@@ -3924,188 +3924,6 @@ async fn finalizes_after_resuming_from_db() {
);
}
// Build two chains that share a common history, fork them at a hard-fork epoch, then resume
// the minority chain's database with the fork activated and check that the invalid post-fork
// blocks are reverted and fork choice is rebuilt cleanly.
#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn revert_minority_fork_on_resume() {
let validator_count = 16;
let slots_per_epoch = MinimalEthSpec::slots_per_epoch();
let fork_epoch = Epoch::new(4);
let fork_slot = fork_epoch.start_slot(slots_per_epoch);
let initial_blocks = slots_per_epoch * fork_epoch.as_u64() - 1;
let post_fork_blocks = slots_per_epoch * 3;
// spec1: fork never activates (minority). spec2: fork activates at `fork_epoch` (majority).
let mut spec1 = MinimalEthSpec::default_spec();
spec1.altair_fork_epoch = None;
let mut spec2 = MinimalEthSpec::default_spec();
spec2.altair_fork_epoch = Some(fork_epoch);
let all_validators = (0..validator_count).collect::<Vec<usize>>();
// Chain with no fork epoch configured.
let db_path1 = tempdir().unwrap();
let store1 = get_store_generic(&db_path1, StoreConfig::default(), spec1.clone());
let harness1 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec1.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store1)
.mock_execution_layer()
.build();
// Chain with fork epoch configured.
let db_path2 = tempdir().unwrap();
let store2 = get_store_generic(&db_path2, StoreConfig::default(), spec2.clone());
let harness2 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec2.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store2)
.mock_execution_layer()
.build();
// Apply the same blocks to both chains initially.
let mut state = harness1.get_current_state();
let mut block_root = harness1.chain.genesis_block_root;
for slot in (1..=initial_blocks).map(Slot::new) {
let state_root = state.update_tree_hash_cache().unwrap();
let attestations = harness1.make_attestations(
&all_validators,
&state,
state_root,
block_root.into(),
slot,
);
harness1.set_current_slot(slot);
harness2.set_current_slot(slot);
harness1.process_attestations(attestations.clone(), &state);
harness2.process_attestations(attestations, &state);
let ((block, blobs), new_state) = harness1.make_block(state, slot).await;
harness1
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
harness2
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
state = new_state;
block_root = block.canonical_root();
}
assert_eq!(harness1.head_slot(), fork_slot - 1);
assert_eq!(harness2.head_slot(), fork_slot - 1);
// Fork the two chains.
let mut state1 = state.clone();
let mut state2 = state.clone();
let mut majority_blocks = vec![];
for i in 0..post_fork_blocks {
let slot = fork_slot + i;
// Attestations on majority chain.
// NOTE(review): `state_root` is taken from the pre-fork `state`, not `state2`, so after
// the first iteration it is stale relative to `&state2` — confirm this is intentional.
let state_root = state.update_tree_hash_cache().unwrap();
let attestations = harness2.make_attestations(
&all_validators,
&state2,
state_root,
block_root.into(),
slot,
);
harness2.set_current_slot(slot);
harness2.process_attestations(attestations, &state2);
// Minority chain block (no attesters).
let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await;
harness1
.process_block(slot, block1.canonical_root(), (block1, blobs1))
.await
.unwrap();
state1 = new_state1;
// Majority chain block (all attesters).
let ((block2, blobs2), new_state2) = harness2.make_block(state2, slot).await;
harness2
.process_block(slot, block2.canonical_root(), (block2.clone(), blobs2))
.await
.unwrap();
state2 = new_state2;
block_root = block2.canonical_root();
majority_blocks.push(block2);
}
let end_slot = fork_slot + post_fork_blocks - 1;
assert_eq!(harness1.head_slot(), end_slot);
assert_eq!(harness2.head_slot(), end_slot);
// Resume from disk with the hard-fork activated: this should revert the post-fork blocks.
// We have to do some hackery with the `slot_clock` so that the correct slot is set when
// the beacon chain builder loads the head block.
drop(harness1);
let resume_store = get_store_generic(&db_path1, StoreConfig::default(), spec2.clone());
let resumed_harness = TestHarness::builder(MinimalEthSpec)
.spec(spec2.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.resumed_disk_store(resume_store)
.override_store_mutator(Box::new(move |mut builder| {
builder = builder
.resume_from_db()
.unwrap()
.testing_slot_clock(spec2.get_slot_duration())
.unwrap();
builder
.get_slot_clock()
.unwrap()
.set_slot(end_slot.as_u64());
builder
}))
.mock_execution_layer()
.build();
// Head should now be just before the fork.
resumed_harness.chain.recompute_head_at_current_slot().await;
assert_eq!(resumed_harness.head_slot(), fork_slot - 1);
// Fork choice should only know the canonical head. When we reverted the head we also should
// have called `reset_fork_choice_to_finalization` which rebuilds fork choice from scratch
// without the reverted block.
assert_eq!(
resumed_harness.chain.heads(),
vec![(resumed_harness.head_block_root(), fork_slot - 1)]
);
// Apply blocks from the majority chain and trigger finalization.
let initial_split_slot = resumed_harness.chain.store.get_split_slot();
for block in &majority_blocks {
resumed_harness
.process_block_result((block.clone(), None))
.await
.unwrap();
// The canonical head should be the block from the majority chain.
resumed_harness.chain.recompute_head_at_current_slot().await;
assert_eq!(resumed_harness.head_slot(), block.slot());
assert_eq!(resumed_harness.head_block_root(), block.canonical_root());
}
let advanced_split_slot = resumed_harness.chain.store.get_split_slot();
// Check that the migration ran successfully.
assert!(advanced_split_slot > initial_split_slot);
// Check that there is only a single head now matching harness2 (the minority chain is gone).
let heads = resumed_harness.chain.heads();
assert_eq!(heads, harness2.chain.heads());
assert_eq!(heads.len(), 1);
}
// This test checks whether the schema downgrade from the latest version to some minimum supported
// version is correct. This is the easiest schema test to write without historic versions of
// Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually

View File

@@ -674,7 +674,7 @@ impl<E: EthSpec> Discovery<E> {
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option<Instant>, retries: usize) {
// remove the entry and complete the query if greater than the maximum search count
if retries > MAX_DISCOVERY_RETRY {
if retries >= MAX_DISCOVERY_RETRY {
debug!("Subnet peer discovery did not find sufficient peers. Reached max retry limit");
return;
}

View File

@@ -1359,7 +1359,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
| Err(e @ BlockError::ParentExecutionPayloadInvalid { .. })
| Err(e @ BlockError::KnownInvalidExecutionPayload(_))
| Err(e @ BlockError::GenesisBlock)
| Err(e @ BlockError::InvalidBlobCount { .. }) => {
| Err(e @ BlockError::InvalidBlobCount { .. })
| Err(e @ BlockError::BidParentRootMismatch { .. }) => {
warn!(error = %e, "Could not verify block for gossip. Rejecting the block");
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject);
self.gossip_penalize_peer(
@@ -1493,19 +1494,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Block is gossip valid. Attempt to fetch blobs from the EL using versioned hashes derived
// from kzg commitments, without having to wait for all blobs to be sent from the peers.
let publish_blobs = true;
let self_clone = self.clone();
let block_clone = block.clone();
let current_span = Span::current();
self.executor.spawn(
async move {
self_clone
.fetch_engine_blobs_and_publish(block_clone, block_root, publish_blobs)
.await
}
.instrument(current_span),
"fetch_blobs_gossip",
);
// TODO(gloas) we'll want to use this same optimization, but we need to refactor the
// `fetch_and_process_engine_blobs` flow to support gloas.
if !block.fork_name_unchecked().gloas_enabled() {
let publish_blobs = true;
let self_clone = self.clone();
let block_clone = block.clone();
let current_span = Span::current();
self.executor.spawn(
async move {
self_clone
.fetch_engine_blobs_and_publish(block_clone, block_root, publish_blobs)
.await
}
.instrument(current_span),
"fetch_blobs_gossip",
);
}
let result = self
.chain

View File

@@ -1071,7 +1071,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.iter()
.filter(|&(_epoch, batch)| in_buffer(batch))
.count()
> BACKFILL_BATCH_BUFFER_SIZE as usize
>= BACKFILL_BATCH_BUFFER_SIZE as usize
{
return None;
}

View File

@@ -213,6 +213,9 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
/// After different operations over a batch, this could be in a state that allows it to
/// continue, or in failed state. When the batch has failed, we check if it did mainly due to
/// processing failures. In this case the batch is considered failed and faulty.
///
/// When failure counts are equal, `blacklist` is `false` — we assume network issues over
/// peer fault when the evidence is ambiguous.
pub fn outcome(&self) -> BatchOperationOutcome {
match self.state {
BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -255,8 +258,10 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
/// Mark the batch as failed and return whether we can attempt a re-download.
///
/// This can happen if a peer disconnects or some error occurred that was not the peers fault.
/// The `peer` parameter, when set to None, does not increment the failed attempts of
/// this batch and register the peer, rather attempts a re-download.
/// The `peer` parameter, when set to `None`, still counts toward
/// `max_batch_download_attempts` (to prevent infinite retries on persistent failures)
/// but does not register a peer in `failed_peers()`. Use
/// [`Self::downloading_to_awaiting_download`] to retry without counting a failed attempt.
#[must_use = "Batch may have failed"]
pub fn download_failed(
&mut self,
@@ -272,7 +277,6 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
{
BatchState::Failed
} else {
// drop the blocks
BatchState::AwaitingDownload
};
Ok(self.outcome())
@@ -524,3 +528,196 @@ impl<D: Hash> BatchState<D> {
}
}
}
// Unit tests for the `BatchInfo` state machine: lifecycle transitions, and how download,
// processing, and validation failures count toward their respective attempt limits.
#[cfg(test)]
mod tests {
use super::*;
use crate::sync::range_sync::RangeSyncBatchConfig;
use types::MinimalEthSpec;
type Cfg = RangeSyncBatchConfig<MinimalEthSpec>;
type TestBatch = BatchInfo<MinimalEthSpec, Cfg, Vec<u64>>;
// Configured maximum number of download attempts before the batch fails.
fn max_dl() -> u8 {
Cfg::max_batch_download_attempts()
}
// Configured maximum number of processing attempts before the batch fails.
fn max_proc() -> u8 {
Cfg::max_batch_processing_attempts()
}
// Fresh batch in the initial `AwaitingDownload` state.
fn new_batch() -> TestBatch {
BatchInfo::new(&Epoch::new(0), 1, ByRangeRequestType::Blocks)
}
fn peer() -> PeerId {
PeerId::random()
}
// Drive a batch: AwaitingDownload -> Downloading -> AwaitingProcessing -> Processing.
fn advance_to_processing(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) {
batch.start_downloading(req_id).unwrap();
batch.download_completed(vec![1, 2, 3], peer_id).unwrap();
batch.start_processing().unwrap();
}
// Drive a batch all the way to `AwaitingValidation` via a successful processing round.
fn advance_to_awaiting_validation(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) {
advance_to_processing(batch, req_id, peer_id);
batch
.processing_completed(BatchProcessingResult::Success)
.unwrap();
}
// A batch with no failures walks the full state machine and ends awaiting validation.
#[test]
fn happy_path_lifecycle() {
let mut batch = new_batch();
let p = peer();
assert!(matches!(batch.state(), BatchState::AwaitingDownload));
batch.start_downloading(1).unwrap();
assert!(matches!(batch.state(), BatchState::Downloading(1)));
batch.download_completed(vec![10, 20], p).unwrap();
assert!(matches!(batch.state(), BatchState::AwaitingProcessing(..)));
let (data, _duration) = batch.start_processing().unwrap();
assert_eq!(data, vec![10, 20]);
assert!(matches!(batch.state(), BatchState::Processing(..)));
let outcome = batch
.processing_completed(BatchProcessingResult::Success)
.unwrap();
assert!(matches!(outcome, BatchOperationOutcome::Continue));
assert!(matches!(batch.state(), BatchState::AwaitingValidation(..)));
}
// The batch survives `max_dl() - 1` download failures, then fails (without blacklisting,
// since download failures point to network issues rather than peer fault).
#[test]
fn download_failures_count_toward_limit() {
let mut batch = new_batch();
for i in 1..max_dl() as Id {
batch.start_downloading(i).unwrap();
let outcome = batch.download_failed(Some(peer())).unwrap();
assert!(matches!(outcome, BatchOperationOutcome::Continue));
}
// Next failure hits the limit
batch.start_downloading(max_dl() as Id).unwrap();
let outcome = batch.download_failed(Some(peer())).unwrap();
assert!(matches!(
outcome,
BatchOperationOutcome::Failed { blacklist: false }
));
}
#[test]
fn download_failed_none_counts_but_does_not_blame_peer() {
let mut batch = new_batch();
// None still counts toward the limit (prevents infinite retry on persistent
// network failures), but doesn't register a peer in failed_peers().
for i in 0..max_dl() as Id {
batch.start_downloading(i).unwrap();
batch.download_failed(None).unwrap();
}
assert!(matches!(batch.state(), BatchState::Failed));
assert!(batch.failed_peers().is_empty());
}
// Faulty processing failures exhaust the processing limit and blacklist the batch's peers.
#[test]
fn faulty_processing_failures_count_toward_limit() {
let mut batch = new_batch();
for i in 1..max_proc() as Id {
advance_to_processing(&mut batch, i, peer());
let outcome = batch
.processing_completed(BatchProcessingResult::FaultyFailure)
.unwrap();
assert!(matches!(outcome, BatchOperationOutcome::Continue));
}
// Next faulty failure: limit reached
advance_to_processing(&mut batch, max_proc() as Id, peer());
let outcome = batch
.processing_completed(BatchProcessingResult::FaultyFailure)
.unwrap();
assert!(matches!(
outcome,
BatchOperationOutcome::Failed { blacklist: true }
));
}
#[test]
fn non_faulty_processing_failures_never_exhaust_batch() {
let mut batch = new_batch();
// Well past both limits — non-faulty failures should never cause failure
let iterations = (max_dl() + max_proc()) as Id * 2;
for i in 0..iterations {
advance_to_processing(&mut batch, i, peer());
let outcome = batch
.processing_completed(BatchProcessingResult::NonFaultyFailure)
.unwrap();
assert!(matches!(outcome, BatchOperationOutcome::Continue));
}
// Non-faulty failures also don't register peers as failed
assert!(batch.failed_peers().is_empty());
}
// Validation failures share the processing-attempt budget and blacklist on exhaustion.
#[test]
fn validation_failures_count_toward_processing_limit() {
let mut batch = new_batch();
for i in 1..max_proc() as Id {
advance_to_awaiting_validation(&mut batch, i, peer());
let outcome = batch.validation_failed().unwrap();
assert!(matches!(outcome, BatchOperationOutcome::Continue));
}
advance_to_awaiting_validation(&mut batch, max_proc() as Id, peer());
let outcome = batch.validation_failed().unwrap();
assert!(matches!(
outcome,
BatchOperationOutcome::Failed { blacklist: true }
));
}
// Combining failure kinds: when download failures outnumber processing failures at the
// point of exhaustion, the batch fails without blacklisting.
#[test]
fn mixed_failure_types_interact_correctly() {
let mut batch = new_batch();
let mut req_id: Id = 0;
let mut next_id = || {
req_id += 1;
req_id
};
// One download failure
batch.start_downloading(next_id()).unwrap();
batch.download_failed(Some(peer())).unwrap();
// One faulty processing failure (requires a successful download first)
advance_to_processing(&mut batch, next_id(), peer());
batch
.processing_completed(BatchProcessingResult::FaultyFailure)
.unwrap();
// One non-faulty processing failure
advance_to_processing(&mut batch, next_id(), peer());
batch
.processing_completed(BatchProcessingResult::NonFaultyFailure)
.unwrap();
assert!(matches!(batch.state(), BatchState::AwaitingDownload));
// Fill remaining download failures to hit the limit
for _ in 1..max_dl() {
batch.start_downloading(next_id()).unwrap();
batch.download_failed(Some(peer())).unwrap();
}
// Download failures > processing failures → blacklist: false
assert!(matches!(
batch.outcome(),
BatchOperationOutcome::Failed { blacklist: false }
));
}
}

View File

@@ -398,7 +398,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Lookups contain untrusted data; bound the total count of lookups held in memory to reduce
// the risk of OOM in case of bugs or malicious activity.
if self.single_block_lookups.len() > MAX_LOOKUPS {
if self.single_block_lookups.len() >= MAX_LOOKUPS {
warn!(?block_root, "Dropping lookup reached max");
return false;
}

View File

@@ -422,7 +422,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
.iter()
.filter(|&(_epoch, batch)| in_buffer(batch))
.count()
> BACKFILL_BATCH_BUFFER_SIZE as usize
>= BACKFILL_BATCH_BUFFER_SIZE as usize
{
return None;
}

View File

@@ -239,7 +239,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
if let Some(wait_duration) = request.is_awaiting_download() {
// Note: an empty response is considered a successful response, so we may end up
// retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`.
if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
if request.download_failures >= MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
return Err(Error::TooManyFailures);
}

View File

@@ -1277,7 +1277,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.iter()
.filter(|&(_epoch, batch)| in_buffer(batch))
.count()
> BATCH_BUFFER_SIZE as usize
>= BATCH_BUFFER_SIZE as usize
{
return None;
}

View File

@@ -5,6 +5,8 @@ mod chain_collection;
mod range;
mod sync_type;
#[cfg(test)]
pub use chain::RangeSyncBatchConfig;
pub use chain::{ChainId, EPOCHS_PER_BATCH};
#[cfg(test)]
pub use chain_collection::SyncChainStatus;

View File

@@ -721,14 +721,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
}
/// Fetch a block from the store, ignoring which fork variant it *should* be for.
pub fn get_block_any_variant<Payload: AbstractExecPayload<E>>(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
self.get_block_with(block_root, SignedBeaconBlock::any_from_ssz_bytes)
}
/// Fetch a block from the store using a custom decode function.
///
/// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it

View File

@@ -249,7 +249,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
store: &'a HotColdDB<E, Hot, Cold>,
next_block_root: Hash256,
decode_any_variant: bool,
_phantom: PhantomData<E>,
}
@@ -260,17 +259,6 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
Self {
store,
next_block_root: start_block_root,
decode_any_variant: false,
_phantom: PhantomData,
}
}
/// Block iterator that is tolerant of blocks that have the wrong fork for their slot.
pub fn fork_tolerant(store: &'a HotColdDB<E, Hot, Cold>, start_block_root: Hash256) -> Self {
Self {
store,
next_block_root: start_block_root,
decode_any_variant: true,
_phantom: PhantomData,
}
}
@@ -285,12 +273,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
Ok(None)
} else {
let block_root = self.next_block_root;
let block = if self.decode_any_variant {
self.store.get_block_any_variant(&block_root)
} else {
self.store.get_blinded_block(&block_root)
}?
.ok_or(Error::BlockNotFound(block_root))?;
let block = self
.store
.get_blinded_block(&block_root)?
.ok_or(Error::BlockNotFound(block_root))?;
self.next_block_root = block.message().parent_root();
Ok(Some((block_root, block)))
}

View File

@@ -309,6 +309,26 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> BeaconBlockRef<'a, E, Payl
pub fn execution_payload(&self) -> Result<Payload::Ref<'a>, BeaconStateError> {
self.body().execution_payload()
}
/// Returns the number of blob KZG commitments carried by this block, or
/// `None` for forks that predate blobs (pre-Deneb).
///
/// For Gloas the commitments live inside the signed execution payload bid
/// rather than directly on the block body.
pub fn blob_kzg_commitments_len(&self) -> Option<usize> {
    match self {
        // Blobs were introduced in Deneb; earlier forks carry no commitments.
        BeaconBlockRef::Base(_)
        | BeaconBlockRef::Altair(_)
        | BeaconBlockRef::Bellatrix(_)
        | BeaconBlockRef::Capella(_) => None,
        BeaconBlockRef::Deneb(block) => Some(block.body.blob_kzg_commitments.len()),
        BeaconBlockRef::Electra(block) => Some(block.body.blob_kzg_commitments.len()),
        BeaconBlockRef::Fulu(block) => Some(block.body.blob_kzg_commitments.len()),
        BeaconBlockRef::Gloas(block) => {
            let bid = &block.body.signed_execution_payload_bid.message;
            Some(bid.blob_kzg_commitments.len())
        }
    }
}
}
impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> BeaconBlockRefMut<'a, E, Payload> {

View File

@@ -56,9 +56,10 @@ use crate::{
pub const CACHED_EPOCHS: usize = 3;
// Pre-electra WS calculations are not supported. On mainnet, pre-electra epochs are outside the weak subjectivity
// period. The default pre-electra WS value is set to 256 to allow for `basic-sim``, `fallback-sim`` test case `revert_minority_fork_on_resume`
// to pass. 256 is a small enough number to trigger the WS safety check pre-electra on mainnet.
// Pre-electra WS calculations are not supported. On mainnet, pre-electra epochs are outside the
// weak subjectivity period. The default pre-electra WS value is set to 256 to allow for `basic-sim`
// and `fallback-sim` tests to pass. 256 is a small enough number to trigger the WS safety check
// pre-electra on mainnet.
pub const DEFAULT_PRE_ELECTRA_WS_PERIOD: u64 = 256;
const MAX_RANDOM_BYTE: u64 = (1 << 8) - 1;