mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-17 10:48:28 +00:00
Implement gloas checkpoint sync
This commit is contained in:
@@ -64,6 +64,7 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage;
|
||||
use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache;
|
||||
#[cfg(not(test))]
|
||||
use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream};
|
||||
use crate::payload_envelope_verification::AvailableEnvelope;
|
||||
use crate::pending_payload_cache::PendingPayloadCache;
|
||||
use crate::pending_payload_cache::{
|
||||
Availability as PayloadAvailability,
|
||||
@@ -155,6 +156,23 @@ pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
|
||||
/// Alias to appease clippy.
|
||||
type HashBlockTuple<E> = (Hash256, RangeSyncBlock<E>);
|
||||
|
||||
/// Carries everything `process_range_sync_envelope` needs to import the anchor slots envelope.
|
||||
type AnchorEnvelopeImport<E> = (
|
||||
Hash256,
|
||||
Arc<SignedBeaconBlock<E>>,
|
||||
Box<AvailableEnvelope<E>>,
|
||||
);
|
||||
|
||||
/// The output of [`BeaconChain::filter_chain_segment`], the blocks that should be imported, plus
|
||||
/// the anchor slots envelope if it appeared in this segment and isn't yet imported.
|
||||
///
|
||||
/// The checkpoint server serves only the anchor block and state, never its envelope.
|
||||
/// See [`BeaconChain::queue_anchor_envelope_import`] and its usage.
|
||||
pub struct FilteredChainSegment<E: EthSpec> {
|
||||
pub blocks: Vec<HashBlockTuple<E>>,
|
||||
pub anchor_envelope: Option<AnchorEnvelopeImport<E>>,
|
||||
}
|
||||
|
||||
// These keys are all zero because they get stored in different columns, see `DBColumn` type.
|
||||
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::ZERO;
|
||||
pub const OP_POOL_DB_KEY: Hash256 = Hash256::ZERO;
|
||||
@@ -2953,10 +2971,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
pub fn filter_chain_segment(
|
||||
self: &Arc<Self>,
|
||||
chain_segment: Vec<RangeSyncBlock<T::EthSpec>>,
|
||||
) -> Result<Vec<HashBlockTuple<T::EthSpec>>, Box<ChainSegmentResult>> {
|
||||
) -> Result<FilteredChainSegment<T::EthSpec>, Box<ChainSegmentResult>> {
|
||||
// This function will never import any blocks.
|
||||
let imported_blocks = vec![];
|
||||
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());
|
||||
let mut anchor_envelope = None;
|
||||
|
||||
// Produce a list of the parent root and slot of the child of each block.
|
||||
//
|
||||
@@ -3007,7 +3026,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Note that `check_block_relevancy` is incapable of returning
|
||||
// `DuplicateImportStatusUnknown` so we don't need to handle that case here.
|
||||
Err(BlockError::DuplicateFullyImported(_)) => continue,
|
||||
Err(BlockError::DuplicateFullyImported(_)) => {
|
||||
// The block is already imported. If it's the anchor block, its envelope
|
||||
// may still be missing from the store, queue it for import rather
|
||||
// than dropping it.
|
||||
self.queue_anchor_envelope_import(block_root, block, &mut anchor_envelope)?;
|
||||
continue;
|
||||
}
|
||||
// If the block is the genesis block, simply ignore this block.
|
||||
Err(BlockError::GenesisBlock) => continue,
|
||||
// If the block is is for a finalized slot, simply ignore this block.
|
||||
@@ -3018,12 +3043,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// 2. In some non-canonical chain at a slot that has been finalized already.
|
||||
//
|
||||
// In the case of (1), there's no need to re-import and later blocks in this
|
||||
// segement might be useful.
|
||||
// segment might be useful.
|
||||
//
|
||||
// In the case of (2), skipping the block is valid since we should never import it.
|
||||
// However, we will potentially get a `ParentUnknown` on a later block. The sync
|
||||
// protocol will need to ensure this is handled gracefully.
|
||||
Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue,
|
||||
Err(BlockError::WouldRevertFinalizedSlot { .. }) => {
|
||||
// The block is at/below the finalized slot and won't be imported. If it's the
|
||||
// anchor block, its envelope may still be missing from the store, queue it for
|
||||
// import rather than dropping its.
|
||||
self.queue_anchor_envelope_import(block_root, block, &mut anchor_envelope)?;
|
||||
continue;
|
||||
}
|
||||
// The block has a known parent that does not descend from the finalized block.
|
||||
// There is no need to process this block or any children.
|
||||
Err(BlockError::NotFinalizedDescendant { block_parent_root }) => {
|
||||
@@ -3046,7 +3077,62 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(filtered_chain_segment)
|
||||
Ok(FilteredChainSegment {
|
||||
blocks: filtered_chain_segment,
|
||||
anchor_envelope,
|
||||
})
|
||||
}
|
||||
|
||||
/// If `block` is the checkpoint sync anchor block and its execution payload envelope is not yet
|
||||
/// in the store, record it in `anchor_envelope` for a verified import via
|
||||
/// [`Self::process_range_sync_envelope`].
|
||||
///
|
||||
/// The checkpoint server serves only the anchor block and state. so after checkpoint sync the
|
||||
/// anchor block sits in the store without an envelope. Range sync re-fetches the anchor
|
||||
/// block (as `DuplicateFullyImported` / `WouldRevertFinalizedSlot`) carrying its envelope, and
|
||||
/// we pick it up here.
|
||||
fn queue_anchor_envelope_import(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
block: RangeSyncBlock<T::EthSpec>,
|
||||
anchor_envelope: &mut Option<AnchorEnvelopeImport<T::EthSpec>>,
|
||||
) -> Result<(), Box<ChainSegmentResult>> {
|
||||
let db_failure = |e: Error| {
|
||||
Box::new(ChainSegmentResult::Failed {
|
||||
imported_blocks: vec![],
|
||||
error: BlockError::BeaconChainError(Box::new(e)),
|
||||
})
|
||||
};
|
||||
|
||||
// If pre-gloas or no envelope there is no need to queue for import.
|
||||
let RangeSyncBlock::Gloas {
|
||||
block: signed_block,
|
||||
envelope: Some(available_envelope),
|
||||
} = block
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// If the envelope is already in the db there's nothing to do.
|
||||
if self
|
||||
.store
|
||||
.payload_envelope_exists(&block_root)
|
||||
.map_err(|e| db_failure(e.into()))?
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Ensure we only ever queue the anchor envelope for import.
|
||||
let anchor_slot = self.store.get_anchor_info().anchor_slot;
|
||||
let anchor_block_root = self
|
||||
.block_root_at_slot(anchor_slot, WhenSlotSkipped::Prev)
|
||||
.map_err(db_failure)?;
|
||||
|
||||
if anchor_block_root == Some(block_root) {
|
||||
*anchor_envelope = Some((block_root, signed_block, available_envelope));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempt to verify and import a chain of blocks to `self`.
|
||||
@@ -3081,8 +3167,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
move || chain.filter_chain_segment(chain_segment),
|
||||
"filter_chain_segment",
|
||||
);
|
||||
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
|
||||
Ok(Ok(filtered_segment)) => filtered_segment,
|
||||
let (mut filtered_chain_segment, anchor_envelope) = match filtered_chain_segment_future
|
||||
.await
|
||||
{
|
||||
Ok(Ok(filtered_segment)) => (filtered_segment.blocks, filtered_segment.anchor_envelope),
|
||||
Ok(Err(segment_result)) => return *segment_result,
|
||||
Err(error) => {
|
||||
return ChainSegmentResult::Failed {
|
||||
@@ -3092,6 +3180,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
};
|
||||
|
||||
// Import the checkpoint-sync anchor block's envelope, if this segment carried it and it
|
||||
// wasn't already stored. This must run *before* importing the segment's blocks: a
|
||||
// post-checkpoint child's `load_parent` needs the anchor envelope already present. Using
|
||||
// the verified import path means a bad envelope fails the batch, so range sync will
|
||||
// downscore the peer and retry from another.
|
||||
if let Some((block_root, block, envelope)) = anchor_envelope
|
||||
&& let Err(e) = self
|
||||
.process_range_sync_envelope(envelope, block_root, block)
|
||||
.await
|
||||
{
|
||||
return ChainSegmentResult::Failed {
|
||||
imported_blocks,
|
||||
error: BlockError::EnvelopeError(Box::new(e)),
|
||||
};
|
||||
}
|
||||
|
||||
while let Some((_root, block)) = filtered_chain_segment.first() {
|
||||
// Determine the epoch of the first block in the remaining segment.
|
||||
let start_epoch = block.epoch();
|
||||
|
||||
Reference in New Issue
Block a user